简介
AQS(Abstract Queue Synchronizer
)在java.util.concurrent.locks
包下面,是一个用来构建锁和同步器的框架,使用AQS可以简单高效地构造出大量应用广泛的同步器,比如我们提到的 ReentrantLock
,Semaphore
,其他的诸如 ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。
基本原理
AQS内部维护了一个FIFO的CLH队列,用来对获取资源线程的阻塞和排队。 还使用一个 int 成员变量state来表示同步状态,AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。
private volatile int state; // 共享变量,使用volatile修饰保证线程可见性
状态信息通过 protected
类型的getState()
,setState()
,compareAndSetState()
进行操作。
不同的自定义同步器争用共享资源的方式也不同,实际上就是对共享资源state的获取与释放方式进行不同的实现,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
-
tryAcquire(int)
:独占方式。尝试获取资源,成功则返回true,失败则返回false。 -
tryRelease(int)
:独占方式。尝试释放资源,成功则返回true,失败则返回false。 -
tryAcquireShared(int)
:共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 -
tryReleaseShared(int)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。 AQS使用了模板方法的设计模式,用户实现自己的同步组件的时候只需要重写以上几个方法,实现自己对state操作的逻辑,然后这些子类重写的方法就会被AQS顶层的一些方法调用去实现线程排队阻塞唤醒等具体操作。实现例子
-
ReentrantLock
:state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。 -
CountDownLatch
:任务分为N个子线程去执行,state也初始化为N,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会唤醒主调用线程,然后主调用线程就会从await()函数返回,继续后续动作。锁的分类
- 独占锁:也就是同一时刻只允许一个线程访问资源,类似写锁。
- 共享锁:允许多个线程同时访问一个资源,类似读锁。
Node节点状态
CANCELLED
(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。SIGNAL
(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。CONDITION
(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。(使用到Condition
时才有等待队列的概念,原本的CLH队列是同步队列)PROPAGATE
(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。共享式同步状态获取将会无条件传播下去。初始值
(0):新结点入队时的默认状态。
负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用
>0
、<0
来判断结点的状态是否正常。
独占锁源码
获取
acquire()
acquire()
方法就是用来获取锁的,ReentrantLock的lock()方法实际上也就是调用AQS这个方法。
public final void acquire(int arg) {
// 先看尝试获取同步状态看是否成功,如果成功则方法结束返回
// 若失败则先调用addWaiter()方法加入到等待队列尾
// 再调用acquireQueued()方法在队列中等待重试竞争锁或休眠
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire()根据tryAcquire()
尝试获得同步状态成功与否做了两件事情:
- 成功,则方法结束返回
- 失败,则先调用
addWaiter()
然后在调用acquireQueued()
方法。
addWaiter()
获取同步状态失败,进行入队操作 当线程获取独占式锁失败后就会将当前线程加入同步队列,那么加入队列的方式是怎样的了?我们接下来就应该去研究一下addWaiter()和acquireQueued()。
addWaiter()
源码如下:
private Node addWaiter(Node mode) {
// 1.将当前线程构建成Node类型
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 2.尾节点是否为null?
Node pred = tail;
if (pred != null) {
// 2.2 将当前节点以尾插的方式插入同步队列中
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//2.1 当前同步队列尾节点为null,说明当前线程是第一个加入同步队列进行等待的线程
enq(node);
return node;
}
分析可以看上面的注释。程序的逻辑主要分为两个部分:
- 当前同步队列的尾节点为null,调用方法enq()插入;
- 当前队列的尾节点不为null,则采用尾插入(先node.prev = tail再compareAndSetTail()方法)的方式入队。
enq()
此时还会有另外一个问题:如果 if (compareAndSetTail(pred, node))
为false怎么办?确实会继续执行到enq()
方法,同时很明显compareAndSetTail是一个CAS操作,通常来说如果CAS操作失败会继续自旋(死循环)进行重试。因此,enq()方法可能承担两个任务:
-
处理当前同步队列尾节点为null时进行入队操作,同时完成头节点初始化;
-
如果CAS尾插入节点失败后负责自旋进行重试,直到成功。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //1. 构造头结点 if (compareAndSetHead(new Node())) tail = head; } else { // 2. 尾插入,CAS操作失败自旋尝试 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
在上面的分析中我们可以看出,如果当前插入的节点是第一个入队的(tail为null时),那么会会先创建头结点,说明同步队列是带头结点的链式存储结构(实际上这个头节点是个傀儡节点,后面可以体会到真正在竞争资源的是老二节点)。带头结点与不带头结点相比,会在入队和出队的操作中获得更大的便捷性,因此同步队列选择了带头结点的链式存储结构。
compareAndSetTail(t, node)方法会利用CAS操作设置尾节点,如果CAS操作失败会在for (;;)
死循环中不断尝试,直至成功return返回为止。
acquireQueued()
现在我们已经知道获取独占式锁失败的线程被包装成Node,然后插入同步队列的过程了。那么紧接着会有下一个问题:进入了同步队列中的节点会做什么事情了来保证自己能够有机会再次尝试获取独占式锁?来看看acquireQueued()
方法,从方法名就可以很清楚,这个方法的作用就是排队来获取锁的过程,源码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1.获得当前节点的先驱节点
final Node p = node.predecessor();
// 2.当前节点能否获取独占式锁
// 2.1 如果当前节点的先驱节点是头结点并且成功tryAcquire获取同步状态,即可以获得独占式锁
if (p == head && tryAcquire(arg)) {
// 队列头指针用指向当前节点
setHead(node);
// 释放前驱节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2.2 获取锁失败,线程进入等待状态,尝试park挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
程序逻辑通过注释已经标出,整体来看这是一个这又是一个自旋的过程(for (;;)
),代码首先获取当前节点的先驱节点,如果先驱节点是头结点的并且成功获得同步状态的时候(if (p == head && tryAcquire(arg))),当前节点所指向的线程就能够获取锁。反之,获取锁失败进入等待状态park挂起。整体示意图为下图:
获取锁成功,出队操作
获取锁的节点出队的逻辑是:
// 队列头结点引用指向当前节点,因为头结点是傀儡节点,所以相当于出队了
setHead(node);
// 释放前驱节点
// 拆掉原头结点的next引用,指向它的prev引用在setHead()中已经拆掉
p.next = null; // help GC
failed = false;
return interrupted;
acquireQueued()在自旋过程中主要完成了两件事情:
- 如果当前节点的前驱节点是头节点,并且能够获得同步状态也就是锁的话,头节点出队,当前节点成为新的头结点,方法结束;
- 获取锁失败的话,先将前驱节点状态设置成SIGNAL表示当前节点之后需要被唤醒,然后调用LookSupport.park()方法使得当前线程阻塞(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())。等待下一次唤醒,一般就是此节点的前驱结点获取到了锁后执行完了他想要的操作,然后要release了,就会去唤醒它(unparkSuccessor())。
setHead()
setHead()方法设置头结点为:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
拆掉原头节点的next的prev,无任何引用方便GC时能够将内存进行回收。示意图如下:
shouldParkAfterFailedAcquire()
那么当获取锁失败的时候会调用shouldParkAfterFailedAcquire()
方法和parkAndCheckInterrupt()
方法,看看他们做了什么事情。shouldParkAfterFailedAcquire()方法源码为:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点为signal状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前驱节点为cancel状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 将前驱节点设置为signal,表示当前需要被唤醒
// 相当于给自己设一个闹钟再去睡,这个闹钟会在恰当的时候叫醒自己
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire()方法主要逻辑是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
使用CAS将节点状态由初始值设置成SIGNAL,表示当前线程阻塞。当compareAndSetWaitStatus设置失败则说明shouldParkAfterFailedAcquire方法返回false,然后会在acquireQueued()方法中for (;;)死循环中会继续重试,直至compareAndSetWaitStatus设置节点状态位为SIGNAL时shouldParkAfterFailedAcquire返回true时才会执行方法parkAndCheckInterrupt()
方法,该方法的源码为:
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
//使得该线程阻塞
LockSupport.park(this);
return Thread.interrupted();
}
该方法的关键是会调用LookSupport.park()
方法,该方法是用来阻塞当前线程的。
经过上面的分析,独占式锁的获取过程也就是acquire()方法的执行流程如下图所示:
释放
release()
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 唤醒头节点的后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
如果tryRelease成功的话,就获取头结点h,当判断到头节点不为null且状态不为0,就使用unparkSuccessor()
唤醒头结点的下一个节点
h != null && h.waitStatus != 0的意思
当一个head节点的waitStatus
为0说明什么呢,说明这个head节点后面没有在挂起等待中的后继节点了(如果有的话, head的ws就会被后继节点设为Node.SIGNAL
了), 自然也就不用执行 unparkSuccessor
操作了.
unparkSuccessor()
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取头节点的后继节点
Node s = node.next;
// 此后继节点为null或处于cancel状态
if (s == null || s.waitStatus > 0) {
s = null;
// 通常情况下, 要唤醒的节点就是自己的后继节点
// 如果后继节点存在且也在等待锁, 那就直接唤醒它
// 但是有可能存在 后继节点是取消等待锁(ws>0)的情况
// 此时从尾节点开始向前找起, 直到找到距离head节点最近的ws<=0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 后继节点不为null时唤醒节点对应线程
LockSupport.unpark(s.thread);
}
首先获取头节点的后继节点,当后继节点的时候会调用LookSupport.unpark()方法,该方法会唤醒该节点的后继节点所包装的线程。因此,每一次锁释放后就会唤醒队列中该节点的后继节点所引用的线程,从而进一步可以佐证获得锁的过程是一个FIFO(先进先出)的过程。
独占锁总结
通过学习源码的方式非常深刻的学习到了独占式锁的获取和释放的过程以及同步队列。可以做一下总结:
- 线程获取锁失败,线程被封装成Node进行入队操作,核心方法在于addWaiter()和enq(),同时enq()完成对同步队列的头结点初始化工作以及CAS操作失败的重试;
- 线程获取锁是一个自旋的过程,当且仅当 当前节点的前驱节点是头结点并且成功获得同步状态时,节点出队即该节点引用的线程获得锁,否则,当不满足条件时就会调用LookSupport.park()方法使得线程阻塞;
- 释放锁的时候会唤醒后继节点,也就是在unparkSuccessor()中使用LookSupport.unpark();
总体来说:在获取同步状态时,AQS维护一个同步队列,获取同步状态失败的线程会加入到队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点是头结点并且成功获得了同步状态。在释放同步状态时,同步器会调用unparkSuccessor()方法唤醒后继节点。
共享锁源码
获取
acquireShared()
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared返回值是一个int类型,当返回值为大于等于0的时候方法结束说明获得成功获取锁,否则,表明获取同步状态失败即所引用的线程获取锁失败
doAcquireShared()
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 当前节点的前驱节点是头结点并且tryAcquireShared()返回值大于等于0即表示能成功获得同步状态。
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 当该节点的前驱节点是头结点且成功获取同步状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这段逻辑和独占锁的accquireQueued()
的大同小异,只是对获取到同步状态的判断有点不同而已(见上方注释)。而且由于是共享式,会有多个线程同时获取到线程,也可能同时释放线程,空出很多同步状态,所以当排队中的老二获取到同步状态,如果还有可用资源,会继续传播下去(通过setHeadAndPropagate()
方法)。
setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置新的头节点就是为了让获取共享资源的操作传播下去
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
// 如果下个节点为null或是共享的就进行释放
if (s == null || s.isShared())
doReleaseShared();
}
}
这个方法就是设置了新的头结点,就是为了让获取资源这个操作传播下去,直到资源都被用完了(因为只有前驱节点为头结点的节点才可以去tryAcquireShared),如果下个节点为空节点或也是共享节点就要调用doReleaseShared释放,但是实际上是为了唤醒后续的节点,使其也来拿锁。
释放
releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
这个就没啥好说的,就尝试释放,成功了就doReleaseShared。
doReleaseShared()
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后续节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
这段方法跟独占式锁释放过程有一点点不同,在共享式锁的释放过程中,持有同步状态的线程可能有多个,必须保证多个线程能够安全的释放同步状态,这里采用的CAS保证,当CAS操作失败continue,在下一次循环中进行重试。
细节
为什么从后往前遍历?
unparkSuccessor()方法中的遍历是从尾节点开始的,那为什么不从头开始呢? 看到入队方法addWaiter()中有这么一段
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
可以看到是先设置了prev指针,再通过CAS去设置tail,成功后再设置next指针,所以如果在遍历过程中,恰好有一个节点入队,但是只设置了prev指针,在CAS操作之后t.next=node
之前,这时候切换上下文到别的线程,那么从头往后遍历就会因为没有next指针而漏掉这个节点以及其之后的节点,从后往前因为有prev指针而不会漏掉。可以看图:
interrupted变量的作用
在源码中,我们随处可见一个叫interrupted的变量,从字面意义上来看它就是用来标志当前线程是否被终端的,但是它其实只是一个标志位,并不能真正强行进行强行中断线程,具体进行什么操作还是线程自己决定的(这样有利于资源回收,和Thread类中的中断标志位如出一辙)。 那具体怎么发挥作用的呢? 看到parkAndCheckInterrupt()方法,最后是return了Thread.interrupted(),这个方法是返回线程的中断标志位,并清除它。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 在这里被挂起了, 唤醒之后就能继续往下执行了
return Thread.interrupted();
}
如果这里返回true,那么返回来到下面的语句(acquireQueued()方法)
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
那么if判断为true,来到语句interrupted = true;
,因为这里是for (;;)
死循环,要抢到锁才返回,所以就继续下一次循环直到抢到锁然后会返回interrupted
,acquireQueued()方法结束,再返回到acquire()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此时上面这个大if为true,就执行selfInterrupt();,源码如下:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
这个方法就是自我中断了。
那么这么一圈下来,它的意义是什么?
当我们从park(this)
处被唤醒,我们并不知道是因为什么原因被唤醒,可能是因为别的线程释放了锁,调用了unpark(s.thread)
,也有可能是因为当前线程被其他线程调用interrupt()方法中断了park(),因此我们通过Thread.interrupted()
方法检查了当前线程的中断标志,并将它记录下来,在我们最后返回acquire()
方法后,如果发现当前线程曾经被其它线程中断过,那我们就把当前线程再中断一次。
为什么要这么做呢?
从上面的代码中我们知道,即使线程在等待资源的过程中被中断唤醒,它还是会不依不饶的继续抢锁,直到它抢到锁为止。也就是说,当线程处于等待队列中时,是不去响应外部的中断请求的,仅仅是记录下自己被人中断过。 最后,当它抢到锁返回了,如果它发现自己曾经被中断过,它就再中断自己一次,将这个中断补上。
注意,中断对线程来说只是一个建议,一个线程被中断只是其中断状态被设为true
,线程可以选择忽略这个中断,中断一个线程并不会影响线程的执行,当然也可以自己决定响应这个中断要去执行的操作。
总结
AQS
是JUC中很多同步组件的构建基础,简单来讲,它内部实现主要是状态变量state和一个FIFO队列来完成,同步队列的头结点是当前获取到同步状态的结点,获取同步状态state失败的线程,会被构造成一个结点(或共享式或独占式)加入到同步队列尾部(采用自旋CAS来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头结点的后继结点,使其加入对同步状态的争夺中。
AQS为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需重写tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。
参考
https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html
https://github.com/CL0610/Java-concurrency
https://segmentfault.com/a/1190000015752512