本文深入解读了高频面试点——ReentrantLock的条件队列使用方法及其原理。源码有详细注释,建议收藏阅读。
点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达
Jdk中独占锁的实现除了使用关键字synchronized
外,还可以使用ReentrantLock
。虽然在性能上两者没有什么区别,但ReentrantLock
相比synchronized
而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景,其原理之前已经介绍过,请自行阅读。
使用synchronized
结合Object
上的wait
和notify
方法可以实现线程间的等待通知机制。ReentrantLock
的Condition
同样可以实现这个功能,而且相比前者使用起来更清晰也更简单。前者是java底层级别的,后者是语言级别的,后者可控制性和扩展性更好。
Condition能够支持不响应中断,而通过使用 Object 方式不支持
Condition能够支持多个等待队列(new 多个Condition对象),而 Object 方式只能支持一个
Condition能够支持超时时间的设置,而 Object 不支持
为了方便理解源码,我们先用一个Demo展示一下ReentrantLock的线程停止和通知是如何使用的。这里使用的是一个生产者和消费者的模型,一个线程负责加,另一个线程负责减。
- static volatile int i = 0;
- static final ReentrantLock LOCK = new ReentrantLock();
- static final Condition condition = LOCK.newCondition();
-
- public static void add() throws InterruptedException {
- LOCK.lock();
- try {
- while (i == 0) {
- Thread.sleep(1000);
- System.out.print("add\t");
- System.out.println(++i);
- condition.signal();
- condition.await();
- }
- } finally {
- LOCK.unlock();
- }
- }
-
- public static void sub() throws InterruptedException {
- LOCK.lock();
- try {
- while (i == 1) {
- Thread.sleep(1000);
- System.out.print("sub\t");
- System.out.println(--i);
- condition.signal();
- condition.await();
- }
- } finally {
- LOCK.unlock();
- }
- }
-
- public static void main(String[] args) throws InterruptedException {
- new Thread(() -> {
- while (true) {
- try {
- add();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- new Thread(() -> {
- while (true) {
- try {
- sub();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
可以看到,想要获得一个Condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用其实为AQS中的内部类ConditionObject。
condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。condition内部也是使用相似的方式,内部维护了一个单向的等待队列,所有调用condition.await
方法的线程会加入到等待队列中,并且线程状态转换为等待状态。
ConditionObject中有两个成员变量:头节点firstWaiter 和 尾节点lastWaiter ,同步队列的成员Node 复用了实现同步队列的内部类Node。用nextWaiter保存了下一个等待节点,源码如下。
- Condition condition = LOCK.newCondition();
- //ReentrantLock内部类Sync
- abstract static class Sync extends AbstractQueuedSynchronizer {
- final ConditionObject newCondition() {
- return new ConditionObject();
- }
- }
- // AQS内部类 ConditionObject
- public class ConditionObject implements Condition, java.io.Serializable {
- /** First node of condition queue. */
- private transient Node firstWaiter;
- /** Last node of condition queue. */
- private transient Node lastWaiter;
-
- //真正的创建Condition对象
- public ConditionObject() { }
- }
- static final class Node {
- Node nextWaiter;
- }
用Object的方式Object对象监视器上只能拥有一个同步队列和一个等待队列,而使用Lock可以有有一个同步队列和多个等待队列。可以多次调用lock.newCondition()创建多个Condition,所以一个Lock可以持有多个等待队列。
下面开始解读await()
和signal()
方法。
1.在条件队列尾部添加新节点(状态CONDITION=-2),如果头节点为空则把当前节点设为头节点。
2.获取当前线程占有的state,无论state是几,都清空为0,代表完全释放锁。并且在释放当前线程所占用的锁之后,会唤醒同步队列中的下一个节点。
3.进入自旋判断逻辑:如果当前节点状态是 CONDITION(-2)或者 prev 节点(表示在同步队列中有前驱节点)为空,返回false,进入while逻辑,阻塞当前线程;如果有继承者,表示肯定在同步队列中,直接跳出循环;如果从同步队列队尾开始寻找,找到当前节点,同样表示在队列中,跳出循环。
注意!! 是先添加到条件队列,再释放锁。所以有可能出现以下的情况,A插入条件队列调用await唤醒B,但是在A唤醒后准备park时,B已经执行完需要的逻辑,并且再次Park。此时的A线程可能已经状态不再是CONDITION,说明已经进入同步队列,那就可以跳过Park再次直接争夺锁,所以这里需要自旋锁去不断尝试判断。
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- // 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾
- Node node = addConditionWaiter();
- // 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- // 当不在同步队列中(处于condition状态或者前一个节点为null)
- while (!isOnSyncQueue(node)) {
- // 3. 当前线程进入到等待状态
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- // 4. 自旋等待获取到同步状态(即获取到lock)
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- //删除无效的等待节点
- unlinkCancelledWaiters();
- // 5. 处理被中断的情况
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
恢复执行后,检查是否中断。然后自旋再次判断是否已经进入同步队列,返回true,跳出循环继续执行。
调用acquireQueued,尝试去争夺锁,这里逻辑和lock
一样,已经是同步队列去竞争锁的逻辑。并且会将之前清空的state值按照原来的大小设置。
最后都是一些中断标记的处理,主流程已经结束。
注意:退出await
方法一定表明当前线程已经获得了与condition
关联的锁资源。
具体请看代码:
- // AQS
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- // 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾
- Node node = addConditionWaiter();
- // 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
- int savedState = fullyRelease(node);
- int interruptMode = 0;
-
- //是先添加到等待队列,再释放锁。所以有可能出现以下的情况,A插入条件队列调用await唤醒B,但是在A唤醒后准备park时,B已经执行完需要的逻辑,并且再次Park,此时的A就可以跳过Park再次直接争夺锁。
- while (!isOnSyncQueue(node)) {
- // 3. 关键节点!!!:当前线程进入到等待状态
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- // 4. 自旋等待获取到同步状态(即获取到lock)
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
-
- // 如果节点线程被取消才会进入这里的逻辑。正常不会
- if (node.nextWaiter != null) // clean up if cancelled
- //删除无效的等待节点
- unlinkCancelledWaiters();
- // 5. 处理被中断的情况
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
- // 添加新的条件队列节点
- private Node addConditionWaiter() {
- Node t = lastWaiter;
- // 清除被取消的尾节点
- if (t != null && t.waitStatus != Node.CONDITION) {
- //解除关联
- unlinkCancelledWaiters();
- t = lastWaiter;
- }
- //将当前线程保存在Node中
- Node node = new Node(Thread.currentThread(), Node.CONDITION);
- if (t == null)
- firstWaiter = node;
- else
- //队尾插入
- t.nextWaiter = node;
- //更新lastWaiter (如果是第一次插入节点,头尾节点都是同一个)
- lastWaiter = node;
- return node;
- }
- //完全释放锁状态
- final int fullyRelease(Node node) {
- boolean failed = true;
- try {
- int savedState = getState();
- // 这里会释放锁,并且唤醒后继节点
- if (release(savedState)) {
- //成功释放同步状态
- failed = false;
- return savedState;
- } else {
- //不成功释放同步状态抛出异常
- throw new IllegalMonitorStateException();
- }
- } finally {
- if (failed)
- node.waitStatus = Node.CANCELLED;
- }
- }
检查本线程是否持有锁,正常是持有锁,如果不符合就抛出异常。
从等待队列中拿到第一个节点。如果头节点为空代表条件队列为空,谁也不通知直接结束。
将头节点从条件队列中移除,并且把nextWaiter置为null。然后把节点状态设为0,转移进入同步队列。如果队列为空则初始化同步队列。
如果前驱节点不是 signal 状态或者前一个节点已经被取消,直接对头节点线程解除阻塞。返回true跳出循环。
至此本线程方法执行结束。依旧持有锁,但是转移了条件队列的头节点到同步队列中,就做了这一件事。
- //AQS
- public final void signal() {
- //1. 先检测当前线程是否已经获取lock
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- //2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
- Node first = firstWaiter;
- if (first != null)
- doSignal(first);
- }
-
- //ReentrantLock
- protected final boolean isHeldExclusively() {
- // While we must in general read state before owner,
- // we don't need to do so to check if current thread is owner
- return getExclusiveOwnerThread() == Thread.currentThread();
- }
-
- //AQS
- private void doSignal(Node first) {
- do {
- if ( (firstWaiter = first.nextWaiter) == null)
- lastWaiter = null;
- //1. 将头结点从等待队列中移除
- first.nextWaiter = null;
- //2. while中transferForSignal方法对头结点做真正的处理
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
-
- final boolean transferForSignal(Node node) {
- //1. 更新状态为0
- if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
- return false;
- //2.将该节点移入到同步队列中去
- // 这里的处理和同步队列的生成用的同一个方法
- // node p 为前驱节点(原尾节点)
- Node p = enq(node);
- int ws = p.waitStatus;
- // 如果前驱节点不是signal状态或者前一个节点已经被取消,直接对头节点解除阻塞。返回true跳出循环
- if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
- LockSupport.unpark(node.thread);
- return true;
- }
具体原理图如下:
signalAll与signal方法的区别体现在doSignalAll
方法上,前面我们已经知道doSignal
方法只会对等待队列的头节点进行操作,而doSignalAll
将条件队列中的所有Node都转移到了同步队列中,即“通知”当前调用condition.await()方法的每一个线程,代码如下。
- private void doSignalAll(Node first) {
- lastWaiter = firstWaiter = null;
- do {
- Node next = first.nextWaiter;
- first.nextWaiter = null;
- transferForSignal(first);
- first = next;
- } while (first != null);
- }
最后,欢迎大家提问和交流。
如果对你有帮助,欢迎点赞、评论或分享,感谢阅读!