• AQS源码解析 3.lock() & unlock() 加锁解锁过程


    AQS源码解析 3.lock() & unlock() 加锁解锁过程

    Lock() 过程

    • 这里使用 ReentrantLock 的公平锁去看 AQS 的加锁过程。
    • 在 ReentrantLock 的实现中,其默认构造的锁是非公平锁

    详细流程图

    • 尝试获取锁 + 构造节点入队过程

      在这里插入图片描述

    • 在队列中被挂起 + 被唤醒重新抢锁的过程

      在这里插入图片描述

    AQS.acquire()

     	// ReentrantLock.lock() 公平锁入口
    	public void lock() {
            sync.acquire(1); // AQS提供的acquire()
        }
    			   ||
                   ||
                   \/ 
                       
    	// AQS.acquire()
    	public final void acquire(int arg) {
            /*
             * 条件1:!tryAcquire()尝试获取锁,取反后 获取成功返回false,获取失败返回true 进入条件2的逻辑
             * 条件2:
             *     2.1 addWaiter() 获取锁失败 将当前线程封装成一个Node入队
             *     2.2 acquireQueued() 在队列中尝试获取锁的过程,如果获取失败则挂起当前线程,还有唤醒后相关的逻辑
             *         返回值表示挂起过程中线程 是否是被中断唤醒过,true表示被中断唤醒过,false表示未被中断过..
             */
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                //  打断线程 再次设置中断标志位为true
                selfInterrupt();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    ReentrantLock.tryAcquire()

    	// ReentrantLock.tryAcquire()方法在内部类 FairSync公平锁 中
    	static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
            /*
         	 * 公平锁提供的 tryAcquire 方法 尝试获取锁
    	 	 * 抢占成功:返回true 包含重入..
         	 * 抢占失败:返回false
         	 */
            @ReservedStackAccess // 这个注解的作用:它会保护被注解的方法,通过添加一些额外的空间,防止在多线程运行的时候出现栈溢出
            protected final boolean tryAcquire(int acquires) {
               	// 当前线程
                final Thread current = Thread.currentThread();
                // 获取state的值(即当前锁的状态)
                int c = getState();
                // c == 0表示当前处于无锁状态
                if (c == 0) {
                    /*
                     * 条件1:!hasQueuedPredecessors()
                     * 因为这里是fairSync公平锁,所以任何时候必须先查看队列中是否有节点(等待者),有节点就得去排队,不允许竞争锁
                     * hasQueuedPredecessors()查看队列中是否有节点:true表示队列中有节点,false表示队列中没有节点,
                     * 当队列中没有节点时,取反后 false->true 才会走到条件2的逻辑。
                     * 条件2:compareAndSetState(0, acquires)
                     * 成功:说明当前线程抢占锁成功
                     * 失败:说明存在竞争,且当前线程竞争失败
                     * 只有当队列中没有节点时才能使用CAS操作去获取锁。
                     */
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {      
                        // 抢锁成功 将当前线程设置为获取锁的线程
                        setExclusiveOwnerThread(current);
                        // 直接return true 尝试获取锁成功
                        return true;
                    }
                }
                /*
                 * 走到这里就说明当前锁已经被占用了,因为ReentrantLock是可重入锁,所以会判断持有锁的线程是否就是当前线程,即这里是重入的逻辑。
                 */
                else if (current == getExclusiveOwnerThread()) {
                    // 重入的逻辑
                    // nextc更新值
                    int nextc = c + acquires;
                    // 越界判断:当重入的深度很深时,会导致nextc < 0,int达到最大值之后 再+1 会爆int 值会变为负数..
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    // 设置新的state,setState是一个无锁方法,因为只有持锁线程才能进到这里
                    setState(nextc);
                    // 重入锁再次加锁成功 也返回true
                    return true;
                }
                /*
                 * 什么时候返回false?
                 * 1.state = 0,无锁状态,但是CAS抢锁失败(有并发)
                 * 2.state > 0,且持锁线程非当前线程
                 */
                return false;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    AQS.addWaiter()

    	/*
         * 当前线程抢占锁失败,将当前线程封装为Node,并入队
         * @param mode:模式,这里传入的是独占模式(Node.EXCLUSIVE)
         * @return 最终会返回当前线程包装后的node节点
         */
    	private Node addWaiter(Node mode) {
            // 将当前线程封装成一个Node节点,mode是独占模式(ReentrantLock => Node.EXCLUSIVE)
            Node node = new Node(Thread.currentThread(), mode);
            /*
         	 * 快速入队
         	 */ 
            // 获取队尾节点 作为前置节点pred(入队操作就是将当前节点放入当前队尾节点的后面)
            Node pred = tail;
    		// 条件成立:队列中已经有node了
            if (pred != null) {
                // 将当前节点的前驱指向pred
                node.prev = pred;
                // CAS设置tail指向当前node(这里可能有并发,所以要使用CAS操作)
                if (compareAndSetTail(pred, node)) {
                    // CAS成功,将尾结点的后继指向node 完成双向绑定
                    pred.next = node;
                    // 说明入队成功,返回node即可
                    return node;
                }
            }
          	/*
          	 * 什么时候会执行到这里?
          	 * 1.当前队列是空队列 tail == null
          	 * 2.CAS竞争入队失败.. 会来到这里
          	 * enq()方法,完整入队,不断自旋入队。
          	 */  
            enq(node);
            return node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    AQS.enq()

        private Node enq(final Node node) {
            // 自旋,保证一定可以入队,只有当前node入队才会跳出循环
            for (;;) {
                Node t = tail;	
                /*
                 * 当前队列是空队列,tail == null
                 * 说明当前锁被占用,但是队列没有节点,说明当前线程可能是第一个获取锁失败的线程(为什么是可能?=> 存在并发)
                 * 那么作为当前持锁线程的第一个后继线程,需要做什么?
                 * 因为第一个当前持锁的线程,获取锁时直接tryAcquire成功了,没有向阻塞队列中添加任何的node,
                 * 所以作为后继需要为它补一个Node,随后将自己入队。
                 */
                if (t == null) { // Must initialize
                    // CAS为当前抢占锁成功的线程追加一个Node 存入队列的头结点中(因为头结点一定是抢占到锁的线程),然后继续自旋
                    // CAS成功 说明当前线程成为 head.next 节点
                    if (compareAndSetHead(new Node()))
                        // 此时当前队列中只有1个节点,就是当前抢占锁成功线程的节点,且为头结点,将tail指向head
                        tail = head;
                    	// 注意:这里没有return 会继续自旋(因为自己还没入队)
                } else {
                    /*
                 	 * 普通入队
                 	 */
                    // 将自己入队,只不过在for自旋中,会保证一定入队成功!
                    node.prev = t;
                    // CAS将自己设置为尾结点
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        // 返回前置节点
                        return t;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    AQS.acquireQueued()

        /*
         * acquireQueued()方法的作用?
     	 * 1.当前节点入队后有没有被挂起呢?没有 => 挂起的操作
    	 * 2.如果被挂起了?那么唤醒之后的逻辑在哪?=> 唤醒之后的逻辑
    	 * 都在此方法中。
    	 * @param node 表示当前线程封装的node 且当前时刻 已经入队成功了...
    	 * @param arg  表示加锁的arg参数,设置state会用到
         */
    	final boolean acquireQueued(final Node node, int arg) {
            /*
             * true 表示当前线程抢占锁成功(一般情况下【lock】,最终肯定会获取到锁)
             * false 表示失败,需要执行出队的逻辑(后面讲响应中断的lock方法时再讲)
             */
            boolean failed = true;
            try {
                // 表示当前线程是否被中断
                boolean interrupted = false;   
                // 自旋 获取锁
                for (;;) {
                    /*
                     * 什么情况会执行这里?
                     * 1.进入for循环时,在线程尚未park前
                     * 2.线程park后,被唤醒后,也会执行这里(自旋)
                     */
                    // 拿到节点的前驱节点
                    final Node p = node.predecessor();
                    /*
                     * 这里判断当前节点是不是head.next节点,head.next节点在任何时候 都有权利去争夺锁
                     * 条件1:p == head 成立的话才有机会调用tryAcquire()尝试获取锁
                     * 条件2:tryAcquire(arg)
                     * 成立:说明head对应的线程 已经释放锁了,head.next节点对应的线程,正好获取到锁了
                     * 不成立:说明head对应的线程 还没释放锁呢,head.next仍然需要被park..
                     */
                    if (p == head && tryAcquire(arg)) {
                        // 拿到锁之后,设置头结点为当前节点
                        setHead(node);
                        // 将上个线程对应的node的next引用置为null 协助老的head出队
                        p.next = null; // help GC
                        // 当前线程获取锁的过程中 没有发生异常
                        failed = false;
                        // 返回当前线程的中断标记
                        return interrupted;
                    } 
                    /*
                     * shouldParkAfterFaildAcquire():当前线程获取锁失败后,是否需要挂起? 
                     * true -> 需要挂起 | false -> 不需要挂起
                     * -------
                     * parkAndCheckInterrupt():将当前线程挂起,被唤醒后返回中断标志位(会清除中断标志位),返回当前线程是否是被中断唤醒的。
                     * (唤醒:1.正常唤醒 其他线程unpark 2.其他线程给当前挂起的线程一个中断信号) 
                     * 当被唤醒时,继续进入自旋尝试获取锁。
                     */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        // 如果当前线程是被中断唤醒的,就将interrupted置为ture
                        // 因为在parkAndCheckInterrupt()方法中 interrupted()方法会清除打断标记 也就是将interrupted置为false 所以要重新置为true
                        interrupted = true;
                }
            } finally {
                if (failed)
                    // 取消竞争
                    cancelAcquire(node);
            }
        }
    
    //---------------------------------------------------------------------------- 
    	// AQS.parkAndCheckInterrupt()
    	// park当前线程 将当前线程挂起,唤醒后返回当前线程 是否为 中断信号 唤醒
    	private final boolean parkAndCheckInterrupt() {
            // 使用LockSupport挂起当前线程
            LockSupport.park(this);
            // 返回当前线程是否被中断,该方法会清除线程的中断标志
            // 如果被打断过则返回true,并清除打断标记 置为false
            return Thread.interrupted();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    AQS.shouldParkAfterFailedAcquire()

    总结

    • 当前节点的前驱节点是 CANCELLED(1) 取消状态,第一次来到方法时,会将前面所有处于CANCELLED的节点全部删除,最终找到第一个状态是 0 的节点,然后将其状态设置为 -1(SIGNAL),然后继续自旋后返回 true。
    • 当前节点的前驱节点状态是 0,当前线程会设置前驱节点的状态为 -1,然后再次自旋时会返回 true。

    主要做的事就是删除当前节点前面连续的所有处于 CANCELLED 的节点找到第一个状态为 0 的节点,将其状态设置为-1(SIGNAL),然后退出。

    	/*
    	 * 总结:
         * 1.当前节点的前置节点是CANCELLED(1)取消状态,第一次来到这个方法时 会越过 取消状态的节点,第二次 会返回true 然后park当前线程。
         * 2.当前节点的前置节点状态是0,当前线程会设置前置节点的状态为-1(为了唤醒后继节点),第二次自旋来到这个方法时 会返回true 然后park当前线程。
    	 * @param pred 当前线程node的前置节点
    	 * @param node 当前线程对应的node
    	 * @return true -> 当前线程需要挂起 | false -> 当前线程不需要挂起
    	 */  
    	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            /*
             * 获取当前节点前驱节点的状态(waitStatus)
             * waitStatus = 0   初始默认状态
             *   	      > 0   表示节点时CANCELLED(1)取消状态
             * 			  = -1  SIGNAL表示当前节点释放锁后会唤醒它的第一个后继节点
             */
        	int ws = pred.waitStatus;
        	// 表示当前节点的前驱节点状态就是SIGNAL(-1),是个可以唤醒当前节点的节点,所以返回true => parkAndCheckInterrupt():park当前线程了..
            // 普通情况下,第一次来到shouldParkAfterFailedAcquire时,ws不会是-1
            if (ws == Node.SIGNAL)
                return true;
        	/*
        	 * ws > 0 表示当前节点node的前驱节点的waitStatus > 0是CANCELLED(1)取消状态,
        	 * 取消状态的节点无法唤醒后继节点,所以需要一直向前找到第一个waitStatus <= 0的节点
        	 * 在这个过程中,会将waitStatus > 0的节点全部删除
        	 */
            if (ws > 0) { 
                // 找爸爸(前置节点)的过程,条件是什么呢?前置节点的 waitStatus <= 0 的情况
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);   
                // 找到的第一个ws <= 0的节点的next指针指向当前node(相当于将之间所有状态为CANCELLED(1)取消状态的节点全部删除(出队))
                pred.next = node;
            } else {
                // 到这里 说明 ws = 0,
                // 使用CAS将当前线程node的前置节点强制设置为SIGNALE,表示前置节点释放锁之后需要唤醒我
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    unlock() 过程

    • 以 ReentrantLock 中的公平锁的 unlock() 为例。

    大致流程图

    在这里插入图片描述

    AQS.release()

    • unlock 底层调用的是 release()
        // ReentrantLock.unlock() 释放锁入口
    	public void unlock() {
            sync.release(1); // AQS提供的release()
        }
    				||
                    ||
                    \/
                    
        // AQS.release()
    	public final boolean release(int arg) {  
            /*
             * tryRelease()尝试释放锁 
             * true -> 表示当前线程已经完全释放锁 
             * false -> 表示当前线程尚未完全释放锁
             */
            if (tryRelease(arg)) {
                // 队列头结点
                /*
                 * head什么情况下会被创建出来?
                 * 当持锁线程未释放线程时,且持锁期间 有其它线程想要获取锁时,其它线程发现获取不了锁,而且队列是空队列,
                 * 此时后续线程会为当前持锁中的线程 构建出来一个head节点,然后后续线程 会追加到 head 节点后面。
                 */
                Node h = head;
                /*
                 * 条件1:h != null 成立 说明队列不为空 也就是说队列中的head节点已经初始化过了,ReentrantLock 在使用期间发生过多线程竞争了..
                 * 条件2:h.waitStatus != 0 (大概率是-1(Signal)) 条件成立,说明当前head后一定插入过node节点 可以进行唤醒节点的操作
                 */
                if (h != null && h.waitStatus != 0)
                    /*
                     * 唤醒后继节点(注意,unparkSuccessor里也有寻找后继的逻辑,即唤醒的不一定就是h.next节点)
                     */
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    ReentrantLock.tryRelease()

        // ReentrantLock.tryRelease()方法在内部类 Sync 中 继承于AQS
    	abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
            final boolean nonfairTryAcquire(int acquires) {...}
            
            @ReservedStackAccess // 这个注解的作用:它会保护被注解的方法,通过添加一些额外的空间,防止在多线程运行的时候出现栈溢出
    		protected final boolean tryRelease(int releases) {
                // c = 当前锁的状态 - 释放的锁的状态,减去释放的值 拿到最新值
                int c = getState() - releases;
                // 这里判断 当前调用释放锁的线程是否是获取锁的线程,不是的话直接抛出异常
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                // free -> 是否已经完全释放锁
                boolean free = false;
                // c == 0 表示满足完全释放锁的条件
                if (c == 0) {
                    // free 置为true
                    free = true;
                    // 设置当前独占锁的线程为null
                    setExclusiveOwnerThread(null);
                }
                // 更新state
                setState(c);
                // 返回free的值(完全释放锁返回true 反之返回false)
                return free;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    AQS.unparkSuccessor()

        // 唤醒当前节点的后继节点
    	private void unparkSuccessor(Node node) {
            // 获取当前节点的waitStatus
            int ws = node.waitStatus;
            // 小于0 就是-1(Signal) 使用CAS的方式将状态变为0
            // 改成0的原因:因为当前节点即将完成唤醒后继节点的任务了..
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);       
     		// 获取当前当前节点的后继节点
            Node s = node.next;
            /*
             * 条件1:
             * s 什么时候等于null?
             * 1.当前节点就是tail节点时 s == null
             * 2.当新节点入队未完成时(1.设置新节点的prev指向pred 2.cas设置新节点为tail 3.(未完成)pred.next -> 新节点)需要找到可以被唤醒的节点..
             * 条件2:s.waitStatus > 0 前提:s != null
             * 成立:说明 当前node节点的后继节点是 取消状态 需要找一个合适的可以被唤醒的节点
             *
             * 这里就是判断当前节点的后继节点的状态是否是0(初始状态)或者-1(Signal),
             * 不是的话,就去队列中找到一个距离当前节点最近的可以被唤醒的节点赋值给s
             */
            if (s == null || s.waitStatus > 0) {
                s = null;
            	// 在队列中找到一个距离当前节点最近的并且可以被唤醒的节点 node可能找不到(也就是s可能为null)
            	for (Node t = tail; t != null && t != node; t = t.prev)
                	/*
                	 * 状态合法,赋值给s,注意这里找到时并没有break,即最终找到的节点就是离node最近的节点(从后往前 找到最前面的node)
                	 */
                    if (t.waitStatus <= 0)
                        s = t;
            }
            // 找到了一个可以被唤醒的节点 没找到则什么也不做
            if (s != null)
                // 调用LockSupport的unpark将其唤醒
                LockSupport.unpark(s.thread);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    扩展:AQS 响应中断加锁逻辑

    • 本篇文章的前面的 加锁、解锁 部分,都是在介绍 ReentrantLock 的 lock() 普通加锁方式,这种加锁方式是不可以响应中断的,下面我们分析可以被响应中断的加锁方式 lockInterruptibly()

    ReentrantLock.lockInterruptibly() 可以被响应中断的加锁方法

        // ReentrantLock.lockInterruptibly():可以被响应中断的加锁方法
        public void lockInterruptibly() throws InterruptedException {
            // 可以被响应中断的方式去竞争资源
            sync.acquireInterruptibly(1);
        }
    
        // AQS.acquireInterruptibly():竞争资源的方法(可以被响应中断)
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            // 如果当前线程已经是有中断标记interrupted为true了,则直接抛出中断异常
            if (Thread.interrupted())
                throw new InterruptedException();
            // 尝试获取锁
            if (!tryAcquire(arg))
                // 获取锁失败,进入该方法逻辑
                doAcquireInterruptibly(arg);
        }
    
        // 也是将当前节点封装为Node,并执行入队操作
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        // 这里直接抛出打断异常 直接响应
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    // 我们主要来分析下cancelAcquire这个方法: 取消指定node参与竞争
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    AQS.cancelAcquire() 响应中断出队逻辑

    	/*
    	 * 取消指定node参与竞争 
         */
    	private void cancelAcquire(Node node) {
    		// 判空
            if (node == null)
                return;
            // 取消node排队,直接将内部关联的线程置为null
            node.thread = null;
            // 获取当前取消排队node的前驱
            Node pred = node.prev;
            // 有可能它的前驱也处于取消状态,继续往前找 找到一个正常的Node
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;	
            /*
             * 拿到前驱节点的next节点 这里有两种情况:
             * 1.就是当前node
             * 2.可能也是ws > 0的节点
             */
            Node predNext = pred.next;
    		// 设置当前节点的状态为 CANCELLED(1)取消状态
            node.waitStatus = Node.CANCELLED;
            /*
             * 当前取消排队的node所在的队列的位置不同,执行的出队的逻辑是不一样的,一共分为三种情况:
             * CASE1:当前node是队尾,tail -> node
             * CASE2:当前node非head.next节点,也不是tail 
             * CASE3:当前node是head.next节点
             */   
            /*
             * CASE1:
             * 条件1:node == tail 成立:当前node是队尾,tail -> node,执行条件2
             * 条件2:compareAndSetTail(node, pred) 使用CAS方式将tail指向node的前驱节点,成功的话,说明修改tail完成
             */
            if (node == tail && compareAndSetTail(node, pred)) {
                // CAS修改pred.next -> null. 完成node出队
                compareAndSetNext(pred, predNext, null);       
            /*
             * CASE2:当前node非head.next节点,也不是tail
             * CASE3:当前node是head.next节点
             */
            } else {   
                // 保存节点的状态(waitStatus)
                int ws;    
                 /*
                  * 这一堆判断 判断的就是 CASE2:当前node非head.next也不是tail节点的情况
                  * pred != head 成立,说明当前node不是head.next节点,也不是tail(CASE1)
                  * 条件2:((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
                  * 2.1 成立:表示当前ndoe节点的前驱状态是signal。不成立:前驱状态可能为0,极端情况下:前驱节点也取消排队了
                  * 2.2 成立:ws <= 0 则需要设置前驱节点状态为SIGNAL(-1)状态,表示要唤醒后继节点
                  * if里面做的事,就是让pred.next -> node.next,所以需要保证pred节点状态为SIGNAL
                  */
                  if (pred != head &&
                      ((ws = pred.waitStatus) == Node.SIGNAL ||
                       (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                      pred.thread != null) {
                     /*
                      * 当前node不是head.next节点,也不是tail节点
                      * 出队:pred.next -> node.next节点后,当node.next节点被唤醒后,
                      * 调用shouldParkAfterFailedAcquire方法会让node.next越过取消状态的节点,完成真正的出队
                      */
                     Node next = node.next;
                     if (next != null && next.waitStatus <= 0)
                         // CAS设置pred.next -> node.next
                         compareAndSetNext(pred, predNext, next);
                // CASE3:当前node是head.next节点
                } else {
                    // node是head.next节点,唤醒node的后继节点,然后调用
                    // 类似CASE2,后继节点唤醒后会调用shouldParkAfterFailedAcquire方法让node.next越过取消状态的节点,与head建立双重指向的关系
                    // 假设当前head.next的后继节点就是第三个节点 则:head.next -> 第三个node 中间就是被出队的head.next 第三个node.prev -> head
                    unparkSuccessor(node);
                }
                node.next = node; // help GC
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    小总结

    • AQS 是 Java 中几乎所有锁和同步器的一个基础框架,这里说的是“几乎”,因为有极个别确实没有通过 AQS 来实现;

    • AQS 是结合 ReentrantLock 一起看的,因为里面的公平锁和非公平锁,是使用 AQS 的实现;

    • AQS 中维护了一个队列,这个队列使用双链表实现,用于保存等待锁排队的线程;

    • AQS 中维护了一个状态变量 state,在多线程并发场景下,通过控制状态变量 state 就可以实现加锁解锁操作了;

      • 判断当前持有锁的线程是否已经释放了锁,使得没有拿到锁的线程进入队列等待。
    • 然后当前线程执行完成之后,会 unpark 队列的下一个线程,使其进行工作,这样就协调了多线程场景下锁竞争的问题;

    • 本文章是基于 ReentrantLock 的公平锁观察加锁解锁流程,而 ReentrantLock 的默认实现是非公平锁

      • 非公平锁的逻辑相对简单,调用 lock() 时直接 CAS 抢锁,抢锁失败再去执行其它逻辑,不需要判断阻塞队列中有没有其它线程在等待,上来就直接 CAS 抢占锁。
      • 非公平锁的吞吐量相对与公平锁要好一些。
      	// ReentrantLock非公平锁 NonfairSync
          static final class NonfairSync extends Sync {
              private static final long serialVersionUID = 7316153563782823691L;
      
              /**
               * Performs lock.  Try immediate barge, backing up to normal
               * acquire on failure.
               */
              final void lock() {
                  if (compareAndSetState(0, 1))
                      setExclusiveOwnerThread(Thread.currentThread());
                  else
                      acquire(1);
              }
              // acquire方法 调用 tryAcquire方法
              protected final boolean tryAcquire(int acquires) {
                  return nonfairTryAcquire(acquires);
              }
          }
      	// ReentrantLock内部类 Sync
      	abstract static class Sync extends AbstractQueuedSynchronizer {
              ...
              /**
               * Performs non-fair tryLock.  tryAcquire is implemented in
               * subclasses, but both need nonfair try for trylock method.
               */
              final boolean nonfairTryAcquire(int acquires) {
                  final Thread current = Thread.currentThread();
                  int c = getState();
                  if (c == 0) {
                      // 不需要关心阻塞队列中是否有线程在等待,直接抢锁
                      if (compareAndSetState(0, acquires)) {
                          setExclusiveOwnerThread(current);
                          return true;
                      }
                  }
                  else if (current == getExclusiveOwnerThread()) {
                      int nextc = c + acquires;
                      if (nextc < 0) // overflow
                          throw new Error("Maximum lock count exceeded");
                      setState(nextc);
                      return true;
                  }
                  return false;
              }
              ...
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47

    AQS 独占模式情景分析

    image-20221026195413896

    image-20221027144053765


    参考

  • 相关阅读:
    瑞吉外卖之移动端菜品数据的展示
    abstract抽象 与interface接口
    【Java】Vector 与 ArrayList 集合区别
    arcgis中坡向计算工作原理说明
    Linux之VFS之美
    一本通1084;幂的末尾
    rabbitMq死信队列
    Centos(Linux)服务器安装Dotnet8 及 常见问题解决
    Spring AOP:原理与示例代码
    MySQL: String 字符串相关函数整理
  • 原文地址:https://blog.csdn.net/weixin_53407527/article/details/127948836