• 从ReentrantLock来学习AQS


    众所周知,AQS是Java用来构建同步工具的基本组件,我们常用的ReentrantLock,Semaphore等同步器都是基于AQS来构建的,这里我们从ReentrantLock这个最简单的同步锁来入手,学习AQS的基本思想。JDK源码基于JDK17,对比一下与JDK1.8在细节上有一些不同。

    这里假定各位对AQS有一些基本了解,不再对AQS的原理进行介绍。

    ReentrantLock#lock()

    		@ReservedStackAccess
            final void lock() {
                if (!initialTryLock())
                    acquire(1);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    initialTryLock方法会尝试去获取锁,这里公平锁和非公平锁有一些不同,公平锁会先判断当前队列是否为空,为空才会尝试获取

            // NonfairSync
    		final boolean initialTryLock() {
                Thread current = Thread.currentThread();
                if (compareAndSetState(0, 1)) { // first attempt is unguarded
                    setExclusiveOwnerThread(current);
                    return true;
                } else if (getExclusiveOwnerThread() == current) {
                    int c = getState() + 1;
                    if (c < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(c);
                    return true;
                } else
                    return false;
            }
    
    
            //FairSync
    		final boolean initialTryLock() {
                Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                } else if (getExclusiveOwnerThread() == current) {
                    if (++c < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(c);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    由于ReentrantLock是可重入锁,在获取成功之后需要对state +1,同样在release时也要做反操作。

    在第一次尝试获取锁失败后,acquire(1)这个方法就正式进入AQS的流程了。

    public final void acquire(int arg) {
            if (!tryAcquire(arg))
                acquire(null, arg, false, false, false, 0L);
        }
    
    • 1
    • 2
    • 3
    • 4

    这里还是先尝试获取锁,tryAcquire内做的事情和initialTryLock差不多,也是有公平锁和非公平锁两个实现。

    然后就是重头戏了

    final int acquire(Node node, int arg, boolean shared,
                          boolean interruptible, boolean timed, long time){
    		Thread current = Thread.currentThread();
            byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
            boolean interrupted = false, first = false;
            Node pred = null;                // predecessor of node when enqueued
    
            /*
             * Repeatedly:
             *  Check if node now first
             *    if so, ensure head stable, else ensure valid predecessor
             *  if node is first or not yet enqueued, try acquiring
             *  else if node not yet created, create it
             *  else if not yet enqueued, try once to enqueue
             *  else if woken from park, retry (up to postSpins times)
             *  else if WAITING status not set, set and retry
             *  else park and clear WAITING status, and check cancellation
             */
    
            for (;;) {...}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    这个方法比较长,里面各种条件也是眼花缭乱,这里将代码拆分逐一介绍,各位在看这部分源码时也可以参考JDK1.8的版本,相对来说更加易懂。

    第一步:在入队之前,尝试获取锁

    			if (first || pred == null) {
                    boolean acquired;
                    try {
                        if (shared)
                            acquired = (tryAcquireShared(arg) >= 0);
                        else
                            acquired = tryAcquire(arg);
                    } catch (Throwable ex) {
                        cancelAcquire(node, interrupted, false);
                        throw ex;
                    }
                    if (acquired) {
                        if (first) {
                            node.prev = null;
                            head = node;
                            pred.next = null;
                            node.waiter = null;
                            if (shared)
                                signalNextIfShared(node);
                            if (interrupted)
                                current.interrupt();
                        }
                        return 1;
                    }
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    看这里的条件,如果是头结点或者前置节点为空,会尝试获取锁,因为我们当前还没有创建节点,也没有入队,所以pred == null为true。

    第二步:如果获取锁失败,创建节点并插到队尾

    			if (node == null) {                 // allocate; retry before enqueue
                    if (shared)
                        node = new SharedNode();
                    else
                        node = new ExclusiveNode();
                } else if (pred == null) {          // try to enqueue
                    node.waiter = current;
                    Node t = tail;
                    node.setPrevRelaxed(t);         // avoid unnecessary fence
                    if (t == null)
                        tryInitializeHead();
                    else if (!casTail(t, node))
                        node.setPrevRelaxed(null);  // back out
                    else
                        t.next = node;
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    这里其实留了一个小trick,可以看到创建节点和入队并不是在同一次循环中完成的,也就是说在成功入队之前,每一次循环都会走到第一步中尝试获取锁,这里也是JDK17与JDK1.8的不同

    第三步:

    if (!first && (pred = (node == null) ? null : node.prev) != null &&
                    !(first = (head == pred))) {
                    if (pred.status < 0) {
                        cleanQueue();           // predecessor cancelled
                        continue;
                    } else if (pred.prev == null) {
                        Thread.onSpinWait();    // ensure serialization
                        continue;
                    }
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这里的if看起来非常复杂,其实主要是判断以下几个条件:

    • 当前节点不是头结点
    • 前置节点不为空
    • 前置节点不是头结点

    在这过程中还顺带做了以下两件事:

    • 对pre赋值
    • 对first赋值

    理完这个if,再看里面做了什么。

    1. 如果前置的状态为0(表示前置节点已经取消),需要调用cleanQueue对队列进行清理。在cleanQueue里会从队尾循环查找是否有cancelled的节点,然后唤醒下一个最有可能获取锁的节点。
    2. 如果前置的前置为空,表明很快就能够获取到锁,使用Thread.onSpinWait()进行自旋。这里也是AQS为什么要使用双向队列的一个原因。

    第四步:设置状态

    新建的节点status总是0

    			else if (node.status == 0) {
                    node.status = WAITING;          // enable signal and recheck
                }
    
    • 1
    • 2
    • 3

    第五步:将线程挂起

    			else {
                    long nanos;
                    spins = postSpins = (byte)((postSpins << 1) | 1);
                    if (!timed)
                        LockSupport.park(this);
                    else if ((nanos = time - System.nanoTime()) > 0L)
                        LockSupport.parkNanos(this, nanos);
                    else
                        break;
                    node.clearStatus();
                    if ((interrupted |= Thread.interrupted()) && interruptible)
                        break;
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    通过LockSupport.park或者LockSupport.parkNanos方法挂起当前线程,直到:

    • 别的线程调用了unpark方法
    • 别的线程中断了该线程
    • 等待时间到期(LockSupport.parkNanos)
    • The call spuriously (that is, for no reason) returns没看懂是什么意思

    当前线程被唤醒之后,会将状态重新置为0
    在这一过程中,如果等待超时或者线程被中断,将跳出循环进入到cancelAcquire流程,这里暂且不管。

    第六步:再次自旋

    如果前置节点为头节点时,first会在上一步置为true

    else if (first && spins != 0) {
                    --spins;                        // reduce unfairness on rewaits
                    Thread.onSpinWait();
                }
    
    • 1
    • 2
    • 3
    • 4

    spins这个值是在第四步park时右移计算得到的

    最后会在 3->1->4->5->6步骤之间循环,直至获取锁,然后将当前节点置为头结点。

    ReentrantLock#unlock()

    	public final boolean release(int arg) {
            if (tryRelease(arg)) {
                signalNext(head);
                return true;
            }
            return false;
        }
    
    		protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (getExclusiveOwnerThread() != Thread.currentThread())
                    throw new IllegalMonitorStateException();
                boolean free = (c == 0);
                if (free)
                    setExclusiveOwnerThread(null);
                setState(c);
                return free;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    unlock方法就比较简单,先尝试释放资源,由于是可重入锁,这里需要判断重入次数是否为0,为0之后才能进行signalNext

    	private static void signalNext(Node h) {
            Node s;
            if (h != null && (s = h.next) != null && s.status != 0) {
                s.getAndUnsetStatus(WAITING);
                LockSupport.unpark(s.waiter);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    redis伪集群搭建
    数据校验(初级篇)
    【思维构造】Sequence Master—CF1806C
    yolo训练时遇到GBK编码问题
    手工测试如何转向自动化测试?字节5年自动化经验浅谈一下...
    毕业工作还没2年,跳到下一个公司就30K了,好家伙···
    Vue vue.config.js 的详解与配置
    Linux CentOS 8(MariaDB的数据类型)
    微服务网关选型
    解决 vite 4 开发环境和生产环境打包后空白、配置axios跨域、nginx代理本地后端接口问题
  • 原文地址:https://blog.csdn.net/nyzzht123/article/details/132825165