本文我们来聊一聊 AQS。
文章开始之前,我们先来思考一个问题:Java 中提供了 synchronized
关键字来保证只有一个线程能够访问同步代码块。既然已经提供了 synchronized
关键字,那为何在 Java 的 SDK 包中,还会提供 Lock 接口呢?这是不是重复造轮子,多此一举呢?
让我们一起带着疑问往下看。
很多小伙伴可能会听说过,在 Java 1.5 版本中,synchronized
的性能不如 Lock,但在 Java 1.6 版本之后,synchronized
做了很多优化,性能提升了不少。那既然 synchronized
关键字的性能已经提升了,那为何还要使用 Lock 呢?
我们常说:「避免重复造轮子」,如果有了轮子还是要坚持再造个轮子,那么肯定传统的轮子在某些应用场景中不能很好的解决问题。
如果我们向更深层次思考的话,就不难想到了:我们使用 synchronized
加锁是无法主动释放锁的,这就会涉及到死锁问题。
不知各位是否还记得,产生死锁的四个必要条件:
{P1,P2,…,Pn}
,其中 P1 等待 P2 占有的资源,P2 等待 P3 占有的资源,…,Pn 等待 P1 占有的资源,形成一个进程等待环路,环路中每一个进程所占有的资源同时被另一个申请,也就是前一个进程占有后一个进程所申请的资源。只要系统发生死锁,这些条件必然成立,而只要上述条件之一不满足,就不会发生死锁。
如果我们的程序使用 synchronized
关键字发生了死锁,主要是因为无法破坏「不可剥夺」 这个条件。
要想破坏这个条件,就需要具备申请不到进一步资源,就释放已有资源的能力。
很显然,这个能力是 synchronized
不具备的,使用 synchronized
如果线程申请不到资源就会进入阻塞状态,我们做什么也改变不了它的状态,这是 synchronized
的致命弱点。
对于「不可剥夺」这个条件,我们希望占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了,就可以防止死锁的产生。
旧轮子有弱点,新轮子就要解决这些问题,所以要具备不会阻塞的功能,下面我们就来看下 Lock 是如何解决死锁问题的。
一般来说,破坏「不可剥夺」条件有三种方式:
synchronized
的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可剥夺条件了。体现在 Lock 接口上,就是 Lock 接口提供的三个方法,如下所示。
// 支持中断的API
void lockInterruptibly() throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();
好的方案有了,但鱼和熊掌不可兼得,Lock 多了 synchronized
不具备的特性,自然不会像 synchronized
那样一个关键字三种玩法走遍全天下,在使用上也相对复杂了。
public synchronized void normalSyncMethod() {
Lock lock = new ReentrantLock();
lock.lock();
try {
} finally {
lock.unlock();
}
}
lock.unlock()
放到 finally 语句块
,保证当前线程执行过程中出现异常时,锁依然能被释放掉,避免死锁的产生。try{}
外面获取锁,在 try-finally
外加锁,如果因为异常导致加锁失败,try-finally
块中的代码不会执行。如果在 try{}
代码块中加锁失败,finally 中的代码无论如何都会执行,但是由于当前线程加锁失败并没有持有 lock 对象锁 ,所以程序会抛出异常。如果你熟悉 synchronized
,你知道程序编译成 CPU 指令后,在临界区会有 moniterenter
和 moniterexit
指令的出现,可以理解成进出临界区的标识。
从范式上来看:
lock.lock()
获取锁,“等同于” synchronized
的 moniterenter指令
lock.unlock()
释放锁,“等同于” synchronized
的 moniterexit 指令
那 Lock 是怎么做到的呢?
这里先简单说明一下,这样一会到源码分析时,你可以远观设计轮廓,近观实现细节,会变得越发轻松。
其实很简单,比如
ReentrantLock
内部维护了一个 volatile 修饰的变量 state,通过 CAS 来进行读写(最底层还是交给硬件来保证原子性和可见性),如果 CAS 更改成功,即获取到锁,线程进入到 try 代码块继续执行;如果没有更改成功,线程会被【挂起】,不会向下执行。
但 Lock 是一个接口,里面根本没有 state 这个变量的存在:
那它又是怎么处理这个 state 呢?很显然需要一点设计的加成了,接口定义行为,具体都是需要实现类的。
Lock 接口的实现类基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的。
那什么是队列同步器呢?我们下面开始进入正题。
队列式的同步器 (AbstractQueuedSynchronizer) 简称:AQS。它提供了「把锁分配给谁」这一问题的一种解决方案,使得锁的开发人员可以将精力放在「如何加解锁上」,避免陷于把锁进行分配而带来的种种细节陷阱之中。
例如 JUC 中,如 CountDownLatch
、Semaphore
、ReentrantLock
、ReentrantReadWriteLock
等并发工具,均是借助 AQS 完成他们的所需要的锁分配问题。
我们通过下面的架构图来整体了解一下 AQS 框架:
下面我们会从整体到细节,从流程到方法逐一剖析 AQS 框架。
AQS 核心思想是:
CLH:Craig、Landin and Hagersten 队列,是单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
主要原理图如下:
AQS 使用一个 volatile
的 int 类型的成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作,通过 CAS 完成对 state 值的修改。
下面先来看下 AQS 中最基本的数据结构——Node,Node 即为上面 CLH 变体队列中的节点。
AQS 内部维护了一个同步队列,用于管理同步状态。
为了将上述步骤弄清楚,我们需要来看一看 Node 结构。
乍一看有点杂乱,我们解释一下几个方法和属性值的含义:
waitStatus
:当前节点在队列中的状态thread
:表示处于该节点的线程prev
:前驱指针predecessor
:返回前驱节点,没有的话抛出 npenextWaiter
:指向下一个处于 CONDITION
状态的节点next
:后继指针线程两种锁的模式:
SHARED
:表示线程以共享的模式等待锁EXCLUSIVE
:表示线程正在以独占的方式等待锁waitStatus
有下面几个枚举值:
CANCELLED(1)
:当前节点取消获取锁。当等待超时或被中断(响应中断),会触发变更为此状态,进入该状态后节点状态不再变化;SIGNAL(-1)
:后面节点等待当前节点唤醒;CONDITION(-2)
:condition 中使用,当前线程阻塞在 condition。如果其他线程调用了 condition 的 signal 方法,这个结点将从等待队列转移到同步队列队尾,等待获取同步锁;PROPAGATE(-3)
:当前线程处在 SHARED
情况下,该字段才会使用。在了解数据结构后,接下来了解一下 AQS 的同步状态—— state。
AQS 中维护了一个名为 state 的字段,意为同步状态,是由 volatile 修饰的,用于展示当前临界资源的获锁情况。
private volatile int state;
下面提供了几个访问这个字段的方法:
protected final int getState()
:获取 state 的值protected final void setState(int newState)
:设置 state 的值protected final boolean compareAndSetState(int expect, int update)
:使用 CAS 方式更新 state这几个方法都是 final 修饰的,说明子类中无法重写它们。我们可以通过修改 state 字段表示的同步状态来实现多线程的「独占模式」和「共享模式」(加锁过程)。
前置知识基本铺垫完毕,我们下面以 ReentrantLock
为例, 分析 AQS 如何实现独占锁。
ReentrantLock
意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。ReentrantLock
支持公平锁和非公平锁,公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析 AQS 获取独占锁的方式。
在非公平锁中,有一段这样的代码:
static final class NonfairSync extends ReentrantLock.Sync {
...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}
看下 acquire()
方法。
public final void acquire(int arg) {
// 调用自定义同步器重写的 tryAcquire 方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
再看一下 tryAcquire
方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里只是 AQS 的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的。
如果该方法返回了 true
,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。
下面我们分析下线程是何时以及怎样被加入进等待队列中的。
加入队列的时机:当执行 acquire(1)
时,会通过 tryAcquire
获取锁。在这种情况下,如果获取锁失败,就会调用 addWaiter
加入到等待队列中去。
如何加入队列:通过调用 addWaiter
方法构造 Node 节点(Node.EXCLUSIVE
独占式)并安全的加入到同步队列的「尾部」。
具体实现方法如下:
private Node addWaiter(Node mode) {
// 构造Node节点,包含当前线程信息以及节点模式【独占/共享】
Node node = new Node(Thread.currentThread(), mode);
// 新建变量 pred 将指针指向tail指向的节点
Node pred = tail;
// 如果尾节点不为空
if (pred != null) {
// 新加入的节点前驱节点指向尾节点
node.prev = pred;
// 因为如果多个线程同时获取同步状态失败都会执行这段代码
// 所以,通过CAS方式确保安全的设置当前节点为最新的尾节点
if (compareAndSetTail(pred, node)) {
// 曾经的尾节点的后继节点指向当前节点
pred.next = node;
// 返回新构建的节点
return node;
}
}
// 如果tail为空,或者CAS设置tail失败,需要一个入队操作
enq(node);
return node;
}
主要的流程如下:
compareAndSetTail
方法,完成尾节点的设置。pred 指针为空
或者 cas 失败(当前 pred 指针和 tail 指向的位置不同,说明被别的线程已经修改)
,调用 enq
将节点添加到 AQS 队列。private Node enq(final Node node) {
// 通过“死循环”确保节点被正确添加,最终将其设置为尾节点之后才会返回
for (;;) {
Node t = tail;
// 如果是第一次添加到队列,那么tail=null
if (t == null) { // Must initialize
// 构建一个哨兵结点(空结点),并将头部指针指向它,
if (compareAndSetHead(new Node()))
// 此时队列中只一个头结点,所以tail也指向它
tail = head;
} else {
// 进行第二次循环时,tail不为null,进入else区域。
// 将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
node.prev = t;
// 将新节点加入到队列尾节点
if (compareAndSetTail(t, node)) {
//t此时指向tail,所以可以CAS成功,将tail重新指向Node。
//此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点
t.next = node;
return t;
}
}
}
}
主要的流程如下:
其实,addWaiter 就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点,也叫哨兵结点。
有些同学可能会有疑问,为什么会有哨兵节点?
哨兵,顾名思义,是用来解决国家之间边界问题的,不直接参与生产活动。同样,计算机科学中提到的哨兵,也用来解决边界问题,如果没有边界,指定环节,按照同样算法可能会在边界处发生异常,比如要继续向下分析的 acquireQueued()
方法。
到目前为止,通过 addwaiter
方法构造了一个 AQS 队列,并且将线程添加到了队列的节点中。返回的是一个包含该线程的 Node。而这个 Node 会作为参数,进入到 acquireQueued
方法中。acquireQueued
方法可以对排队中的线程进行「获锁」操作。
总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued
会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
下面我们从「何时出队列?」和「如何出队列?」两个方向来分析一下acquireQueued
源码。
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
//记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点,若为null即刻抛npe
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node,把当前节点置为虚节点
setHead(node);
// 将哨兵节点的后继节点置为空,方便GC
p.next = null; // help GC
failed = false;
// 返回中断标识
return interrupted;
}
// 当前节点的前驱节点不是头节点【或者】当前节点的前驱节点是头节点但获取同步状态失败
//这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
大概流程:
tryAcquire
抢占锁;注:
setHead
方法是把当前节点置为虚节点,但并没有修改 waitStatus,因为它是一直需要用的数据。
获取同步状态成功会返回可以理解了,但是如果失败就会一直陷入到“死循环”中浪费资源吗?
很显然不是,shouldParkAfterFailedAcquire(p, node)
和 parkAndCheckInterrupt()
就会将线程获取同步状态失败的线程挂起,我们继续向下看。
从上面的分析可以看出,只有队列的第二个节点可以有机会争用锁,如果成功获取锁,则此节点晋升为头节点。
对于第三个及以后的节点,if (p == head)
条件不成立,首先进行shouldParkAfterFailedAcquire(p, node)
操作,shouldParkAfterFailedAcquire
方法是靠前驱节点判断当前线程是否应该被阻塞。
它首先判断一个节点的前置节点的状态是否为 Node.SIGNAL
,如果是,说明此节点已经将状态设置-如果锁释放,则应当通知它,所以它可以安全的阻塞了,返回 true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
// 如果是 SIGNAL 状态,代表此节点可以挂起,直接返回 true,准备继续调用 parkAndCheckInterrupt 方法
//因为前置节点状态为SIGNAL在适当状态 会唤醒后继节点
if (ws == Node.SIGNAL)
return true;
// ws 大于0说明是CANCELLED状态,
if (ws > 0) {
do {
//循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将当前节点的前驱节点设置为设置为 SIGNAL 状态,用于后续唤醒操作
// 程序第一次执行到这返回为false,还会进行外层第二次循环,最终从代码第7行返回
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
到这里你也许有个问题:
这个地方设置前驱节点为 SIGNAL 状态到底有什么作用?
保留这个问题,我们陆续揭晓。
如果前驱节点的 waitStatus
是 SIGNAL
状态,即 shouldParkAfterFailedAcquire
方法会返回 true ,程序会继续向下执行 parkAndCheckInterrupt
方法,用于将当前线程挂起,阻塞调用栈,返回当前线程的中断状态。
private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会继续向下执行
LockSupport.park(this);
// 根据 park 方法 API描述,程序在下述三种情况会继续向下执行
// 1. 被 unpark
// 2. 被中断(interrupt)
// 3. 其他不合逻辑的返回才会继续向下执行
// 因上述三种情况程序执行至此,返回当前线程的中断状态,并清空中断状态
// 如果由于被中断,该方法会返回 true
return Thread.interrupted();
}
上述方法的流程图如下:
从上图可以看出,跳出当前循环的条件是当前置节点是头结点,且当前线程获取锁成功
。
为了防止因死循环导致 CPU 资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire
流程):
被唤醒的程序会继续执行 acquireQueued
方法里的循环,如果获取同步状态成功,则会返回 interrupted = true
的结果。
从队列中释放节点的疑虑打消了,那么又有新问题了:
shouldParkAfterFailedAcquire
中取消节点是怎么生成的呢?什么时候会把一个节点的 waitStatus
设置为 -1
?acquireQueued
方法中的 finally
代码:
if (failed)
cancelAcquire(node);
这段代码被执行的条件是 failed 为 true,正常情况下,如果跳出循环,failed 的值为 false,如果不能跳出循环貌似怎么也不能执行到这里,所以只有不正常的情况才会执行到这里,也就是会发生异常,才会执行到此处。
通过查看 try 代码块,有两个方法会抛出异常:
node.processor()
方法tryAcquire()
方法经过分析这两处地方执行抛异常的概率很小,目前没有分析出来可以执行
cancelAcquire
方法的时机。也有一种说法:在ReentrantLock
中,不管是公平锁还是非公平锁,cancelAcquire
都不会运行。有可能道格李老爷子是保持代码统一,反正用到的时候会用到,用不到的时候也不会被执行到。
通过 cancelAcquire
方法,将 Node 的状态标记为 CANCELLED
。接下来,我们逐行来分析这个方法的原理:
private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
看到这个注释你可能有些乱了,其核心目的就是从等待队列中移除 CANCELLED
的节点,并重新拼接整个队列。
当前的流程:
CANCELLED
,那就一直往前遍历,找到第一个 waitStatus <= 0
的节点,将找到的 pred 节点和当前 Node 关联,将当前 Node 设置为 CANCELLED
。总结来看,其实设置 CANCELLED
状态节点只是有三种情况,我们通过画图来分析一下:
当前节点是尾节点。
当前节点是 Head 的后继节点。
当前节点不是 Head 的后继节点,也不是尾节点。
程序继续向调用栈上层返回,最终回到 AQS 的模版方法 acquire
。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
你也许会有疑惑:
程序已经成功获取到同步状态并返回了,怎么会有个自我中断呢?这个问题我们留到下一篇解答。
至此,获取同步状态的过程就结束了,我们简单的用流程图说明一下整个过程。
我们上面还没有说明 SIGNAL
的作用, SIGNAL
状态信号到底是干什么用的?这就涉及到锁的释放了。
由于篇幅原因我们下篇文章继续分析。