• 【JAVA并发】AQS原理详解


    1. 概述

    • AQS全称为 AbstractQueuedSynchronizer,是Java的 juc 当中的一个抽象类。

    • AQS是一个用于构建同步器协作工具类的工具类(框架)。有了AQS之后,更多的协作工具类都可以方便得被写出来。

    • 控制并发流程的类,都需要线程的 等待唤醒 功能,这是这些类的共同特点,因此可以抽象出一个基类,这就是AQS。

    • AQS广泛应用于控制并发流程的类中,如 CountDownLatchSemaphoreReentrantLockReentrantReadWriteLock 等。

    • 其中,sync 是这些类中都有的内部类,其结构如下图,可以看到Sync类是AQS的实现类。

    • AQS 主要完成的任务:
      1)同步状态(比如说计数器)的原子性管理;
      2)现成的阻塞和唤醒;
      3)等待队列的管理;

    2. AQS原理

    AQS最核心的就是3个部分:
    1)状态:state;
    2)控制线程抢锁和配合FIFO队列(双向链表);
    3)期望协作工具类去实现的获取/释放等重要方法(重写);

    2.1 状态state

    这里state的具体含义,会根据具体实现类的不同而不同:

    • Semaphore里,它表示剩余许可证的数量
    • CountDownLatch里,它表示还需要倒数的数量
    • ReentrantLock里,state用来表示锁的占有情况,包括可重入计数,当state的值为0时,表示该Lock不被任何线程所占有。

    state是volatile修饰的,并被并发修改,所以修改state的方法都需要保证线程安全,比如getStatesetState以及compareAndSetState操作来读取和更新这个状态。

    getStatesetState是直接操作的state变量,volatile能够保证操作的原子性,compareAndSetState依赖于Unsafe类(Unsafe类是sun.misc包下的类,能够直接操作内存,操作成功返回true,操作失败返回false,能够保证原子性)。


    2.2 FIFO队列

    这个队列用来存放等待的线程,AQS就是“排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当释放时,锁管理器就会挑选一个合适的线程来占用这个刚刚释放的锁;

    AQS会维护一个等待的线程队列,把线程都放到这个队列里,并且这个队列是双向链表形式的。


    2.3 实现获取/释放等方法

    这里的 获取释放 方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且根据各自的功能含义不相同;

    获取方法:获取state变量的操作,经常会被阻塞(比如获取不到锁的情况)

    • Semaphore中,获取就是acquire方法,作用是获取一个许可证;
    • 而在CountDownLatch里面,获取就是await方法,作用是等待,知道倒数结束。

    释放方法

    • Semaphore中,释放就是release方法,作用是释放一个许可证;
    • CountDownLatch里面,获取就是countDown方法,作用是将倒数的数减一;

    需要每个实现AQS的类重写tryAcquiretryRelease等方法。


    3. AQS源码分析

    AQS在juc中用法套路:

    • 第一步:写一个类,想好协作的逻辑,实现获取/释放方法;
    • 第二步:类的内部写一个Sync类,继承 AbstractQueuedSynchronizer 类;
    • 第三步:根据是否独占来重写 tryAcquire / tryReleasetryAcquireShared(int acquired) / tryReleaseShared(int releases) 等方法在之前写的 获取/释放 方法中调用AQS的 acquire/release 或者 acquireShared/releaseShared 方法。

    (AQS中的acquire/release方法调的是tryAcquire/tryRelease,AQS的acquireShared/releaseShared方法调的是tryAcquireShared/tryReleaseShared方法,但这两个方法在AQS中没有具体实现,是在具体的类中实现的,因此调的时候会根据类动态绑定到对应实现类的方法)

    1)AQS中 acquire/release方法实现的是独占锁,调tryAcquire/tryRelease方法,这两个方法在AQS中没有实现,需要在实现类中去具体实现,调用时会动态绑定到具体的实现类重写的方法。

    2)AQS中 acquireShared/releaseShared 方法实现的是共享锁,调 tryAcquireShared/tryReleaseShared 方法,同上,也是在实现类中具体实现的,调用时再动态绑定。


    4. AQS在juc中各类工具中的使用分析

    4.1 AQS在CountDownLatch中的应用

    CountDownLatch是共享锁

    新建一个 CountDownLatch 对象

    CountDownLatch cdl = new CountDownLatch(3);
    
    • 1

    会在对象cdl的内部创建一个Sync的对象,并将 this.sync中的state设置为3,代表要倒数3次。

    当调用cdl.await()时,就会进行如下调用

    调了AQS的tryAcquireShared方法,这里会动态绑定到CountDownLatch中的Sync类中的tryAcquireShared方法。

    可以看到,这里state设置为了3,导致程序进入到acquireSharedInterruptibly方法中,首先会阻塞当前线程。

    当调用 countDown() 函数,会使得state–,当state值变成0后,就会唤醒等待队列中的第一个线程。

    其中,countDown() 函数具体如下所示,调用AQS的releaseShared方法,AQS再调tryReleaseShared方法,动态绑定到CountDownLatch中所实现的tryReleaseShared方法。



    4.2 Semaphore

    Semaphore是共享锁

    新建一个 Semaphore 对象时:

    Semaphore s = new Semaphore(3);
    
    • 1

    会在 s 内部创建一个 NonfairSync 对象,其继承自 Sync 类,并将 this.sync 中的 state 值设置为3,代表有3个通行证。

    当我们调用s.acquire(2),会获取到两个许可证,分析如下:
    acquire方法会调AQS中的acquireSharedInterruptibly方法,

    AQS中的acquireSharedInterruptibly方法会调AQS中的tryAcquireShared方法,会被动态绑定到Semaphore中的静态内部类NonfairSync中实现的tryAcquireShared方法。

    NonfairSync中的tryAcquireShared又调了AQS中的doAcquireSharedInterruptibly。可以看到当请求的通行证数大于state的值时,返回的是负数,那么前面的acquireSharedInterruptibly方法就会进入到doAcquireSharedInterruptibly方法,就会将当前线程阻塞并放入等待队列中。当通行证数量足够时才会唤醒该线程。

    当调用s.release(2)时,会归还2个许可证,分析如下:

    会调AQS的releaseShared方法

    让后调AQS的tryReleaseShared方法,会被动态绑定到Semaphore的tryReleaseShared方法,该方法将释放的通行证数加回到state

    当调s.release()时,分析如下:
    会调Sync的releaseShared方法

    然后调AQS的releaseShared方法

    然后调AQS的tryReleaseShared方法

    Semaphore中的Sync重写了tryReleaseShared方法,就会动态绑定到Semaphore中的tryReleaseShared

    之后就会调AQS中的doReleaseShared方法,跟前面调s.acquire()方法时的类似,尝试唤醒等待队列中的线程。


    AQS中关键代码阅读

    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final AbstractQueuedSynchronizer.Node node = addWaiter(AbstractQueuedSynchronizer.Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final AbstractQueuedSynchronizer.Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        /*
                         * 进入到这里,说明当前线程获取到了通行证,通行证有可能还有多的,
                         * 就需要看看能不能唤醒接下来的线程
                         */
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private void setHeadAndPropagate(AbstractQueuedSynchronizer.Node node, int propagate) {
        AbstractQueuedSynchronizer.Node h = head; // Record old head for check below
        /*
         * 在尝试唤醒下一个线程时,把等待队列的头结点设置为当前线程
         * 即把当前线程从等待队列中去掉
         */
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            AbstractQueuedSynchronizer.Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            AbstractQueuedSynchronizer.Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == AbstractQueuedSynchronizer.Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, AbstractQueuedSynchronizer.Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    /*
                     * 将下一个线程的状态由挂起态设置为非挂起,也就是唤醒了线程,能够再被调度获取到CPU
                     * 这样这个线程就会去尝试调acquire()方法获取通行证
                     * 由doAcquireSharedInterruptibly的代码可知,只有当获取到了通行证后才会重新设置等待队列的head
                     * 也就是只有当被唤醒的线程获取到了通信证,它才会被从等待队列中移除
                     * 否则,仍然在原来的位置,又被挂起了,只有等到它前面那个线程释放了通行证,才会再去尝试唤醒等待队列中的线程
                     * 且,每次唤醒的都是等待队列中的第一个,如果通行证还有剩余,则会一直传递下去尝试唤醒线程
                     */
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, AbstractQueuedSynchronizer.Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    4.3 ReentrantLock

    ReentrantLock是独占锁

    新建一个ReentrantLock对象(需要注意ReentrantLock有公平锁和非公平锁之分,默认是非公平锁,公平锁需要再构造函数中传一个true):

    ReentrantLock lock = new ReentrantLock();
    
    • 1

    会在lock内部创建一个NonfairSync类的对象sync(继承自Sync),其调的是默认构造函数,也就是state的初始值为0。

    当我们调用lock.lock()时,分析如下
    首先调ReentrantLock中的lock方法

    让后调Sync中的接口lock

    当是非公平锁时,syncNonfairSync类的实例,因此非公平锁调lock()函数也就会动态绑定到NonfairSync中的lock函数

    当第一个线程到达时,通过CAS检查state的值(即第一个线程会把state的值设置为1,后面再到来的线程就无法进入了,要注意debug时state的值是不准确的,有缓存),设置该lock对象被哪个线程占有。

    当非独占该锁的线程进入后,接着调AQS中的acquire函数

    然后就会调AQS中的tryAcquire函数,会被动态绑定到ReentrantLock中实现的tryAcquire函数

    接着调ReentrantLock的nonfairTryAcquire,从nonfairTryAcquire函数可以看到,占有该锁的线程进入后会返回true,就不会执行后面的阻塞线程的操作了,并且当不是第一次进入时,每进入一次state的值都会被加1,就是为了实现同一个线程的 可重入 ;非占有该锁的线程进入后直接返回false,接着执行后面的阻塞线程操作。

    未获取到锁,就会进入到AQS的acquireQueued函数。只有当当前线程获取到了锁才会退出这个死循环。

    lock.unlock()的执行分析:

    接着调AQS的release函数

    接着调AQS的tryRelease函数,然后动态绑定到ReentrantLock中的tryRelease函数上

    释放锁后当state的值变成0后就会唤醒等待队列中的线程,然后调AQS的unparkSuccessor函数,看它们是否能够获取到刚刚被释放的锁。

    公平锁与非公平锁
    公平锁每次获取到锁为同步队列中的第一个节点,保证请求资源时间上的绝对顺序(强制保证每次都是等待队列中的第一个线程),而非公平锁有可能刚释放锁的线程下次继续获取该锁(刚释放锁的线程或者新来的线程与释放锁时被唤醒的等待队列中的第一个线程竞争锁,有可能被唤醒的线程没有竞争赢,而在公平锁中,刚释放锁的线程和新来的线程都要到等待队列后面去排队),则有可能导致其他线程永远无法获取到锁,造成“饥饿”现象。


    5. AQS总结

    从上面的3个同步工具类的分析可以看到,AQS是一个构建JAVA同步工具的框架

    控制并发的工具类,都需要修改同步变量以达到控制并发流程的目的,这些共同的功能被写到了AQS类中:共享锁的acquire() / release() 和 独占锁的 acquireShared() / releaseShared();然后分别调AQS中的 tryAcquire() / tryRelease()tryAcquireShared() / tryReleaseShared(),这些函数在AQS中都没有被具体实现。

    各个工具类中都自己写了一个Sync,公平锁和非公平锁还由Sync派生出了FairSyncNonfairSync,在这里面详细地写了 tryAcquire() / tryRelease()tryAcquireShared() / tryReleaseShared() 的控制流程。

    同时,等待队列的管理,线程的阻塞和唤醒,这些共同功能都被抽取出来写入到了AQS中。


    6. 参考文献

    [1] https://blog.csdn.net/weixin_42638946/article/details/120252441。
    [2] https://www.jianshu.com/p/15f33406543b。

  • 相关阅读:
    AtomicLong与LongAdder(上)
    springboot毕设项目大学生档案管理系统6d4gz(java+VUE+Mybatis+Maven+Mysql)
    工程机械比例阀电流采集方案
    【数据测试】之后端(一)
    leetcode做题笔记189. 轮转数组
    26-38-javajvm-类加载器子系统
    Photoshop制作简洁清新的插画海报图片
    Android 布局优化,看过来 ~
    [附源码]java毕业设计基于的高校学生考勤管理系统
    C++之特殊数据成员
  • 原文地址:https://blog.csdn.net/qq_27198345/article/details/126551162