• AQS之Condition分析 (六)


    1.Condition 介绍

    ConditionAQS中基于排斥锁的另一应用,其await和sign,signAll方法可以用于替代Object的wait和notify,notifyAll方法。

    ReentrantLock借助Condition可以实现多路选择通知,Synchronized通过wait()notify()/notifyAll()方法可以实现等待/通知机制(单路通知)

    具体实现类是AbstractQueuedSynchronizer的内部类ConditionObjec

    代码中调用的lock.newCondition()实际调用的是Sync类中的newCondition方法。

    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    
    • 1
    • 2
    • 3
    1.1 结构介绍

    Condition

    在这里插入图片描述

    ConditionObject

    在这里插入图片描述

    在这里插入图片描述

    ConditionObject内部维护了一个基于Node的FIFO单向队列,我们把它称为等待队列。

    firstWaiter指向首节点,lastWaiter指向尾节点,Node中的nextWaiter指向队列中的下一个元素,并且等待队列中节点的waitStatus都是-2。我们是通过Node nextWaiter;字段将队列关联起来的。

    2.等待方法介绍

    1. 将当前线程封装成node加入等待队列尾部;
    2. 彻底释放锁资源,也就是将它的同步队列节点从同步队列队首移除;
    3. 如果当前节点不在同步队列中,挂起当前线程;
    4. 自旋,直到该线程被中断或者被唤醒移动到同步队列中;
    5. 阻塞当前节点,直到它获取到锁资源;
    2.1 await()

    Condition的await也有不少的方法,包括不响应中断的,带超时的等待

    public final void await() throws InterruptedException {
    	//1. 响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        //2. 同步队列添加节点
        Node node = addConditionWaiter();
        //3. 释放锁
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        //4.不在同步队列中,阻塞线程
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            //5.等待过程中被中断,直接退出循环。0对应非中断,说明阻塞被正常唤醒
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //6.在同步队列中排队获取锁,如果期间被中断需要设置interruptMode 
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        //8. 整理条件队列,移除取消节点
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        //9. 针对中断做处理
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    await过程会释放锁, 不论重入几次。
    当释放锁后, 线程进行阻塞, 退出租售的条件是:
    (1)线程被中断,对应5的break。线程中断即取消阻塞,对应逻辑checkInterruptWhileWaiting
    (2)节点由条件队列转化到了同步队列,对应4的循环条件
    对于第9步,可以看到,对于期间出现的中断,有两种处理:抛出异常和重新自我中断。对于同步队列排队获取锁的过程只会重新自我中断,条件队列过程中则都有可能。如果两个过程中都出现了抛异常,则按照优先级判断,抛异常的优先级更高。

    2.2 addConditionWaiter()

    当前线程封装为Node, 加入到等待队列尾部

    private Node addConditionWaiter() {
     Node t = lastWaiter;
     if (t != null && t.waitStatus != Node.CONDITION) {
      //将不处于等待状态的结点从等待队列中移除
      unlinkCancelledWaiters();
      t = lastWaiter;
     }
     Node node = new Node(Thread.currentThread(), Node.CONDITION);
     //尾节点为空
     if (t == null)
            //将首节点指向node
      firstWaiter = node;
     else
      //将尾节点的nextWaiter指向node节点
      t.nextWaiter = node;
     //尾节点指向node
     lastWaiter = node;
     return node;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    如果尾结点为空, 那么首尾节点指向当前节点。
    如果尾结点不为空, 那么将当前尾节点的nextWaiter指向当前节点,将当前节点置为尾节点。

    在这里插入图片描述

    2.3 unlinkCancelledWaiters()

    将不处于等待状态的结点从等待队列中移除。

    private void unlinkCancelledWaiters() {
     Node t = firstWaiter;
     //trail是t的前驱结点
     Node trail = null;
     while (t != null) {
      //next为t的后继结点
      Node next = t.nextWaiter;
      //如果t节点的waitStatus不为-2即失效节点
      if (t.waitStatus != Node.CONDITION) {
       t.nextWaiter = null;
       //如果t的前驱节点为空,则将首节点指向next
       if (trail == null)
        firstWaiter = next;
       else
        //t的前驱结点不为空,将前驱节点的后继指针指向next
        trail.nextWaiter = next;
       //如果next为null,则将尾节点指向t的前驱节点
       if (next == null)
        lastWaiter = trail;
      }
      else
       trail = t;
      t = next;
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    t为当前节点,trail为t的前驱节点,next为t的后继节点。
    while方法会从首节点顺着等待队列往后寻找waitStatus!=-2的节点,将当前节点的nextWaiter置为空。
    如果当前节点的前驱节点为空,代表当前节点为首节点,则将next设置为首节点。

    在这里插入图片描述

    如果不为空,则将前驱节点的nextWaiter指向后继节点。

    在这里插入图片描述

    如果后继节点为空,则直接将前驱节点设置为尾节点。

    在这里插入图片描述

    2.4 checkInterruptWhileWaiting()

    用于检测等待过程中有没有中断。

    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 没中断,返回0.这也说明线程等待不是被中断的,而是被唤醒的。唤醒之后要确保节点进入同步队列。
    2. 中断。既然是中断,也相当于取消等待,需要对节点做相应处理。
      而且,前面提到对于不同中断,具体的判断依据是中断的时间点,在signal的前后所决定。
    2.5 transferAfterCancelledWait()

    在线程被中断后必要的时候把节点从条件队列转移到同步队列,并返回是否是在signalled前被中断的。

    final boolean transferAfterCancelledWait(Node node) {
    	//1. 节点状态修改,不再是条件队列节点。0是同步队列节点的初始状态
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        	//加入同步队列
            enq(node);
            return true;
        }
        //2. 不再同步队列,自旋
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    1. signalled之前中断
      修改节点状态并加入同步队列
    2. signalled之后中断
      等待signal方法把节点加入同步队列。由于这个过程很快,如果还没有加入同步队列,自旋即可。
      signal方法会执行修改节点状态并加入同步队列的逻辑,所以对于signalled之后被中断的,不需要再次执行,只需要确认节点已经加入同步队列即可。而且还可以根据节点状态是否是CONDITION来作为两种场景的依据。

    3.唤醒方法介绍

    • signal
    • signalAll
    3.1 signal()
    public final void signal() {
    	//非本线程,抛异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    transferForSignal是针对signal做节点从条件队列到同步队列转化的逻辑方法,可以看到doSignal方法是从条件队列第一个节点开始,只要有一个接单成功就可以。

    3.2 signalAll()
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    signAll方法是对条件队列所有节点做transferForSignal方法

    3.3 transferForSignal()
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        //1. 修改节点状态
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        //2. 节点加入同步队列
        Node p = enq(node);
        int ws = p.waitStatus;
        //3. 前驱节点是有效节点,状态改为signal
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        	//4. 前驱节点时已经取消,没法修改,唤醒本线程
            LockSupport.unpark(node.thread);
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    4.整体逻辑介绍

    1. 节点加入条件队列
    2. 释放锁资源
    3. 阻塞等待
    4. 修改节点状态,加入同步队列
    5. 排队获取锁资源
    6. 整理条件队列,移除非条件节点
    7. 根据中断场景处理中断:唤醒前的中断抛异常;唤醒后的中断重新中断

    1-3为await, 5-7为await
    (1)对于被signal前中断方式离开阻塞的,第4步在await中,线程已经在第4步前不再阻塞。
    (2)否则第4步在signal方法中,线程具体离开阻塞状态的时间可能是在第3–6步之间,可能是被signal执行后中断方式离开阻塞的,也有可能是排队获取锁资源后唤醒的,还有可能是被signal方法唤醒的。

  • 相关阅读:
    DFER-CLIP——使用创新视觉语言模型进行动态面部表情识别
    jdk动态代理使用详解
    webpack和vite区别
    CSS 技术
    【炼金术士】BatchSize对网络训练的影响
    01 【前言 基础使用 核心概念】
    TCP首部(报头)理解
    Kamailio Debian安装
    【postgresql】查询结果添加一个额外的自增编号
    Android 跨进程通信
  • 原文地址:https://blog.csdn.net/qq_43141726/article/details/127769091