• Java多线程之:队列同步器AbstractQueuedSynchronizer原理剖析


    Java多线程之:队列同步器AbstractQueuedSynchronizer原理剖析

    AQS (AbstractQueuedSynchronizer,抽象队列同步器) 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。可见,AQS是我们理解其他Java JUC并发容器的基础。

    一、AQS的核心思想

    AQS 核心思想是为了解决多线程环境下对共享资源的请求同步问题。对于请求的共享资源,如果资源存在空闲,则当前请求获取到对应资源正常执行,否则,会转换为阻塞状态,直到有资源被释放后,由工作线程来唤醒。

    AQS借助CLH队列锁(由于是 Craig、Landin 和 Hagersten三位大佬的发明,因此命名为CLH锁。)来实现上述的线程阻塞等待功能。这里CLH锁是一种自旋锁,当多线程竞争一把锁时,获取不到锁的线程,会排队进入CLH队列的队尾,然后自旋等待,直到其前驱线程释放锁。CLH队列锁的好处是唤醒线程时避免了惊群效应,因为只会唤醒当前线程的下一个等待线程。

    除了CLH队列外,AQS中还有一个很重要的队列:Condition条件队列Condition声明了一组等待/通知的方法,这些方法的功能与Object中的wait/notify/notifyAll等方法相似,只不过Condition 中的方法则要配合锁对象使用,并通过newCondition方法获取实现类对象。注意Condition对象只能在独占锁中才能使用。

    下图给出了AQS组件的结构框架图:

    在这里插入图片描述

    AQS对资源的共享方式包括两种:

    • Exclusive(独占):只允许一个线程能执行,且忽略中断,比如ReentrantLock。又可以分为公平锁和非公平锁
      • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
      • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
    • Share(共享):非排他性锁。多个线程可同时向共享资源申请资源,直到没有可用资源时,对应线程才阻塞,如 Semaphore/CountDownLatch。SemaphoreCountDownLatCh、 CyclicBarrier、ReadWriteLock 等。

    二、AQS中关键的内部结构

    一、Node内部类

    AQS内部维护着一个FIFO的队列,该队列就是用来实现线程的并发访问控制。队列中的元素是一个Node类型的节点,Node的主要属性如下:

    static final class Node {
        int waitStatus;
        Node prev;
        Node next;
        Node nextWaiter;
        Thread thread;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    各属性表示的含义如下:

    • waitStatus:表示节点的状态,其中包含的状态有:
      • CANCELLED:值为1,表示当前节点已取消调度(当timeout或被中断,会触发变更为此状态)
      • SIGNAL:值为-1,表示当前节点的的后继节点将要或者已经被阻塞,在当前节点释放的时候需要unpark后继节点。后继节点入队时,会将前驱节点的状态更新为SIGNAL。
      • CONDITION:值为-2,表示当前节点在condition队列中等待,当其他线程调用了Condition的signal()方法后,CONDITION状态的节点将从Condition等待队列中转移到同步队列中,等待获取资源。
      • PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点(仅在共享模式下使用)。即前驱节点不仅会唤醒其后驱节点,同时也可能会唤醒后驱的后驱节点,所以叫传播。
      • 0:无状态,表示当前节点在队列中等待获取锁。
    • prev:CLH队列中的前继节点
    • next:CLH队列中的后继节点
    • nextWaiter:存储Condition条件队列的后继节点
    • thread:当前线程

    其中,CLH队列里还有一个head节点和一个tail节点,分别表示头结点和尾节点,其中头结点是空节点,本身不存储Thread,仅保存next结点的引用。

    二、CLH队列

    CLH队列用来存储Node节点封装的线程阻塞队列。每个获取资源失败的线程,根据先来后到,利用CAS原子操作依次插入到CLH队列的尾部,等待被唤醒获取资源。为了便于尾节点插入数据,CLH队列做成了双端队列,即包含head和tail的FIFO队列。因此可以说,AQS实现多线程间对共享资源的同步访问,就是依靠CLH队列来实现的。

    CLH队列每次unpark解除线程阻塞时,都会从队列的头节点来选择线程进行接触阻塞。

    CLH每一个线程都是一个自旋锁,非常消耗CPU。等待锁的每个线程在自己的某个变量上自旋等待,这个变量指向自己的前驱节点中的变量,通过不断地自旋,感知到前驱节点的变化后成功获取到锁。

    我们可以看到公平锁就是最初的实现理念就是CLH队列。

    三、同步状态 state

    在AQS中维护了一个同步状态变量state,用来表示同步状态。

    不同线程获取和释放资源时,都会对state进行更新。比如state > 0表示可以获取共享资源,否则无法获取。该变量对不同的子类实现具有不同的意义。

    比如,对ReentrantLock来说,它表示加锁的状态:

    • 无锁时state=0,有锁时state>0;
    • 第一次加锁时,将state设置为1;
    • 由于ReentrantLock是可重入锁,所以持有锁的线程可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(即exclusiveOwnerThread==Thread.currentThread()),即可加锁,每次加锁都会将state的值+1,state等于几,就代表当前持有锁的线程加了几次锁;
    • 解锁时每解一次锁就会将state减1,state减到0后,锁就被释放掉,这时其它线程可以加锁;
    • 当持有锁的线程释放锁以后,如果是等待队列获取到了加锁权限,则会在等待队列头部取出第一个线程去获取锁,获取锁的线程会被移出队列;

    其他

    • ReentrantReadWriteLock 的 state 高 16 位代表读锁状态,低 16 位代表写锁状态
    • Semaphore 的 state 用来表示可用信号的个数
    • CountDownLatch 的 state 用来表示计数器的值

    state变量定义如下:

    /**
     * The synchronization state.
     */
    private volatile int state;
    
    • 1
    • 2
    • 3
    • 4

    四、Condition条件队列

    Object 的 wait、notify 函数是配合 Synchronized 锁实现线程间同步协作的功能,AQS 的 ConditionObject 条件变量也提供这样的功能,通过 ConditionObject 的 await 和 signal 两类函数完成。

    ConditionObject是通过基于单链表的条件队列来管理等待线程的。线程在调用await方法进行等待时,会释放同步状态。同时线程将会被封装到一个等待节点中,并将节点置入条件队列尾部进行等待。当有线程在获取独占锁的情况下调用signal或singalAll方法时,队列中的等待线程将会被唤醒,重新竞争锁。另外,需要说明的是,一个锁对象可同时创建多个 ConditionObject 对象,这意味着多个竞争同一独占锁的线程可在不同的条件队列中进行等待。在唤醒时,可唤醒指定条件队列中的线程。

    不同于 Synchronized 锁,一个 AQS 可以对应多个条件变量,而 Synchronized 只有一个。

    在这里插入图片描述

    如上图所示,AQS中维护这两个队列:CLH队列和Condititon条件队列。Condition条件队列时单向队列,利用上述提及的nextWaiter指针指向下一个节点。Condition队列只入队执行await的线程节点,并等待被调用singnal方法唤醒。从Condition队列中被唤醒的线程会入队到CLH队列。过程类似于Object类的wait/notify原理过程。

    三、AQS同步机制的实现源码解析

    AQS针对上述提及的资源共享的两种不同模式,利用不同的方式进行实现:

    独占式

    • acquire():获取资源
    • release():释放资源

    共享式

    • acquireShard:共享模式下获取资源
    • releaseShard:共享模式下释放资源

    一、AQS获取独占锁的实现

    acquire方法

    acquire是AQS中的方法,代码如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    该方法主要工作如下:

    • 执行 tryAcquire 函数,tryAcquire 是由子类实现,代表获取资源是否成功,如果资源获取失败,执行下面的逻辑
    • 执行 addWaiter 函数(前面已经介绍过),根据当前线程创建出独占式节点,并入队 CLH 队列
    • 执行 acquireQueued 函数,自旋阻塞等待获取资源
    • 如果 acquireQueued 函数中获取资源成功,根据线程是否被中断状态,来决定执行线程中断逻辑

    在这里插入图片描述

    图中比较重要的几个方法:

    addWaiter方

    看下addWaiter方法的定义:

    private Node addWaiter(Node mode) {
        // 根据当前线程创建一个Node对象
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 判断tail是否为空,如果为空表示队列是空的,直接enq
        if (pred != null) {
            node.prev = pred;
            // 这里尝试CAS来设置队尾,如果成功则将当前节点设置为tail,否则enq
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    该方法核心逻辑就是将当前线程封装为一个Node,然后利用CAS操作添加到队列尾部。

    acquireQueued方法

    addWaiter()方法把Thread对象加⼊阻塞队列之后的⼯作就要靠acquireQueued()方法完成。线程⼀旦进⼊acquireQueued()就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued()返回。此时会删除队列的第⼀个元素(head指针前移1个节点)。
    该方法的功能是循环的尝试获取锁,直到成功为止,最后返回中断标志位。

    /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            // 异常状态,默认是true
            boolean failed = true;
            try {
                // 表示当前线程是否被中断过,默认是否
                boolean interrupted = false;
                // 一直自旋
                for (;;) {
                    // 获取node节点的前驱节点
                    final Node p = node.predecessor();
                    // 如果前驱节点是首节点,说明当前线程所在的节点是列表中第一个节点,则获取资源成功
                    if (p == head && tryAcquire(arg)) {
                        // 获取资源成功,设置当前节点为头节点,清空当前节点的信息
                        setHead(node);
                        // 原来首节点的next置为null,这样该节点就变成一个孤立的节点,有利于下次GC时,比如使用标记清除法遍历时快速GC掉
                        p.next = null; // help GC
                        // 非异常状态,防止指向finally逻辑
                        failed = false;
                        // 返回线程中断状态
                        return interrupted;
                    }
                    /**
                     * 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,
                     * shouldParkAfterFailedAcquire做了三件事
                     * 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false
                     * 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)
                     * 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false
                     */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        //使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,
                        // 唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                // 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    acquireQueued()方法有⼀个返回值,表示什么意思呢?虽然该方法不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该方法会返回true;否则,返回false。

    基于这个返回值,也就能理解上面acquire方法的selfInterrupt();的作用:
    acquireQueued()返回 true 时,会调用 selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样⼀来,如果该线程在lock代码块内部有调⽤sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。

    阻塞就发生在下面这个方法中

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
       
    
    • 1
    • 2
    • 3
    • 4
    • 5

    线程调用park()方法,自己把自己阻塞起来,直到被其他线程唤醒,该方法返回。
    park()方法返回有两种情况。

    1. 其他线程调用了unpark(Thread t)
    2. 其他线程调用了t.interrupt()。这里要注意的是, lock()不能响应中断,但LockSupport.park()会响应中断。

    也正因为LockSupport.park()可能被中断唤醒, acquireQueued()方法才写了⼀个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁。如果拿不到锁,则再次自己阻塞自己。不断重复此过程,直到拿到锁。

    被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,会返回false,如果是情况2,则返回true。

    acquireQueued整体流程如下图所示:

    在这里插入图片描述

    二、AQS释放独占锁的实现

    AQS使用release()方法来对资源进行释放,release方法中,唤醒线程时会首先唤醒CLH队列中的队头节点所对应的线程,代码如下:

    public final boolean release(int arg) {
        	// 释放资源成功,tryRelease子类实现
            if (tryRelease(arg)) {
                // 获取头部线程节点
                Node h = head;
                // 首节点不为null,并且等待状态不为0
                if (h != null && h.waitStatus != 0)
                    // 唤醒CLH队列队首节点对应的线程(首节点的下一个节点)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
     private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
         	// 获取节点等待状态
            int ws = node.waitStatus;
            if (ws < 0)
                // CAS操作更新节点状态为0
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
         	// s指向队首第一个节点(不包括首节点)
            Node s = node.next;
         	// 如果队首节点状态为CANCELLED,则从队尾开始循环向前,直到获取到第一个正常节点为止
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                // 唤醒队首节点对应线程
                LockSupport.unpark(s.thread);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    释放流程如下所示:
    在这里插入图片描述

    三、AQS获取共享锁的实现

    acquireShared 是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入 CLH 队列,直到获取到资源为止,且整个过程忽略中断的影响,acquireShared 函数代码如下:

        /**
         * Acquires in shared mode, ignoring interrupts.  Implemented by
         * first invoking at least once {@link #tryAcquireShared},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquireShared} until success.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquireShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         */
        public final void acquireShared(int arg) {
            /**
            	1. 负数表示失败
            	2. 0表示成功,但没有剩余可用资源
            	3. 正数表示成功且有剩余资源
            */
            // <0表示获取资源失败
            if (tryAcquireShared(arg) < 0)
                // 自旋阻塞等待获取资源
                doAcquireShared(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    doAcquireShared 函数与独占式的 acquireQueued 函数逻辑基本一致,区别在于:

    • addWaiter方法:在共享锁的方式中,创建节点的方式是:final Node node = addWaiter(Node.SHARED);这里,Node node = new Node(Thread.currentThread(), mode);表明添加了一个共享式节点,节点标记是共享式,并入队CLH队列。
    • setHeadAndPropagate(node, r)方法:设置自己尾队列头节点,并尝试唤醒后继节点。即获取资源成功时,还会尝试唤醒后继资源,因为资源数可能>0,代表还有资源可获取,所以需要做后续线程节点的唤醒。

    四、AQS释放共享锁的实现

    同样的,AQS中提供了releaseShared方法来释放共享资源,唤醒CLH队列的头节点的下一个节点,也就是CLH队列中第一个等待线程所对应的节点。

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现
                //唤醒后继节点
                doReleaseShared();
                return true;
            }
            return false;
        }
        
    private void doReleaseShared() {
            for (;;) {
                //获取头节点
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
        
                    if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNAL
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0
                            continue;            // loop to recheck cases
                        //唤醒头节点下个线程节点
                        unparkSuccessor(h);
                    }
                    //如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATE
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;               
                }
                if (h == head)              
                    break;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    与独占式释放资源区别不大,都是唤醒头节点的下个节点,就不做过多描述了。

    总结

    本文从独占锁的实现出发,比较完整的分析了AQS内部独占锁和共享锁的实现。总体来说实现的思路很清晰,就是使用了标志位+队列的方式来处理锁的状态,包括锁的获取,锁的竞争以及锁的释放。在AQS中,利用CLH队列和Condition队列来维护多线程队共享资源的同步访问,state可以表示锁的数量,也可以表示其他状态,state的含义由子类去定义,自己只是提供了对state的维护。AQS通过state来实现线程对资源的访问控制,而state具体的含义要在子类中定义。

    对于 AbstractQueuedSynchronizer 的分析,最核心的就是 sync queue 的分析。

    • 每一个结点都是由前一个结点唤醒
    • 当结点发现前驱结点是 head 并且尝试获取成功,则会轮到该线程运行。
    • condition queue 中的结点向 sync queue 中转移是通过 signal 操作完成的。
    • 当结点的状态为 SIGNAL 时,表示后面的结点需要运行。
  • 相关阅读:
    OpenHarmony Axios组件使用过程中,Api9不适配问题
    C++ 虚析构函数的作用?
    动态规划01背包问题
    【证明】线性映射不影响向量组的线性组合
    数据结构--动态储存顺序表(含完整代码)
    Bit.Store:熊市漫漫,稳定Staking产品或成主旋律
    【论文阅读】AttnDreamBooth | 面向文本对齐的个性化图片生成
    缩短从需求到上线的距离:集成多种工程实践的稳定框架 | 开源日报 No.55
    【C++初阶】C++STL详解(四)—— vector的模拟实现
    Mendix:企业成功执行数字化转型的9个因素
  • 原文地址:https://blog.csdn.net/Urbanears/article/details/128177063