• AQS实现原理


    在java.util.concurrent包中,我们经常会使用ReentrantLock,CyclicBarrier等工具类,但是我们往往对其内部的实现原理却并不知晓。

    本篇文章主要对上述工具类的核心实现AQS进行剖析,分析原理可以让我们学习到大神的代码设计思维。

    文章将从一下几个方面分析:

    1.AQS是什么?

    AbstractQueuedSynchronizer类就是我们通常说的AQS,抽象的队列同步器。

    其实我们学过操作系统原理都知道,所谓的同步,指的是多线程场景下通过某种机制,保证某段代码执行是线程独享的,

    我们把这段代码叫同步块,而把这种机制叫同步。

    在JAVA中,传统的方式是使用synchronized关键字来实现同步,那么其底层是基于C++实现的ObjectMonitor。

    今天我们讨论的AQS是JDK1.5之后,提供的一个能实现同步功能的抽象类。

    通过该类的注释,我们可以了解到其内部采用了FIFO队列的数据结构来实现,互斥场景的资源申请和释放的实现如下所示:

    复制代码
       Acquire:
           while (!tryAcquire(arg)) {
              enqueue thread if it is not already queued;
              possibly block current thread;
           }
      
       Release:
           if (tryRelease(arg))
              unblock the first queued thread;
    复制代码

    而tryAquire()和tryRelease()方法都是需要子类去实现的。

    换句话说,如果要使用AQS,那么只需要继承,然后实现如下方法来自定义资源获取和释放的逻辑就行了。

    FIFO队列的节点使用内部类Node来描述:

    复制代码
        static final class Node {
    
            // 节点状态
            volatile int waitStatus;
    
            // 上一个节点
            volatile Node prev;
    
            // 下一节点
            volatile Node next;
    
            // 需要排队的线程
            volatile Thread thread;
        }
    复制代码

    其他代码我先省略。

    复制代码
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        // 双向链表头节点
        private transient volatile Node head;
    
        // 双向链表为节点
        private transient volatile Node tail;
    
        // 资源
        private volatile int state;
    }
    复制代码

     

    2.申请互斥资源的源码分析。

    2.1 分析acquire()方法的逻辑

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    tryAcuire()方法可以理解成获取资源,由子类实现,我们不关注。

    我把代码写成下面这样,可能好理解一些:

    复制代码
    public final void acquire(int arg) {
        if(tryAcquire(arg)) {
            return;
        }
        Node n = addWaiter(Node.EXCLUSIVE);
        if(acquireQueued(n, arg))
            selfInterrupt();
    }
    复制代码

     

    复制代码
    public final void acquire(int arg) {
        // 如果获取资源成功,直接返回
        if(tryAcquire(arg)) {
            return true;
        }
        // 未获取到资源,将自己封装成一个node添加到队列尾部
        Node n = addWaiter(Node.EXCLUSIVE);
        // 有点复杂,下面慢慢分析
        if(acquireQueued(n, arg)) {
            // 自我中断 先不关注。
            selfInterrupt();
        }
    }
    复制代码

    起码,我们应该知道一点,该方法一旦返回了,那么就意味着可以进入同步块代码执行了。

    2.2 分析addWaiter()方法

    复制代码
    private Node addWaiter(Node mode) {
        // 以互斥模式创建一个node,waitStatus是0
        Node node = new Node(Thread.currentThread(), mode);
        // 其实该段代码跟下面的enq方法差不多
        Node pred = tail;
        // 如果该队列已经有为节点
        if (pred != null) {
            // 当前node上一个node指向为尾节点
            node.prev = pred;
            // cas修改尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 死循环保证添加成功
        enq(node);
        return node;
    }
    
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 如果当前队列没有节点,创建一个虚拟节点作为头和尾,该节点的thread == null, waitStatus是0
                // 通过cas操作保证只有一个线程修改成功
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    复制代码

    其实上述方法就是保证了并发情况下node一定能正确加入到node中,而且如果是空链表,会增加一个虚拟的head节点

    图解一下:

    (创建head虚拟节点)

     

     将新节点指向tail

     通过cas操作修改tail指向新节点

     如果修改成功,将修改前的tail的next指向新节点

     

    2.3 分析aquireQueued方法

    复制代码
    // 看这个方法的时候我建议不要关注其中的临时变量
    // 我们只要知道,这个方法里面有个死循环,不管怎样,只有return了,才能执行业务中定义的同步代码块。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 唯一的返回点
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // 我把代码改写成下面这样 可能会好理解一些
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 只有前继节点是head的时候具有获取资源的资格,如果获取成功则直接将当前node设置成head
                if (p == head && tryAcquire(arg)) {
                    // 可能在排队后未阻塞前执行,也可能在阻塞被唤醒后执行
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 唯一的返回点
                    return interrupted;
                }
                // 判断是否该阻塞自己
                if (!shouldParkAfterFailedAcquire(p, node)){
                   continue;
                }
                // 进行阻塞,并且在被唤醒之后返回线程的中断状态
                if(!parkAndCheckInterrupt()) {
                    continue;
                }
                interrupted = true;
            }
        } finally {
            // 先别关注。
            if (failed)
                cancelAcquire(node);
        }
    }
    复制代码

    总之,一旦该方法返回了,就意味着线程获取资源成功了。

    下面图解获取资源成功后,做的修改:

     

     

     

     当然这里的waitStatus不一定是0。

    2.4 分析shouldParkAfterFailedAcquire()方法

    复制代码
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 如果上一节点状态是-1,直接返回true
        if (ws == Node.SIGNAL)
            // 返回,并触发外层阻塞线程
            return true;
        if (ws > 0) {
            // 如果上一个节点状态大于0,其实就是1表示被取消了
            // 往前找waitStatus正常的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // cas修改上一个节点为状态为-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 返回进入下一个循环
        return false;
    }
    复制代码

    其实逻辑很简单,就是修改自己的prev指向往前寻找的有效节点,并在阻塞前,将prev指向的有效节点waitStatus设置为-1。

    问题来了?这个代码没有并发问题吗?

    do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;

    答案是没有并发问题,因为每个节点都是往前寻找,理论上讲每个线程遍历的节点不一样。

    我们来图解一下这个循环修改的过程:

    假设线程A(waitStatus = 0),线程B(waitStatus = 1),线程C(waitStatus = 0 为当前线程)

     下图标识了循环往前探测并修改引用关系

     下图就是修改了pred节点的waitStatus = -1 (下一次循环到来的时候)

     

    此时中间线程B的节点其实没有引用再指向他了。

    如果成功修改线程A的waitStatus为-1之后,下一次循环到来,该方法就返回了true,线程就阻塞了。

     其实看到这里,我们可以将虚拟的head节点就当做是正在使用资源的线程表示(个人观点哈)

     互斥资源申请的源码分析就结束了。

    做一个小总结:

    复制代码
    public final void acquire(int arg) {
        // 如果争抢资源成功直接返回直接业务的同步代码块
        if (tryAcquire(arg)) {
            return;
        }
        // 循环+cas保证向队列中添加当前线程的node成功
        Node n = addWaiter(Node.EXCLUSIVE);
        // 死循环,争抢资源,或者阻塞,或者唤醒之后继续争抢资源,直到抢资源成功后返回。
        if(acquireQueued(n, arg)) {
            selfInterrupt();
        }
        // 未被中断的抢到资源
    }
    复制代码

    3.释放互斥资源的源码分析。

    我们直接分析release()方法:

    复制代码
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // 一旦释放资源,此处就需要考虑并发问题了
            Node h = head;
            // head节点是虚拟节点,不可能是取消状态所以这里的判断可以理解为
            // 头结点不为空而且头结点的waitStatus = -1执行unparkSuccessor方法
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            // h == null 或者 h.waitStatus = 0说明没有后继节点需要唤醒,
            // 如果此时正好head后面一个node正在试图修改head的状态改成-1是能改成功的
            // 但是由于之前分析的acquireQueued方法是一个死循环,哪怕head被修改成-1,
            // 但是由于该循环会先抢锁所以也就不存在线程改了状态会park的问题。
            // h == null说明并无线程参与竞争
            return true;
        }
        return false;
    }
    复制代码

     

    其实关键就是unparkSuccessor()方法

    复制代码
    private void unparkSuccessor(Node node) {
        // 进入该方法说明资源已经被释放了。
        int ws = node.waitStatus;
        // 如果ws小于0,修改为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 传入node就是head
        // s指向head下一节点
        Node s = node.next;
        // 如果没有后继节点说明有线程已经抢到资源
        // 如果后继节点被取消了
        if (s == null || s.waitStatus > 0) {
            // 假设没有节点需要唤醒
            s = null;
            // 从后往前找,找到距离head最近的节点唤醒
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    复制代码

    其实执行该方法的时候是存在并发的情况的。

    我理解这个地方从后往前找是为啥呢?能不能从前往后找?

     

    这个问题,我也不知道。。。。

     

  • 相关阅读:
    「PaddleOCR」 模型应用优化流程
    【slam十四讲第二版】【课后习题】【第十一讲~回环检测】
    十一、流程控制
    精品基于Uniapp+SSM实现的Android平台的健康管理系统
    【Android笔记03】Android基本的UI控件(TextView、Button、EditText、ImageView、ProgressBar)
    学习Bootstrap 5的第十四天
    SAP UI5 sap.ui.core.Element 的概要介绍
    工业企业网络推广解决方案 | 网络营销专家分享 | 上海添力
    JavaScript学习笔记——对象
    美格智能Cat.1蜂窝IPC解决方案 助力安防监控智慧连接
  • 原文地址:https://www.cnblogs.com/999-ganmaoling/p/17538966.html