• 被问到可重入锁条件队列,看这一篇就够了!|原创


    本文深入解读了高频面试点——ReentrantLock的条件队列使用方法及其原理。源码有详细注释,建议收藏阅读。

    点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达

    Jdk中独占锁的实现除了使用关键字synchronized外,还可以使用ReentrantLock。虽然在性能上两者没有什么区别,但ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景,其原理之前已经介绍过,请自行阅读。

    0910213ec0fc3a10f360657aa47fcab3.jpeg

    重点,一文掌握ReentrantLock加解锁原理!|原创

     

    使用synchronized结合Object上的waitnotify方法可以实现线程间的等待通知机制。ReentrantLockCondition同样可以实现这个功能,而且相比前者使用起来更清晰也更简单。前者是java底层级别的,后者是语言级别的,后者可控制性和扩展性更好。

    Condition与Object的wait/notify区别

    1. Condition能够支持不响应中断,而通过使用 Object 方式不支持

    2. Condition能够支持多个等待队列(new 多个Condition对象),而 Object 方式只能支持一个

    3. Condition能够支持超时时间的设置,而 Object 不支持

    使用示例

    为了方便理解源码,我们先用一个Demo展示一下ReentrantLock的线程停止和通知是如何使用的。这里使用的是一个生产者和消费者的模型,一个线程负责加,另一个线程负责减。

    1. static volatile int i = 0;
    2. static final ReentrantLock LOCK = new ReentrantLock();
    3. static final Condition condition = LOCK.newCondition();
    4. public static void add() throws InterruptedException {
    5.     LOCK.lock();
    6.     try {
    7.         while (i == 0) {
    8.             Thread.sleep(1000);
    9.             System.out.print("add\t");
    10.             System.out.println(++i);
    11.             condition.signal();
    12.             condition.await();
    13.         }
    14.     } finally {
    15.         LOCK.unlock();
    16.     }
    17. }
    18. public static void sub() throws InterruptedException {
    19.     LOCK.lock();
    20.     try {
    21.         while (i == 1) {
    22.             Thread.sleep(1000);
    23.             System.out.print("sub\t");
    24.             System.out.println(--i);
    25.             condition.signal();
    26.             condition.await();
    27.         }
    28.     } finally {
    29.         LOCK.unlock();
    30.     }
    31. }
    32. public static void main(String[] args) throws InterruptedException {
    33.     new Thread(() -> {
    34.         while (true) {
    35.             try {
    36.                 add();
    37.             } catch (InterruptedException e) {
    38.                 e.printStackTrace();
    39.             }
    40.         }
    41.     }).start();
    42.     new Thread(() -> {
    43.         while (true) {
    44.             try {
    45.                 sub();
    46.             } catch (InterruptedException e) {
    47.                 e.printStackTrace();
    48.             }
    49.         }
    50.     }).start();
    51. }

    可以看到,想要获得一个Condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用其实为AQS中的内部类ConditionObject。

    condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。condition内部也是使用相似的方式,内部维护了一个单向的等待队列,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态。

    4f5cd36022ac9d50e8352838347e0e36.png

    ConditionObject中有两个成员变量:头节点firstWaiter 和 尾节点lastWaiter ,同步队列的成员Node 复用了实现同步队列的内部类Node。用nextWaiter保存了下一个等待节点,源码如下。

    1. Condition condition = LOCK.newCondition();
    2. //ReentrantLock内部类Sync
    3. abstract static class Sync extends AbstractQueuedSynchronizer {
    4.     final ConditionObject newCondition() {
    5.         return new ConditionObject();
    6.     }
    7. }
    8. // AQS内部类 ConditionObject
    9. public class ConditionObject implements Condition, java.io.Serializable {
    10.     /** First node of condition queue. */
    11.     private transient Node firstWaiter;
    12.     /** Last node of condition queue. */
    13.     private transient Node lastWaiter;
    14.     //真正的创建Condition对象
    15.     public ConditionObject() { }
    16. }
    17. static final class Node {
    18.   Node nextWaiter;
    19. }

    用Object的方式Object对象监视器上只能拥有一个同步队列和一个等待队列,而使用Lock可以有有一个同步队列和多个等待队列。可以多次调用lock.newCondition()创建多个Condition,所以一个Lock可以持有多个等待队列。

    22418f15572bff59f199aeb81505856d.png

    下面开始解读await()signal()方法。

    Await方法原理

    阻塞前:

    1.在条件队列尾部添加新节点(状态CONDITION=-2),如果头节点为空则把当前节点设为头节点。

    2.获取当前线程占有的state,无论state是几,都清空为0,代表完全释放锁。并且在释放当前线程所占用的锁之后,会唤醒同步队列中的下一个节点。

    3.进入自旋判断逻辑:如果当前节点状态是 CONDITION(-2)或者 prev 节点(表示在同步队列中有前驱节点)为空,返回false,进入while逻辑,阻塞当前线程;如果有继承者,表示肯定在同步队列中,直接跳出循环;如果从同步队列队尾开始寻找,找到当前节点,同样表示在队列中,跳出循环。

    bbc301f1fd2ef3345c6be2497a51f18f.png

    注意!! 是先添加到条件队列,再释放锁。所以有可能出现以下的情况,A插入条件队列调用await唤醒B,但是在A唤醒后准备park时,B已经执行完需要的逻辑,并且再次Park。此时的A线程可能已经状态不再是CONDITION,说明已经进入同步队列,那就可以跳过Park再次直接争夺锁,所以这里需要自旋锁去不断尝试判断。

    1. public final void await() throws InterruptedException {
    2.     if (Thread.interrupted())
    3.         throw new InterruptedException();
    4.     // 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾
    5.     Node node = addConditionWaiter();
    6.     // 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
    7.     int savedState = fullyRelease(node);
    8.     int interruptMode = 0;
    9.     // 当不在同步队列中(处于condition状态或者前一个节点为null)
    10.     while (!isOnSyncQueue(node)) {
    11.         // 3. 当前线程进入到等待状态
    12.         LockSupport.park(this);
    13.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    14.             break;
    15.     }
    16.     // 4. 自旋等待获取到同步状态(即获取到lock)
    17.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    18.         interruptMode = REINTERRUPT;
    19.     if (node.nextWaiter != null) // clean up if cancelled
    20.         //删除无效的等待节点
    21.         unlinkCancelledWaiters();
    22.     // 5. 处理被中断的情况
    23.     if (interruptMode != 0)
    24.         reportInterruptAfterWait(interruptMode);
    25. }

    阻塞后:

    1. 恢复执行后,检查是否中断。然后自旋再次判断是否已经进入同步队列,返回true,跳出循环继续执行。

    2. 调用acquireQueued,尝试去争夺锁,这里逻辑和lock一样,已经是同步队列去竞争锁的逻辑。并且会将之前清空的state值按照原来的大小设置。

    3. 最后都是一些中断标记的处理,主流程已经结束。

    注意:退出await方法一定表明当前线程已经获得了与condition关联的锁资源。

    c3373e755fa4106bdae7529903b9b55e.png

    具体请看代码:

    1. // AQS
    2. public final void await() throws InterruptedException {
    3.     if (Thread.interrupted())
    4.         throw new InterruptedException();
    5.     // 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾
    6.     Node node = addConditionWaiter();
    7.     // 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
    8.     int savedState = fullyRelease(node);
    9.     int interruptMode = 0;
    10.   
    11.   //是先添加到等待队列,再释放锁。所以有可能出现以下的情况,A插入条件队列调用await唤醒B,但是在A唤醒后准备park时,B已经执行完需要的逻辑,并且再次Park,此时的A就可以跳过Park再次直接争夺锁。
    12.     while (!isOnSyncQueue(node)) {
    13.         // 3. 关键节点!!!:当前线程进入到等待状态
    14.         LockSupport.park(this);
    15.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    16.             break;
    17.     }
    18.     // 4. 自旋等待获取到同步状态(即获取到lock)
    19.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    20.         interruptMode = REINTERRUPT;
    21.   
    22.   // 如果节点线程被取消才会进入这里的逻辑。正常不会
    23.     if (node.nextWaiter != null) // clean up if cancelled
    24.         //删除无效的等待节点
    25.         unlinkCancelledWaiters();
    26.     // 5. 处理被中断的情况
    27.     if (interruptMode != 0)
    28.         reportInterruptAfterWait(interruptMode);
    29. }
    30. // 添加新的条件队列节点
    31. private Node addConditionWaiter() {
    32.     Node t = lastWaiter;
    33.     // 清除被取消的尾节点
    34.     if (t != null && t.waitStatus != Node.CONDITION) {
    35.         //解除关联
    36.         unlinkCancelledWaiters();
    37.         t = lastWaiter;
    38.     }
    39.     //将当前线程保存在Node中
    40.     Node node = new Node(Thread.currentThread(), Node.CONDITION);
    41.     if (t == null)
    42.         firstWaiter = node;
    43.     else
    44.         //队尾插入
    45.         t.nextWaiter = node;
    46.     //更新lastWaiter (如果是第一次插入节点,头尾节点都是同一个)
    47.     lastWaiter = node;
    48.     return node;
    49. }
    50. //完全释放锁状态
    51. final int fullyRelease(Node node) {
    52.     boolean failed = true;
    53.     try {
    54.         int savedState = getState();
    55.       // 这里会释放锁,并且唤醒后继节点
    56.         if (release(savedState)) {
    57.             //成功释放同步状态
    58.             failed = false;
    59.             return savedState;
    60.         } else {
    61.             //不成功释放同步状态抛出异常
    62.             throw new IllegalMonitorStateException();
    63.         }
    64.     } finally {
    65.         if (failed)
    66.             node.waitStatus = Node.CANCELLED;
    67.     }
    68. }

    Signal

    1. 检查本线程是否持有锁,正常是持有锁,如果不符合就抛出异常。

    2. 从等待队列中拿到第一个节点。如果头节点为空代表条件队列为空,谁也不通知直接结束。

    3. 将头节点从条件队列中移除,并且把nextWaiter置为null。然后把节点状态设为0,转移进入同步队列。如果队列为空则初始化同步队列。

    4. 如果前驱节点不是 signal 状态或者前一个节点已经被取消,直接对头节点线程解除阻塞。返回true跳出循环。

    5. 至此本线程方法执行结束。依旧持有锁,但是转移了条件队列的头节点到同步队列中,就做了这一件事。

    1. //AQS
    2. public final void signal() {
    3.     //1. 先检测当前线程是否已经获取lock
    4.     if (!isHeldExclusively())
    5.         throw new IllegalMonitorStateException();
    6.     //2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
    7.     Node first = firstWaiter;
    8.     if (first != null)
    9.         doSignal(first);
    10. }
    11. //ReentrantLock
    12. protected final boolean isHeldExclusively() {
    13.     // While we must in general read state before owner,
    14.     // we don't need to do so to check if current thread is owner
    15.     return getExclusiveOwnerThread() == Thread.currentThread();
    16. }
    17. //AQS
    18. private void doSignal(Node first) {
    19.     do {
    20.         if ( (firstWaiter = first.nextWaiter) == null)
    21.             lastWaiter = null;
    22.         //1. 将头结点从等待队列中移除
    23.         first.nextWaiter = null;
    24.         //2. while中transferForSignal方法对头结点做真正的处理
    25.     } while (!transferForSignal(first) &&
    26.              (first = firstWaiter) != null);
    27. }
    28. final boolean transferForSignal(Node node) {
    29.     //1. 更新状态为0
    30.     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    31.         return false;
    32.     //2.将该节点移入到同步队列中去
    33.    // 这里的处理和同步队列的生成用的同一个方法
    34.    // node p 为前驱节点(原尾节点)
    35.     Node p = enq(node);
    36.     int ws = p.waitStatus;
    37.    // 如果前驱节点不是signal状态或者前一个节点已经被取消,直接对头节点解除阻塞。返回true跳出循环
    38.     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    39.         LockSupport.unpark(node.thread);
    40.     return true;
    41. }

    具体原理图如下:

    feb70e6bbb0d750292ddad22c311e539.png

    SignalAll

    signalAll与signal方法的区别体现在doSignalAll方法上,前面我们已经知道doSignal方法只会对等待队列的头节点进行操作,而doSignalAll将条件队列中的所有Node都转移到了同步队列中,即“通知”当前调用condition.await()方法的每一个线程,代码如下。

    1. private void doSignalAll(Node first) {
    2.     lastWaiter = firstWaiter = null;
    3.     do {
    4.         Node next = first.nextWaiter;
    5.         first.nextWaiter = null;
    6.         transferForSignal(first);
    7.         first = next;
    8.     } while (first != null);
    9. }

    最后,欢迎大家提问和交流。

    如果对你有帮助,欢迎点赞、评论或分享,感谢阅读!

    update在MySQL中是怎样执行的,一张图牢记|原创

    2022-11-19

    eb697c182f74ee74d74b1d09d9bb5a23.jpeg

    讲真,这篇最全HashMap你不能错过!|原创

    2022-11-17

    a96ce6f765bfa33ec4ee251352719e62.jpeg

    MySQL主从数据不一致,怎么办?

    2022-11-15

    6e263aa5fa9758bc7a664efe60d69195.jpeg
  • 相关阅读:
    “看片”神器没了,又将有谁突出重围?
    Effective C++改善程序与设计的55个具体做法 1. 让自己习惯 c++
    Java Web DTO 以及 VO 等实际意义以及作用
    深入学习和理解Django模板层:构建动态页面
    【如何学习CAN总线测试】——CAN数据链路层测试
    XShell快速连接虚拟机(Ubuntu系统)
    在模拟器上安装magisk实现Charles抓https包(二)
    java计算机毕业设计在线毕设选题系统源码+系统+mysql数据库+lw文档
    vue页面和 iframe多页面无刷新方案和并行存在解决方案
    深入ftrace kprobe原理解析
  • 原文地址:https://blog.csdn.net/sinat_32873711/article/details/128059728