内存一致性

在多核处理器中,每个核心都有自己的缓存,线程在履行时可能会读取和修正同享的内存数据。假如没有内存一致性确保,不同核心上的线程可能会看到不同的内存值,导致数据不一致的问题。

其实也便是常说的原子性、可见性、有序性

解决方案:

  1. 运用volatile关键字:将同享变量声明为volatile,能够确保对该变量的操作当即改写到主内存,而且对该变量的操作都从主内存中获取最新的值。
  2. 运用synchronized关键字或Lock:经过同步机制来确保多线程之间的原子性和有序性。synchronized关键字和Lock都能够创立一个临界区,确保同一时间只有一个线程能够进入该临界区。
  3. 运用原子类:Java提供了一些原子类,例如AtomicInteger、AtomicLong等,它们提供了根本的原子操作,能够避免多线程竞争的问题。

指令重排序是什么?

如下指令的重排导致例如在双检索中,第一次查看条件满足而直接回来instance实例,但实际上该目标的结构还未履行完毕

instance = new Singleton();
句子优化成如下的序列:
1new分配指针2、instance 赋值;
3、调用Singleton结构函数。

你是否了解内存屏障?

举个简单的例子认识下为什么内存模型进行优化的思路,比方x\y都进行写操作,其次序对全体无影响性,那么其乱序履行对终究成果无干扰,亦或是x\y进行无相关的读写操作。全体上来说乱序的进步比其带来的影响大的多,所以引入内存屏障来解决特殊情况下的问题

x = 10;
y = 1;
// 此刻尽管x\y无相关,但假如a写入,比y读取a时更晚,那y读到的将是初始值0
a = 1;
x = 10;
y = a;

# 要是不理解,那就再看看这篇,前面内容能够快速看,我认为精华在中心以后部分

关于指令排序,无非便是store(写),load(读)的排列组合:s-s(sfence),s-l(mfence),l-l(lfence),l-s,其中fence(栅栏),能够理解为屏障

  • 写volatile变量v之后,插入一个sfence,禁用跨sfence的store重排序;且sfence之前修正的值都会被写回缓存,并符号其他CPU中的缓存失效
  • 读volatile变量v之前,插入一个lfence,禁用跨lfence的load重排序;且lfence之后,会首先改写无效缓存,然后得到最新的修正值,与sfence合作确保内存可见性。

你是否了解final关键字?

  1. 常量:一旦变量被赋值,就不能再修正它的值
  2. 不行承继:final 能够用来润饰类,表明该类不能被承继
  3. 不行重写:final 能够用来润饰办法,表明该办法不能被子类重写(覆盖)
  4. 进步性能:在某些情况下,运用 final 关键字能够让编译器对代码进行优化,进步程序的性能。

JVM会在初始化final变量<init>()后插入一个sfence,确保final字段初始化之前(include)的内存更新都是可见的。

锁是如何完结可见性的

对总线和缓存上锁,具有mfence语义

再谈线程

线程池提交使命有哪些办法?

  1. execute(Runnable task): 提交一个Runnable使命给线程池。该办法没有回来值。

  2. submit(Runnable task): 提交一个Runnable使命给线程池,并回来一个表明该使命的Future目标。能够经过Future目标来获取使命的履行成果。

  3. submit(Callable<T> task): 提交一个Callable使命给线程池,并回来一个表明该使命的Future目标。能够经过Future目标来获取使命的履行成果。

  4. invokeAny(Collection<? extends Callable<T>> tasks): 提交一个Callable使命调集给线程池,回来其中一个使命的履行成果。当有一个使命成功履行完毕后,该办法就会回来。

  5. invokeAll(Collection<? extends Callable<T>> tasks): 提交一个Callable使命调集给线程池,回来一切使命的履行成果。当一切使命都履行完毕后,该办法才会回来。

总结:execute便是普通的提交,submit会回来Future,其get()获取成果时会堵塞,invoke提交一个调集,根据需求挑选回来的时机(注意会堵塞)

线程sleep和yield办法的差异?

Thread.sleep()Thread.yield()办法都能够用于线程的操控,但它们有一些差异。

  1. Thread.sleep(): 暂停线程让渡CPU,休眠指定固定时长,但锁不开释,可能抛出InterruptedException反常

  2. Thread.yield(): CPU让渡,运行态->安排妥当态,阻滞时长取决于CPU调度器,只能在同步代码块中运用

调用目标hashcode办法,对锁有什么影响?

Java目标的目标头里包括两个部分,一个是Mark Word,另一个是类型指针。而关于Mark Word而言,经过精密的区分,其在64 bit的空间内能够变着花样的存储各类信息。而Hash Code正是其中之一

一文搞定面试 | Java并发编程-基础篇(二)

结论: 一个目标在调用原生hashCode办法后(来自Object的,未被重写过的),该目标将无法进入倾向锁状况,起步就会是轻量级锁。若hashCode办法的调用是在目标已经处于倾向锁状况时调用,它的倾向状况会被当即撤销,而且锁会升级为重量级锁。 hashCode 存储在当前持有锁的线程内部,解锁时便是恢复至加锁前的目标头状况

没看理解的,能够详细看看这篇

AQS模型与线程同步东西

CountDownLatch

CountDownLatch经过一个计数器来完结线程之间的同步。在创立CountDownLatch目标时,需求指定一个初始计数值。当一个线程完结了自己的使命后,能够调用CountDownLatch的countDown()办法,将计数值减1。其他等候的线程能够调用CountDownLatch的await()办法来等候计数值变为0。当计数值变为0时,一切等候的线程会被唤醒,持续履行。

import java.util.concurrent.CountDownLatch
fun main() {
  val numThreads = 5
  val latch = CountDownLatch(numThreads)
  for (i in 0 until numThreads) {
    val thread = Thread(Worker(latch))
    thread.start()
  }
  // 等候一切线程完结使命
  latch.await()
  println("一切线程已完结使命,持续履行主线程")
}
class Worker(private val latch: CountDownLatch) : Runnable {
  override fun run() {
    try {
      // 模仿线程履行使命
      Thread.sleep((Math.random() * 1000).toLong())
      println("线程${Thread.currentThread().id}完结使命")
            // 计数器减1
      latch.countDown()
    } catch (e: InterruptedException) {
      e.printStackTrace()
    }
  }
}

首先在结构时传入计数器初始值

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

当线程作业完结时,调用countDown(),对计数器-1,当-1后为0的话,需求进行doReleaseShared()开释,其中经过LockSupport.unpark进行对堵塞线程的唤醒

public void countDown() {
    sync.releaseShared(1);
}
// AQS
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
// CountDownLatch.Sync
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

等候的线程去获取锁,当state为0时,即不会堵塞,不然doAcquireSharedNanos中,经过LockSupport.parkNanos进行堵塞

// CountDownLatch
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// AQS
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}
// CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

CyclicBarrier

CyclicBarrier也经过一个计数器来完结线程之间的同步。在创立CyclicBarrier目标时,需求指定一个参与同步的线程数量。当一个线程抵达了CyclicBarrier的屏障点时,会调用CyclicBarrier的await()办法进行等候。当一切参与线程都抵达了屏障点时,CyclicBarrier会开释一切等候的线程,然后重置计数器,能够持续运用。

import java.util.concurrent.CyclicBarrier
fun main() {
    val numThreads = 5
    val barrier = CyclicBarrier(numThreads) {
        println("一切线程已抵达屏障点,持续履行主线程")
    }
    for (i in 0 until numThreads) {
        val thread = Thread(Worker(barrier))
        thread.start()
    }
}
class Worker(private val barrier: CyclicBarrier) : Runnable {
    override fun run() {
        try {
            // 模仿线程履行使命
            Thread.sleep((Math.random() * 1000).toLong())
            println("线程${Thread.currentThread().id}抵达屏障点")
            // 等候其他线程抵达屏障点
            barrier.await()
            // 一切线程抵达屏障点后持续履行后续操作
            println("线程${Thread.currentThread().id}持续履行")
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }
    }
}

结构获取了需求计数器和同步屏障时需求履行的Runnable,内部借助ReentrantLock,差异于CountDownLatch是内部完结了AQS的Sync。command.run()为同步屏障Runnable,breakBarrier()为屏障开释,不然trip.await()进行休眠。差异于CountDownLatch的LockSupport.park(),这儿运用的是Lock.condition.await()\signalAll()

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        int index = --count;
        // 当计数器为0时,履行内容并开释屏障
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation)
                return index;
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

ReadWriteLock

ReentrantReadWriteLock 经过 AQS 完结了读写锁的高效管理,读锁能够同享,写锁是独占的,能够满足读多写少的并发需求

写操作(独占锁)的时候,需求判别是否有读操作在进行。这是由于在读操作期间,写操作是不允许的,需求等候一切的读操作完结后才干进行写操作,以确保数据的一致性。

ReentrantReadWriteLock经过保护一个计数器来记载当前获取读锁的线程数量。当有线程获取读锁时,计数器会增加;当线程开释读锁时,计数器会削减。经过查看计数器是否为0,能够判别是否有读操作在进行。

在写操作获取写锁之前,会先查看计数器是否为0,假如不为0,阐明有读操作在进行,写操作就会被堵塞,直到计数器为0,即一切读操作都完结,才干获取写锁进行写操作。

import java.util.concurrent.locks.ReentrantReadWriteLock
class ReadWriteLockExample {
    private val lock = ReentrantReadWriteLock()
    private var value: Int = 0
    fun writeValue(newValue: Int) {
        lock.writeLock().lock()
        try {
            // 写操作,对同享数据进行更新
            value = newValue
        } finally {
            lock.writeLock().unlock()
        }
    }
    fun readValue(): Int {
        lock.readLock().lock()
        try {
            // 读操作,对同享数据进行读取
            return value
        } finally {
            lock.readLock().unlock()
        }
    }
}

看一下读锁,是对同享资源的操作,当没有写操作时,是允许多线程一起读,当发生写操作时,会获取失利,进入等候状况

public static class ReadLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquireShared(1);
    }
    public void unlock() {
        sync.releaseShared(1);
    }
}
// AQS
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        // 测验获取失利的话,会进入等候状况
        doAcquireShared(arg);
}
// 对计量器c,进行位拆分
// 同享部分
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
// 独占部分
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// Sync
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 只需exclusiveCount独占资源没被占有 且 写线程不为当前线程
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

再看写锁,是对独占资源的操作,这儿就越过AQS的部分调用,直接看Sync部分了。当需求进行写操作时,由于写操作的互斥性,计数器需求为0,不然即意味着读\写位有计数。其中关于读写发生堵塞时都依靠AQS内部的LockSupport.park()\unpark(),而实际上Lock.condition.await()终究也依靠于AQS的ConditionObject,其本质是一样的,差异于Object.wait()\notify(),不需求与特定目标关联,也不会出现 IllegalMonitorStateException 反常,由于 LockSupport 运用了许可证的方式来进行线程的堵塞和唤醒。

public static class WriteLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquire(1);
    }
    public void unlock() {
        sync.release(1);
    }
}
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 这儿判别c是写仍是读,假如没有写的话,那便是读,需求等候。假如是读,那就看读是不是本线程,进行重入
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}