• AbstractQueuedSynchronizer之AQS


    1. AQS源码分析前置知识储备

    AQS在juc中的地位特别重要。

    公平锁和非公平锁
    可重入锁
    自旋思想
    LockSupport
    数据结构之双向链表
    设计模式之模板设计模式

    1.1 AQS入门级别理论知识

    AQS的字面翻译:
    抽象的队列同步器:是用来实现锁或者其它同步器组件的公共基础部分的抽象实现,是重量级基础框架及整个JUC体系的基石,主要用于解决锁分配给"谁"的问题
    整体就是一个抽象的FIFO队列来完成资源获取线程的排队工作
    依靠单个原子int值来表示持有锁的状态,通过占用和释放方法,改变状态值
    在这里插入图片描述

    1.2 AQS为什么是JUC内容中最重要的基石

    在这里插入图片描述锁,面向锁的使用者:定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可。
    同步器,面向锁的实现者。

    Java并发大神DqugLee,提出统一规范并简化了锁的实现,将其抽象出来。屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等,是一切锁和同步组件实现的。

    加锁会导致阻塞。有阻塞就需要排队,实现排队必然需要队列

    抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

    既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

    如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。

    在这里插入图片描述

    1.3 AQS同步队列的基本结构

    在这里插入图片描述内部类Node(Node类在AQS类内部)
    Node的int变量
    Node的等待状态waitState成员变量:volatile int waitStatus;
    等候区其它顾客(其它线程)的等待状态。队列中每个排队的个体就是一个Node

    Node此类的讲解
    在这里插入图片描述

    1.4 AQS小总结

    有阻塞就需要排队,实现排队必然需要队列
    state变量+CLH双端队列

    AQS的int变量
    AQS的同步状态State成员变量 private volatile int state;
    类比:银行办理业务的受理窗口状态
    零就是没人,自由状态可以办理;大于等于1,有人占用窗口,等着去

    AQS的CLH队列
    CLH队列(三个大牛的名字组成),为一个双向队列
    通过自旋等待
    state变量判断是否阻塞
    从尾部入队,从头部出队

    2. AQS源码深度讲解和分析

    2.1 ReentrantLock
    2.1.1 ReentrantLock基础架构

    Lock接口的实现类,基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的
    ReentrantLock的原理,从我们的ReentrantLock开始解读AQS源码
    在这里插入图片描述ReentrantLock与AQS的关系
    在这里插入图片描述
    在这里插入图片描述
    ReentrantLock经典代码
    默认是非公平锁,构造形参传入true代表公平锁。

    public class AQSTest {
      public static void main(String[] args) {
          Lock lock = new ReentrantLock();
          lock.lock();
          try {
    
          }finally {
              lock.unlock();
          }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    2.1.2 ReentrantLock的lock()方法

    以非公平锁ReentrantLock()为例作为突破走起,方法lock()
    在这里插入图片描述源码解读

        public void lock() {
            sync.lock();
        }
    
    • 1
    • 2
    • 3

    sync的locak是一个抽象方法,子类有两个实现,FairSync和NonfairSync
    在这里插入图片描述
    在这里插入图片描述非公平锁仅仅比公平锁多了一个if判断。

    if (compareAndSetState(0, 1))
         setExclusiveOwnerThread(Thread.currentThread());
    
    • 1
    • 2

    compareAndSetState是调用AQS类的compareAndSetState,底层是调用Unsafe类的CAS方法
    在这里插入图片描述如果当前锁没有被占用,即state还是0,那就把state设为1,并且调用setExclusiveOwnerThread方法,该方法最终调用了AOS的setExclusiveOwnerThread方法,设置该线程拥有当前独占访问权。
    在这里插入图片描述如果锁已经被占用了,那就进入acquire方法了。此时公平锁与非公平锁代码相同。acquire方法在AQS中,主要有三个流向
    在这里插入图片描述

    2.2 acquire()方法

    以非公平锁作为案例突破

    acquire方法三步骤:
    尝试加锁;
    加锁失败,线程入队列;
    线程入队列后,进入阻塞状态。

    在这里插入图片描述
    在这里插入图片描述

    2.2.1 tryAcquire()方法

    tryAcquire是AQS类的方法,有四个实现类重写了该方法。
    在这里插入图片描述从最简单的lock方法开始看看公平和非公平
    在这里插入图片描述
    可以明显看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:
    hasQueuedPredecessors(),它是公平锁加锁时判断等待队列中是否存在有效节点的方法。
    在这里插入图片描述查询是否有其他线程已经等待了更长时间来获取锁

    hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:

    公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;

    非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程苏醒后,不一定就是排头的这个线程获得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁了。

    队列里面先进先出(公平),队列外面不讲武德(非公平)

    hasQueuedPredecessors()方法返回true,那么tryAcquire方法就会返回false,然后才会有后面的addWaiter和acquireQueued

    主线是非公平锁,公平锁只是连带着讲一下,现在看回非公平锁。非公平锁的tryAcquire调用的是nonfairTryAcquire方法

    nonfairTryAcquire()
    在这里插入图片描述nonfairTryAcquire中主要做了两个判断:
    state是否为0?这块代码看起来很熟悉,上文已经分析过了,非公平锁刚进来的时候已经判断过一次了,这里进行再次判断是为了防止这个过程中锁已经被释放了。如果这里执行成功,那么后面的两大流程就不用走了,成功获取到锁。
    当前线程是否是持有锁的线程?这里主要是为了重入锁做判断的。ReentrantLock是重入锁。c + acquires就代表重入的次数,acquires是1,c是当前锁重入的次数。
    返回false才会执行下面两大流程。

    2.2.2 addWaiter()方法
    /**
    * Creates and enqueues node for current thread and given mode.
    *
    * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
    * @return the new node
    */
    private Node addWaiter(Node mode) {
         Node node = new Node(Thread.currentThread(), mode);
         // Try the fast path of enq; backup to full enq on failure
         Node pred = tail;
         if (pred != null) {
             node.prev = pred;
             if (compareAndSetTail(pred, node)) {
                 pred.next = node;
                 return node;
             }
         }
         enq(node);
         return node;
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    Node.EXCLUSIVE表示独占,Node.SHARED表示共享。非公平锁和公平锁都是独占锁。
    以当前线程和mode(Node.EXCLUSIVE)创建一个Node
    定义Node节点pred,并把尾结点赋值给pred

    线程B进来时队列还未初始化,此时tail是null,即走enq(node)方法,这个只有尾结点等于null才会走该方法
    线程C进来时pred是B节点,不为null,然后把C节点的前驱节点设为B,并把C设为尾结点,同时把B的后继节点设为C

    addWaiter方法返回的是以当前线程构造的节点

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    使用for (;;)开启死循环,第一次循环时,tail还是null,新建一个空节点并通过cas把它设为头结点,尾结点也指向它。退出第一次循环
    第二次循环时,tail指向了刚才创建的新节点记为n(new Node()),不等于null了,则将以B线程基础构造的node节点的前驱指针指向t,此时t等于tail,tail又指向了n,即将node挂在了新的n后面,node的前驱节点是n,然后通过cas操作将node作为尾结点,并将n的后继节点设为node,然后返回头结点n。

    在这里插入图片描述acquireQueued
    假如再抢抢失败就会进入
    shouldParkAfterFailedAcquire和 parkAndChecklnterrupt方法中

    shouldParkAfterFailedAcquire
    在这里插入图片描述如果前驱节点的 waitStatus是 SIGNAL状态,即 shouldParkAfterFailedAcquire方法会返回 true。程序会继续向下执行parkAndCheckInterrupt方法,用于将当前线程挂起

    parkAndCheckInterrupt
    在这里插入图片描述

    2.2.3 acquireQueued()方法
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    定义两个标志位failed 和interrupted,当前是B线程,开启第一次循环,然后获取B节点的前驱节点n

    final Node predecessor() throws NullPointerException {
       Node p = prev;
       if (p == null)
           throw new NullPointerException();
       else
           return p;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    n是头结点但是tryAcquire尝试获取锁一般会失败,然后执行shouldParkAfterFailedAcquire方法
    头结点n的waitStatus默认值为0,通过cas方法将头结点的waitStatus设置为1,并返回false,不会执行parkAndCheckInterrupt

    开启第二次循环,头节点还是n,tryAcquire尝试获取锁依旧失败,然后执行shouldParkAfterFailedAcquire方法,此时头结点n的waitStatus是1,返回true,继续执行parkAndCheckInterrupt方法,B线程被挂起。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 该节点已设置状态,要求释放信号,以便安全停车。
            return true;
        if (ws > 0) {
            // 前置任务已取消。跳过前置任务并指示重试
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // waitStatus必须为0或PROPAGATE。表明我们需要信号,但不要停车。来电者需要重试以确保在停车前无法获取
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    • 1
    • 2
    • 3
    • 4

    然后C线程进来,它的前驱节点为B,执行两次shouldParkAfterFailedAcquire方法终于返回true,然后继续执行parkAndCheckInterrupt方法,C线程被挂起。

    2.3 unlock()方法源码

    在这里插入图片描述unlock()方法分析

    public void unlock() {
        sync.release(1);
    }
    
    • 1
    • 2
    • 3
    2.3.1 release()方法

    unlock底层调用的release方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    2.3.2 tryRelease()方法

    tryRelease是AQS类的方法,实现类方法主要有三个。
    tryRelease方法若抛出异常,则说明线程释放锁失败,返回false
    tryRelease方法返回true,则继续执行。定义节点h,令h等于头结点,如果h不等于null且h的waitStatus不等于0,说明有线程在等待唤醒。执行
    在这里插入图片描述在这里插入图片描述释放锁的时候没有区分公平锁和非公平锁,因为锁只有一把,不存在公平与否之说

    protected final boolean tryRelease(int releases) {
       int c = getState() - releases;
       if (Thread.currentThread() != getExclusiveOwnerThread())
           throw new IllegalMonitorStateException();
       boolean free = false;
       if (c == 0) {
           free = true;
           setExclusiveOwnerThread(null);
       }
       setState(c);
       return free;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    因为当前锁被持有了,所以state一定是1。千万不要把state和waitStatus搞混了。
    c=1-1=0。如果当前线程和持有锁的线程不同,则抛出异常。
    定义锁空闲等于false,如果c等于0,将锁空闲改为true,将当前持有锁的线程设置为null,并把state置为0,返回true。

    2.3.3 unparkSuccessor()方法
    private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    拿到头结点的waitStatus,因为此时等待队列有线程在等待,所以头结点的waitStatus一定不等于0,用cas将头结点的waitStatus置为0。然后获取头结点的后继节点s,如果等于null或者s的waitStatus(s已取消获取锁的等待)。
    如果不为null,唤醒后继节点B。

    唤醒节点B,镜头切回到acquireQueued的parkAndCheckInterrupt方法中,之前线程B就是在这里被挂起的,哪里跌到,就在哪里爬起。
    返回return Thread.interrupted();因为线程B从未中断过,所以返回false。

    复习一下interrupted()方法和isInterrupted()的区别: interrupted()执行后具有清除状态标志值为false的功能,但isInterrupted()不会。
    acquireQueued方法继续讲解,因为返回false,所以继续循环,获取B节点前驱节点p,p就是头结点,然后执行tryAcquire尝试获取锁,因为
    刚才已经把锁释放了,所以state等于0,tryAcquire能够获取锁成功。执行setHead方法。将头结点p的后继节点置为null,此时已经没有节点指向p,p已经成了无根之木,垃圾对象,随时等待命运的铁扫帚将自己扫入历史的垃圾堆。将failed 置为false,则不用执行finally中的cancelAcquire方法了。然后acquireQueued方法返回false,也不用执行acquire方法中的selfInterrupt方法了。

    final boolean acquireQueued(final Node node, int arg) {
     boolean failed = true;
     try {
         boolean interrupted = false;
         for (;;) {
             final Node p = node.predecessor();
             if (p == head && tryAcquire(arg)) {
                 setHead(node);
                 p.next = null; // help GC
                 failed = false;
                 return interrupted;
             }
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 interrupted = true;
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    将B节点设为头结点,将B节点持有线程改为null,把B节点的前驱节点置为null

    private void setHead(Node node) {
         head = node;
         node.thread = null;
         node.prev = null;
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述
    注:本文是学习B站周阳老师《尚硅谷2022版JUC并发编程》课程所做学习笔记。

  • 相关阅读:
    如何测量电源的噪声(有视频)
    python-opencv 培训课程笔记(2)
    ctfshow 命令执行(40-50)
    【深度学习 论文篇 01-1 】AlexNet论文翻译
    连接到 GBase LDAP 服务器
    虚拟知识图谱系统Ontop使用全流程
    9、IOC 之基于注释的容器配置
    文心一言 VS 讯飞星火 VS chatgpt (113)-- 算法导论10.2 5题
    eNSP启动路由器一直出#号、以为是安装配置winpcap的问题。。。。(以为是win10安装winpcap失败的问题。。。)
    Chapter15 : Artificial Intelligence in Compound Design
  • 原文地址:https://blog.csdn.net/qq_44300280/article/details/127624306