• aqs源码分析


    AQS -AbstractQueuedSynchronizer

    在这里插入图片描述

    常用的Lock、线程池Worker,都是由AQS来实现的

    原理

    我们加锁的原理,实际上就是通过对公共资源加锁来实现的,aqs亦是如此

    在这里插入图片描述

    aqs的核心思想,即一个CLH队列锁,将暂时获取不到所得线程加入到队列中等待

    state使用int值表示资源的同步状态

    使用内部类Node来实现FIFO队列,先进先出,只有前驱节点是head节点的节点才能被首先唤醒去进行同步状态的获取。当该节点获取到同步状态时,它会清除自己的值,将自己作为head节点,以便唤醒下一个节点。

    AQS的资源共享方式:Exclusive(排他模式),Share(共享模式)

    在构建自定义同步器时,只需要依赖AQS底层再实现资源state的获取与释放操作即可。自定义同步器实现时主要实现以下几种方法:

    isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false

    独占模式下的AQS

    获取资源(加锁)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    tryAcquire 先尝试获取资源(加锁),由子类实现(如排他锁和共享锁获取方式不同)

    如果获取资源失败

    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

      private Node addWaiter(Node mode) {
        // 将当前线程包装成Node
            Node node = new Node(mode);
    
            for (;;) {
              // 临时变量保存旧的尾节点
                Node oldTail = tail;
                if (oldTail != null) {
                  // 将当前节点的前驱节点设置为旧尾节点,让当前节点成为新的尾节点
                    node.setPrevRelaxed(oldTail);
                  // CAS方式使其生效
                    if (compareAndSetTail(oldTail, node)) {
                      // 再将旧的尾节点的后驱节点设置为当前新的尾节点
                        oldTail.next = node;
                        return node;
                    }
                } else {
                  // 不存在尾节点就初始化队列
                    initializeSyncQueue();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    // 初始化队列可以看出队列的head头节点和tail尾节点都是当前线程节点
    private final void initializeSyncQueue() {
        Node h;
        if (HEAD.compareAndSet(this, null, (h = new Node())))
            tail = h;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    final boolean acquireQueued(final Node node, int arg) {
            boolean interrupted = false;
            try {
                for (;;) {
                  // 获取当前线程的前驱节点
                    final Node p = node.predecessor();
                  // 只有前驱节点为head时,才会尝试去获取资源(锁)
                    if (p == head && tryAcquire(arg)) {
                      // 获取到锁即将自己设置为head头节点
                        setHead(node);
                      // 将旧的头节点的next引用断开,方便gc回收旧head节点
                        p.next = null; // help GC
                        return interrupted;
                    }
                  // 获取资源失败后是否需要阻塞
                    if (shouldParkAfterFailedAcquire(p, node))
                      // 如果需要阻塞则阻塞
                        interrupted |= parkAndCheckInterrupt();
                  
                  // 无需阻塞则自旋继续获取资源
                  // 阻塞后被唤醒则继续获取资源
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                if (interrupted)
                    selfInterrupt();
                throw t;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * 如果前驱节点为SIGNAL状态,则当前节点需要阻塞
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * 如果前驱节点取消了,则将取消的节点剔除掉,并且不阻塞然后重新获取一次锁
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * 其他状态,包括Node创建出的默认状态0或者PROPAGATE,将前驱节点设置为SIGNAL状态,并且不阻塞,重新自旋获取一次锁,这里说明,如果前驱节点是默认状态0,则由后一个节点将前驱节点设置为SIGNAL状态,那么SIGNAL状态的含义为:SIGNAL状态的后续节点都会被阻塞
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    到这里,我们总结一下waitStatus

    在这里插入图片描述

    CANCELLED
    waitStatus值为1时表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞。并且会在后驱节点的线程中移除队列

    SIGNAL
    waitStatus为-1时表示该线程的后续线程需要阻塞,即只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程

    PROPAGATE
    waitStatus为-3时,表示该线程以及后续线程进行无条件传播(CountDownLatch中有使用)共享模式下, PROPAGATE 状态的线程处于可运行状态

    默认值
    waitStatus为默认值0时,表示没有后续节点,因为后续节点判断是否阻塞时,会判断前驱节点的状态,并修改前驱节点的waitStatus

    释放资源(解锁)

    release

     public final boolean release(int arg) {
       // 尝试释放资源,由子类实现
            if (tryRelease(arg)) {
                Node h = head;
              // 释放成功,如果head头节点存在,并且状态不是0,表示还有后续节点
                if (h != null && h.waitStatus != 0)
                  // 唤醒后驱节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    uparkSuccessor

     private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
       // 如果当前节点是非取消状态,则将状态归零
            if (ws < 0)
                node.compareAndSetWaitStatus(ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
       // 拿到后驱节点,如果后驱节点不存在或已取消,则从尾部遍历寻找最早进入到队列的非取消状态的线程节点
       // 至于为什么是从后往前遍历,可以注意一下刚才的addWaiter方法,Node(临时名b)入队的时候,是先设置前驱节点(b的前驱为a),在通过cas来设置成尾节点,如果成为尾节点后(此时b是尾),前驱节点(前驱节点是a)的next还没有指向尾节点(b)而指向null,并且这时前驱节点waitStatus=1为取消状态,那么通过next来从头向尾遍历,就无法拿到正确的next节点
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node p = tail; p != node && p != null; p = p.prev)
                    if (p.waitStatus <= 0)
                        s = p;
            }
            if (s != null)
              // 后驱节点存在,则唤醒后驱节点
                LockSupport.unpark(s.thread);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    共享模式下的AQS

    获取资源(加锁)

    acquireShared

    // arg表示要加锁的个数
    public final void acquireShared(int arg) {
      			// 尝试加锁, 返回值见下图, 需要子类实现
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    • 返回值 > 0,表示还有剩余资源,其他线程可以继续获取共享资源
    • 返回值 = 0,表示资源获取成功,同时没有更多可用共享资源,后续线程需要到同步队列中
    • 返回值 < 0,表示资源获取失败

    doAcquireShared

    private void doAcquireShared(int arg) {
            // 将当前线程以共享的模式加入到同步队列中
            final Node node = addWaiter(Node.SHARED);
            boolean interrupted = false;
            try {
                for (;;) {
                  	// 获取前驱节点
                    final Node p = node.predecessor();
                    if (p == head) {
                      	// 如果前驱节点为head头节点则尝试获取资源
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                          	// 获取成功将当前节点设置为头节点, 详情见下文
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node))
                        interrupted |= parkAndCheckInterrupt();
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                throw t;
            } finally {
                if (interrupted)
                    selfInterrupt();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    setHeadAndPropagate

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
      // 如果获取资源后,还有剩余资源
      // 如果旧的头节点为空或waitStatus < 0
      // 如果新的头节点为空或waitStatus < 0
      // 上述3种情况,就唤醒下一个节点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
              // 唤醒排队的线程
                doReleaseShared();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    释放资源(解锁)

    public final boolean releaseShared(int arg) {
      // 释放资源,由子类实现
        if (tryReleaseShared(arg)) {
          // 唤醒其他线程
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    有待研究

    LockSupport的park和unpark

    Condition

  • 相关阅读:
    Mysql集群高可用架构MHA
    flink 复postgresql数据库数据
    如何提升网络安全应急响应与事件处置能力
    picGo图床搭建gitee和smms(建议使用)
    为什么选择clickhouse?
    【实践篇】DDD脚手架及编码规范
    重磅!华秋电子再次入选“中国产业数字化百强榜”
    Java中main方法是单线程还是多线程?启动后有多少个线程会被创建?
    微信小程序 request合法域名 中设置WebSocket的wss/ws报错 该域名协议头非法 问题解决
    【QT】QT自定义C++类
  • 原文地址:https://blog.csdn.net/qq1010830256/article/details/126457633