导言
AQS (AbstractQueuedSynchronizer) 是一个供给了锁和同步器根底功用的结构。
本文测验经过以 ReentrantLock 为比如,去介绍 AQS 这个结构是怎么从结构的角度上,去辅佐完成同步机制的。
从源码的角度上,侧重介绍 AQS 中的 acquire 模版办法里边涉及到的整个流程。
这儿并不会去过分去介绍 ReentrantLock 的特性,仅仅一个辅佐解说 AQS 的引子。
为什么我会挑选
acquire这个办法捏?
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
// 公正锁
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
咱们能够看到 ReentrantLock,无论是 公正锁还是非公正锁,里边都会出现这个 acquire 办法的调用。
由此可见,里边很有或许就包含着咱们想要知道的全部。
注意:本文是依据
JDK8分析的,在高版本中源码的写法或许不一样。
acquire
public final void acquire(long arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在 acquire 办法中,咱们能够看到 首要的中心办法有:
- tryAcquire
- addWaiter
- acquireQueued
下面就带着大家一同分析这三个办法到底在整个流程是干什么的。
tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
在 AQS 中,tryAcquire 办法是直接抛反常。
相信读者也能从许多结构源码中看到很多类似的,一个父类办法写的是直接一个抛反常。
这样的写法结构的作者通常是考虑:
- 不需求一切的子类都完成。
- 假如有的子类没有自己完成就调用了,那么我应该直接给他抛反常,完成
fail-fast机制。
快去复习一下
fail-fast和fail-save两种机制!
addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
测验增加 Node 节点到 tail 尾部。
假如队列为空,才会去走 enq 办法。
源码注释中提到了 fast path 这个词,addwaiter 这儿考虑的状况是 尾节点 tail 不为空的状况。
- 假如
tail不为空,那么中心逻辑就只存在addWaiter这儿。 - 假如
tail为空,那么中心逻辑会去走enq,enq里边评论了 空 和 非空 两种状况。
这儿仅仅一个 “偷鸡” ,fast path 的测验。
enq
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 队尾为 null 则经过 CAS 把自己增加到队尾
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 发现不为空表明这儿已经出现并发竞赛,有一个线程成功把他加入到队尾了
// 那么我这个线程就只能在他后边加入了
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueue
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 只要前驱节点是 head 才干再测验 tryAcquire 去获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 上面测验获取锁失败了走到这儿,决定当时线程是否该堵塞
// p 和 前驱节点,也便是依据 前驱节点 判别 当时线程是否堵塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
CLH 锁机制
这儿咱们能够看到只要 前驱节点 是 head,才有资历进行 tryAcquire 去获取锁,并不是一切在同步队列的线程,都有机会去获取锁。AQS 所选用的这种机制,其实是 CLH 锁的一种变种。
CLH锁是一种简略高效的 自旋锁,得名于其发明者 Craig, Landin 和 Hagersten。
它依据 链表,其每个节点代表一个企图获取锁的线程。
在 CLH 锁中,线程不会直接对锁进行自旋等候,而是在其前驱节点的一个副本上自旋,这样减少了对共享变量的访问,然后降低了体系的总线和缓存压力。
要说 CLH 好,那么它好在哪里?它又处理了什么问题?
-
在没有
CLH的时分,10个线程过来,经过自旋去获取锁。 -
有了
CLH,10个线程过来,只要1个线程会自旋去获取锁。
自旋的成本,就能够大大的降下来了。所以咱们会说 CLH 是一种 高效的自旋锁。
那么就会有人会问:保护竞赛的线程,
addWaiter也是经过CAS将线程增加到队尾保护的呀,不也是自旋嘛?
好好好,当你注意到这点的时分,我只能说你很细,可是还不够!
需求注意的是在 addWaiter 自旋,它的竞赛目标是 前驱线程目标,由于我要把自己加到前驱目标的后边。
可是另外一个是详细的一个 共享资源目标,这两者的 竞赛压力是彻底不一样的。
而且当我 CLH 线程排好队了之后,以后的竞赛压力就大大减少,性能就愈加高效,所以咱们说他是一个好东西。
CLH 是天然的公正锁,非公正锁怎么办?
在前面咱们已经知道了 CLH 是一种 公正锁 的机制。AQS 作为一种通用的同步结构,那是不是意味着一切的子类都是公正锁呢?那么咱们为什么说 ReentrantLock 有公正和非公正两种形式。
直接上源码!
ReentrantLock的 公正形式 是经过 里边的 sync 类型来完成的:
-
公正锁:
FairSync -
非公正锁:
NonfairSync
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
咱们再去看两者有啥区别:
// 非公正锁
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
// 公正锁
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
咱们能够很清楚地看到,非公正锁很 “鸡贼” 地在提前去经过 CAS 就进行获取锁了。
它先不走 acquire 那套模版办法,所以这儿就表现了 非公正锁 的非公正之处。
可是假如这次这个 非公正锁 没有成功获取到锁,那么它也是会被扔到同步队列里边去,让他 乖乖地排队去排队自旋。
分布式锁羊群效应
CLH 其实跟分布式锁中处理 羊群效应 非常类似。
羊群效应:在分布式锁体系中,当多个恳求一同到达时,它们或许会构成一种“群集”,或在某个资源(例如数据库或服务)上排队。
以 ZooKeeper 举比如,当一个持有分布式锁的节点开释锁时,它会在 ZooKeeper 中更新对应的节点状况。这个状况更新会被其他正在等候锁的节点调查到,然后触发它们简直一同去测验获取锁。
Curator 是一个优秀的 ZooKeeper 客户端结构,它是经过 次序节点 和 最小节点调查 的办法去处理这种羊群问题。
原理上跟咱们这儿的 CLH 思想是一致的:我只关心前驱节点,而不是详细的共享资源目标。
由此咱们还能够联想到:push 和 pull 两种负载均衡模型他们各自的特色。
由于该内容并不是本文首要介绍目标,所以这儿就点到为止。
shouldParkAfterFailedAcquire
/**
** Node 节点状况
**/
static final class Node {
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点状况是 SIGNAL 的话,当时线程能够直接挂起
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前驱节点状况 > 0,也便是 CANCELLED,会一直往前去找非 CANCELLED 的节点
// 把自己的前驱指向它
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 经过 CAS 办法将前驱节点修改为 SIGNAL,让他来通知我当时线程
// 我当时线程睡着就能够了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
依据前驱节点的状况,来判别我当时线程是否应该 park,也便是堵塞,前驱节点的取值便是要看 waitStatus。
waitStatus 的取值有 4 种,咱们这边只针对以 ReentrantLock 为比如,由于其他的涉及到一些 读写锁 的,样例不够通用,欠好进行阐释。因此咱们这儿只针对 ReentrantLock 举比如,那么 waitStatus 就有:
-
SIGNAL:假如前驱节点是SIGNAL,当时驱节点 获取到锁 或 开释锁 时,会唤醒我当时节点,因此我当时节点直接挂起即可,让出 CPU 资源。
能够幻想这么一个场景,小明和你一同去问老师问题(锁获取),可是老师每次只能处理一个学生,那么小明跟你说,我先去问问题,等我问完(锁开释),就来告知你(唤醒你),你先睡一会吧(当时线程挂起),这样就会更好懂一点。
-
CANCELLED:假如是前驱节点是CANCELLED,代表前驱节点是一个 由于中止Interrupt而放弃竞赛的线程,那么我这个节点的前驱节点要往前一直修改去 找到前面是有用的节点,也便是 非CANCELLED的前驱节点。 -
其他的:关于其他的状况值,会经过
CAS,将前驱节点修改为SIGNAL,让他能够通知我当时这个节点。
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
能够看到 AQS 这儿是经过 LockSupport 来堵塞当时线程的。
为什么这儿用的是 LockSupport.park 而不必 Object.wait
LockSupport.park供给了一种更灵敏、简略且有用的办法来操控线程的堵塞和唤醒。 特别适合于构建杂乱的同步工具和用于高级并发场景。 相比之下,Object.wait和notify尽管也是有用的同步机制,但在运用上更为杂乱和受限。
不需求持有锁
-
LockSupport:
LockSupport.park不需求线程持有任何特定的锁。这意味着它能够在任何上下文中被运用,而无需忧虑死锁或许恪守特定的锁协议。 -
Object.wait:相比之下,
Object.wait必须在同步块(即持有目标锁的范围内)中调用,否则会抛出IllegalMonitorStateException反常。
Object lock = new Object();
synchronized (lock) {
// 必须在同步块内调用 wait,确保持有 lock 的锁
lock.wait();
}
假如你没有拿到锁,然后你调用了 Object.wait 办法,那么就会抛出 IllegalMonitorStateException 反常。
LockSupport 无需忧虑这个,因此它减少了死锁的风险,简化了一些编码,我并不需求去忧虑我是否当时线程能否拿到锁。
所以说 LockSupport 很适合 同步工具和 一些并发场景。
permit 许可机制:答应先 unpark 然后再 park
-
LockSupport:
LockSupport供给了一种“许可”机制,其间unpark操作能够预先产生,为后续的park操作“积累”一个许可。这种机制使得线程操控愈加精密。 -
Object.wait / notify :
Object.wait依赖于notify或notifyAll办法来唤醒等候的线程。假如wait在notify之后产生,线程仍然会堵塞,直到另一个notify或notifyAll被调用。
Thread thread = Thread.currentThread();
// 提前 unpark
LockSupport.unpark(thread);
// 后续的 park 会当即回来,由于已经有一个可用的许可
LockSupport.park();
System.out.println("Thread continued execution");
简化的运用
-
LockSupport:
LockSupport的运用相对简略,不需求与特定的目标或锁关联。 -
Object.wait / notify:
Object.wait和notify必须在特定目标的上下文中运用,而且需求正确处理锁和反常,这使得它们的运用愈加杂乱。
总结
-
AQS供给了一系列的模版办法aquire、tryaquire、aquireQueue,来帮助咱们完成同步机制。 -
AQS内保护的同步队列中的大量操作,都是经过CAS去完成的。 -
AQS内的CLH锁机制供给了一种 高效的自旋锁 完成。
拥有 AQS,咱们就能够经过对 state 的语义不同,亦或许去重写某个办法,然后完成更为灵敏,愈加强大的锁:
-
ReentranLock:可重入锁。 -
ReentrantReadWriteLock:读写锁。 -
CountDownLatch:异步转同步,需求等候的。 - …
下篇文章就讲 AQS 的完成类是怎么完成自己的特性!
来都来了,点个赞再走吧彦祖,这对我来说非常重要!

