在多线程环境下多个线程同时竞争一把锁,结果是获得锁的线程去执行代码,其余没有获得锁的线程只能等待获得锁的线程释放锁之后才能去竞争获取锁;在AbstractQueuedSynchronizer(AQS)中没有使用synchronized关键字,那AQS具体是如何实现锁机制的呢?它如何解决下面几个问题:
在AQS中有两种模式,在Node节点中用nextWaiter来标识是哪种模式,只有2种取值:
在AQS中,没有获取到锁的线程会被包装成Node节点添加到同步队列中(双向链表);当持有锁的线程释放锁之后会唤醒队列中下一个线程获取锁。
AbstractQueuedSynchronizer是一个基于CAS(compareAndSwap)来构建的同步队列,如果对CAS还有疑惑可以看这篇文章的CAS部分;
队列中每个元素都是Node的数据类型,下面是Node的数据结构:
Node对象中,属性值的含义:
thread:等待锁的线程;
waitStatus: 是等待节点的状态值;独占模式中waitStatus的取值有3种:
nextWaiter:获取锁的类型,在独占模式中值为:EXCLUSIVE ;
prev,next:分别表示该节点的上一个节点和下一个节点;
在AQS中获取独占锁有3个方法:
上面3个方法实现基本差不多,后2个方法只是多添加了对中断标志和超时的判断;以acquire为例分析独占锁的实现过程:
/**
*tryAcquire --》尝试获取锁(由子类实现)
*addWaiter(Node.EXCLUSIVE) ==》添加节点到队列中
*acquireQueued() ===>阻塞,唤醒线程
*selfInterrupt==》设置中断标志;
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//acquireQueued返回值为true,线程设置中断标志;
}
获取锁到同步队列的过程:
AQS只是提供一个框架,定义了线程获取锁的流程,以及出现异常的处理;具体的实现还要看子类的实现,比如基于AQS实现的就有:ReentrantLock,ReentrantReadWriteLock,CountDownLatch,Semaphore等;这些实现类的tryAcquire各有不同;
在AQS中,竞争锁失败的线程要加入到同步队列中;AQS的同步队列(CLH)是一个双向链表,head 指向链表头节点, tail 指向链表尾节点;每一个加入队列的节点都是添加到队列尾部
private Node addWaiter(Node mode) {
//mode:表示锁的类型:互斥锁,共享锁
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {//判断尾节点tail是不是为空;
node.prev = pred;
if (compareAndSetTail(pred, node)) {//将新插入的节点放到链表尾部;新插入的node作为新的tail
pred.next = node;
return node;
}
}
//上面插入失败,说明有多个线程同时竞争锁失败,加入到同步队列中;
//同时竞争要添加到链表的末尾,添加失败的进入到enq方法中继续尝试加入队列
enq(node);
return node;
}
1. 当线程竞争锁失败时会将线程包装成Node节点加入到同步队列,包装成Node对象时会标识该线程要竞争锁的类型:SHARED 或者 EXCLUSIVE ,在独占模式中,mode = EXCLUSIVE;
2. 判断链表的尾节点 tail 是否为null。
3. 同步队列是空队列或者新node插入失败的,都会进入到enq方法中继续尝试将节点插入到队列中;
enq源码:
private Node enq(final Node node) {
for (;;) {//死循环,保证能将node节点添加到链表中;
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//初始化head
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//CAS,竞争将node添加到表尾;
t.next = node;
return t;
}
}
}
}
在enq方法中,如果检测到tail == null,就会用一个node初始化head和tail,这个node没有有用信息,各个属性值都是初始值就是一个占位用的,没有实际意义;
在addWaiter和enq中,有一个有意思的问题:为什么链表添加新node的时候都是先让node的prev指向旧tail,CAS竞争修改表尾tail成功之后再修改旧tail的next指针?为什么不在CAS竞争成功之后,一起修改 prev 和 next 呢?
后一种写法会让新节点成为队列的 tail节点的这一刻:tail脱离队列,队列的尾节点tail不在队列中;这种写法会在同步队列释放锁时可能导致获取锁时发生异常;(文章末尾会关于这个问题做讨论)
在acquireQueued方法中关于什么时候阻塞线程什么时候唤醒线程的处理流程的大框架并不是特别复杂:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前一个节点:prev
final Node p = node.predecessor();
//prev节点:prev节点是头节点head ====》 调用tryAcquire尝试获取锁;
if (p == head && tryAcquire(arg)) {
setHead(node);//设置该节点node为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire:根据prev的waitState状态值,判断是否应该阻塞当前线程;
//parkAndCheckInterrupt调用LockSupport.park,阻塞当前线程;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) //线程发生异常时,failed = true;
cancelAcquire(node);//将node的waitState值,改为 CANCELLED;
}
}
流程:
源码:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 获取prev的状态值:waitStatus
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {// CANCELLED = 1
//从node节点开始往前找到最近一个waitState值不为 1 的节点,将其设置为 该节点的prev节点;
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//在EXCLUSIVE(独占)模式下waitStatus 的取值只有: 0 ,CANCELLED ,SIGNAL;
//因此在waitStatus 的值排除了 CANCELLED ,SIGNAL;到这个分支waitStatus 只能是 0;
//将prev节点的状态值从 0 -> 1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这段代码的逻辑比较简单就是根据该节点的prev节点的状态值waitStatus,来处理并返回结果;从返回结果来看:如果prev的waitStatus =SIGNAL就返回true(阻塞线程),否则返回false (不阻塞线程);
流程:
上节简单提到,在acquireQueued方法中获取锁发生异常时会进入到finally块执行cancelAcquire方法;进入到这个方法可以将node的状态waitStatus 值改为 CANCELLED;因此在这里才会判断prev节点的状态是否为CANCELLED;在发现prev节点的状态为CANCELLED时,会往前找到一个状态值正常(0或SIGNAL)的节点作为prev节点,将状态值为CANCELLED的节点清除出队列;
每一个新加入队列的节点,都会检查自己的prev节点的状态是否是CANCELLED,如果是这个状态就会将这个节点清除出队列;在cancelAcquire方法中,除了将自己所在节点的状态标记为CANCELLED之外,该线程还会将自己所在节点删除,并且往前检查,将不合法(CANCELLED)的状态节点删除。
shouldParkAfterFailedAcquire(pred, node) && parkAndCheckInterrupt(),当该方法返回true时,会执行parkAndCheckInterrupt方法阻塞当前线程;
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//阻塞当前线程
return Thread.interrupted();
}
在这个方法中,调用LockSupport.park(this);实现了对当前线程的阻塞;在当前线程被唤醒之后调用:Thread.interrupted() 清除中断标记;如果线程设置了中断标记返回true,反之返回false;
流程图:
源码:
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;//获取prev节点
// 检查当前节点的prev节点是否存在状态为CANCELLED的节点;
//如果prev异常,就找到离最近node最近的正常节点,当作node节点的prev
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
//如果node是tail尾节点,就将pred节点设置为tail;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);//修改pred节点的next指针
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {//pred不是head节点,且状态正常
Node next = node.next;
//pred的next指向node的next指向的节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {//pred是head节点,或者node节点的状态是CANCELLED;
unparkSuccessor(node);//释放node
}
node.next = node; // help GC
}
}
在删除node节点时有一个细节,就是没有彻底将node节点从队列中删除,只是将node的next指针删除,而保留了prev指针;
可以考虑下面的场景:当新增的node节点,添加到队列的末尾,成为了队列的tail节点;新节点成为tail有2个步骤:
队列是双向链表,因此在成为当newNode成为队列的tail节点之后还需要将oldTail的next指针更新为 newNode :oldTail.next = newNode 这才算newNode完整的加入到了队列中;如果在这一步操作之前,因为操作系统的线程调度导致该线被挂起,暂停运行。这个时候:oldTail.next =null ;在该线程被挂起的期间,oldTail节点发生异常,需要被清理出队列;可以分别看:清除prev指针和不清除prev的结果:
删除旧tail的prev指针,导致新加入队列的节点与整个队列脱节;
源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(arg)方法由子类实现,unparkSuccessor方法是关键,由这个方法将head的下一个节点持有的线程唤醒;
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
释放next节点;
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//找到距离node最近的下一个正常节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒节点持有的线程;
}
获取到next节点时首先会判断next是否为空或者是状态是否异常(CANCELLED )?如果是,则会从队列的末端tail开始向前遍历,找到距离head节点最近的状态正常的节点将其释放;
现在有一个问题,当遇到next节点的状态异常或者是next为null的时候,为什么要从后往前遍历队列呢?
在next=null的时候为什么不直接认定next已经是队列的最后一个节点?在队列新加入节点时当新节点成为tail节点时,可能存在旧的tail节点next指针还没更新还是null,这个时候就不能认定队列中没有节点了;因此从前往后找就可能会漏掉队列中的其他节点;
在添加新节点到队列时,每个新节点的prev指针都是先指向旧tail节点,当新节点成为tail时新节点就已经在队列中而不是和整个队列脱节;并且在删除异常节点时,也是保留了异常节点的prev指针,无论在什么情况下都保证了队列能够从tail节点开始从后向前能够遍历整个队列,因此从后往前一定能够找到指定的节点;
新加入的node节点都是先设置prev指针,再利用CAS机制竞争成为队列的tail节点。如果加入新节点是先成为tail,再设置prev指针会有什么后果?试想下这样一个场景:当一个节点发生异常会进入到cancelAcquire方法中自我清除,但这个过程不是原子性的分为几个步骤,前面已经分析过不再赘述cancelAcquire;如果自我清除没有完成,这时前一个节点已经释放锁检测到该节点状态异常,就不会唤醒该节点;会从tail开始从后往前遍历找到合适的节点将其唤醒;
此时如果tail节点与队列脱节,那么遍历到 tail节点时就不会继续向前遍历,也就是说head节点会唤醒tail;而tail节点本来就是可运行状态,当tail节点继续正常的流程:设置prev—> 发现prev节点不是head节点 —>不会获取到锁—>阻塞自己,至此整个同步队列都会被阻塞;因为已经没有线程来唤醒队列中的节点了,后来的线程只能被阻塞在同步队列中从而造成整个同步队列不可用;
shared模式下的获取锁,释放锁与exclusive模式下的都很相似;shared模式下同一时刻下,同一把锁可以被多个线程共同持有;
在AQS的实现共享锁的机制是什么呢?当一个 shared类型的节点获取到锁之后,会检查next节点是否也是shared类型的节点,如果是的话会继续将next节点唤醒;这样就可以实现多个线程同时持有一把锁了。但是,唤醒的条件不只是判断next的节点类型是否是:shared。还有其他的条件。
我们以Semaphore(该类使用AQS的共享锁来实现功能)为例分析;这个类的作用场景:假设一个停车场只有50个停车位,最多只能停50辆车;现在如果有100辆车需要进停车场停车,那么停车场最多只能停50辆车,其余50辆车只能等待。需要等待已经进入停车场的车开走空出位置,后面的车才能进去。如果只空出10个位置,那么只能放10辆车进入,剩下的40辆车依旧需要等待;
这个例子中 :等待停车位就相当于是在队列中排队,而进入停车场就是将线程唤醒。从这个例子可以看出来,在唤醒线程时不是无限传播,也是需要满足条件的。
在shared模式下获取之后的共享锁机制是在setHeadAndPropagate方法中实现的:
private void doAcquireShared(){
...
...
...
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
...
...
...
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();//唤醒下一个节点
}
}
在决定是否继续唤醒next节点时,是由propagate的值和next节点的类型共同决定的;而propagate的值由tryAcquireShared方法返回,这是由子类实现决定的。以上个停车场的例子中使用Semaphore为例;
Semaphore semaphore = new Semaphore(50);//AQS.state=50;
for(int i = 0; i< 100 ; i++){
new Thread(()->{
semaphore.lock();
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+" acquire lock... ");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
在获取到锁时 state-= 1,当state=0时:acquire()获取不到锁,会阻塞线程;调用release方法state += 1 ;利用state变量值来决定是否阻塞线程;
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
释放锁的时候,引入了一个状态:PROPAGATE;那为什么需要这个状态呢?总感觉这个状态时多余的;于是查看了很多博客,有一篇博客讲的非常好:AQS源码分析。这篇文章分析了,PROPAGATE引入前bug是如何产生的;
总的来说,引入PROPAGATE是为了解决释放共享锁的bug;juc这个包是在jdk1.5的时候最开始引入的,期间修改过bug所以jdk1.8版本的AQS已经和最开始的版本有差别了;下面是jdk1.5时共享锁的释放releaseShared;
JDK1.5
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
问题代码(这里直接copy一下那片文章的代码):
public class TestSemaphore {
private static Semaphore sem = new Semaphore(0);
private static class Thread1 extends Thread {
@Override
public void run() {
sem.acquireUninterruptibly();
}
}
private static class Thread2 extends Thread {
@Override
public void run() {
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
//获取锁
Thread t1 = new Thread1();
Thread t2 = new Thread1();
//释放锁
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}
这段代码比较简单:线程t1,t2,t3,t4按照顺序执行;最开始t1,t2线程获取不到锁会被阻塞加入到同步队列中;而t3,t4用来唤醒前2个线程;
分析bug产生的过程:
在t3线程释放锁时唤醒t1线程,t1线程获取锁:int r = tryAcquireShared(arg);获取到 r = 0;此时t1线程因为操作系统的线程调度被切换暂停运行;此时:head节点还是没有变,状态从 -1 -> 0
在这个时候 , t4线程也释放锁;检查到 head节点的状态值 0 不满足条件,不会调用unparkSuccessor释放锁;
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) //ws = 0 不会调用unparkSuccessor
unparkSuccessor(h);
return true;
}
return false;
}
time3时刻:t1线程恢复运行,进入到setHeadAndPropagate(propagate)中,此时:propagate = r = 0;
/**
node.thread = t1;
propagate = 0;
*/
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
由于此时 propagate = 0 不会t1线程,不会继续释放同步队列中的下一个节点t2了;因此t2获取不到锁,会被一直阻塞在队列中;
JDK1.8
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
time1:t3线程调用doReleaseShared释放锁,唤醒t1线程;此时head.waitStatus = 0; r = tryAcquireShared(arg); r = 0;t1线程被切换,暂停运行;
time2:t4线程,调用doReleaseShared释放锁,发现head.waitStatus = 0;因此不会调用:unparkSuccessor方法;但是会将head状态值修改为:head.waitStatus = PROPAGATE;
time3:t1恢复运行,进入到setHeadAndPropagate(propagate)中,propagate = 0;
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
此时:h.waitStatus = propagate(-3),满足:h.waitStatus < 0 ;因此t1线程会继续调用:doReleaseShared唤醒next节点:t2;