• 【JAVA多线程】AQS,JAVA并发包的核心


    目录

    1.概述

    1.1.什么是AQS

    1.2.AQS和BlockQueue的区别

    1.3.AQS的结构

    2.源码分析

    2.1.CLH队列

    2.2.模板方法的实现

    2.2.1.独占模式

    1.获取资源

    2.释放资源

    2.2.2.共享模式


    1.概述

    1.1.什么是AQS

    AQS非常非常重要,可以说是JAVA并发包(java.util.concurrent)的核心。

    以下是一个常见的多线程场景:

    当多个线程对同一个资源进行争抢时,没有抢到的线程怎么办?直接拒绝然后丢弃?最可行最合适的方式当然是让没有抢到资源的线程排队阻塞起来,这样即让出了CPU的资源,又方便当资源被释放、可争抢的时候唤醒线程来继续争抢。

    我们发现一个信号量+一个线程安全的用来存放线程的队列,是并发编程体系的一个底座,这个底座有一个名字叫——同步器。

    AQS(AbstractQueuedSynchronizer),一个JDK中提供的同步器,提供一系列线程的控制和同步机制,是JDK中诸多需要进行线程控制和同步的地方,诸如可重入锁ReentrantLock,以及诸多同步工具Semaphore、CountDownLatch、CyclicBarrier等,其底层都是使用的AQS来实现线程控制和同步。

    1.2.AQS和BlockQueue的区别

    阻塞队列(Blocking Queue)是一种特殊的队列,它的特殊之处在于它具有阻塞特性。当队列为空时,从队列中获取元素的线程会被阻塞,直到队列中有新的元素被加入;同样地,当队列满时,试图向队列中添加元素的线程也会被阻塞,直到队列中有空余的空间。这种特性使得阻塞队列非常适用于生产者-消费者模式中。

    阻塞队列:

    • 元素的读写是线程安全的

    • 里面存的任务

    • 阻塞的是外部的生产者线程/消费者线程

    AQS,同步器,是用来进行线程控制和同步的一个基础组件,用来让现场排队、阻塞、唤醒等操作。

    AQS:

    • 元素的读写是安全的

    • 里面存的是线程

    • 阻塞的是内部存的线程

    1.3.AQS的结构

    AQS提供了什么?

    一套API和一套数据结构。

    可以看到AQS是一个抽象类:

    既然是抽象类而不是直接是个接口就说明,其提供的不只是一个标准,而是一套资源。其底层用链表来组织其管理的线程,包含:

    • 链表的节点Node

    • 链表的头尾指针

    • 状态state,这是个volatile变量用于表示同步状态,在不同的实现类中,它的含义不同,有可能表示锁是否被持有、持有者持有锁的次数、信号量可用的许可数量等。

    • 条件变量ConditionObject

    AQS提供了一套标准的API,来完成两方便的操作:

    • 状态控制

    • 资源争抢

    状态控制:

    • getState():返回同步状态

    • setState(int newState):设置同步状态

    • compareAndSetState(int expect, int update):使用CAS设置同步状态

    • isHeldExclusively():当前线程是否持有资源 独占资源(不响应线程中断)

    资源争抢和释放一共两种,共享模式、独占模式:

    • tryAcquire(int arg):独占式获取资源,子类实现

    • acquire(int arg):独占式获取资源模

    • tryRelease(int arg):独占式释放资源,子类实现

    • release(int arg):独占式释放资源模板 共享资源(不响应线程中断)

    • tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现

    • acquireShared(int arg):共享式获取资源模板

    • tryReleaseShared(int arg):共享式释放资源,子类实现

    • releaseShared(int arg):共享式释放资源模板

    独占模式出现在对一把锁进行争抢、释放的场景中,如ReentrantLock中;共享模式出现在对多个资源的争抢、释放的场景中,如ReadWriteLock、CountDownLatch中。

    2.源码分析

    2.1.CLH队列

    在AQS的源码开头,就描述了其是使用CLH队列来组织没争抢到资源的线程的:

    该队列是用链表实现的,没被争抢到资源的线程会被封装成Node,被挂入链表中。node的入队和出队都用的CAS(自旋锁)来保证效率和安全的兼得。

    这个Node类源码很简单,主要是要注意Node是有状态的,用状态来控制该节点该不该被唤醒,用waitStatus来表示状态:

    Node 类中的静态常量定义了以下状态:

    • SIGNAL (-1):表示当前节点的后续节点需要被唤醒。这是默认状态,当一个节点被添加到队列中时,它的 waitStatus 会被设置为 -1。

    • PROPAGATE (-3):类似于 SIGNAL,但在某些情况下,如释放共享模式下的锁时,可能会设置为 -3,以帮助传播唤醒信

    • CONDITION (-2):表示节点位于条件队列中,而不是同步队列中。这通常与 Condition 对象相关联。

    • 0:表示没有特别的状态。当节点被初始化时,它的 waitStatus 默认为 0。

    • CANCELLED (1):表示节点已经被取消。这通常发生在线程被中断或者尝试获取锁失败时。一旦节点被标记为 CANCELLED,它就会从队列中移除。

    waitStatus 的使用:

    • 线程阻塞:当一个线程被插入到队列中时,它的 waitStatus 通常会被设置为 SIGNAL。这意味着当前节点的后续节点应该被唤醒,当当前节点释放时。

    • 线程唤醒:当一个线程释放锁或其他同步状态时,它会检查自己的 waitStatus,并根据情况唤醒后继节点。例如,在独占模式下,当线程释放锁时,它会检查其后继节点的 waitStatus,如果是 SIGNAL,则会唤醒该节点对应的线程。

    • 条件队列:当线程等待某个条件满足时,它会被移到条件队列中,并且其 waitStatus 会被设置为 CONDITION。当条件满足时,线程会被放回同步队列,并且 waitStatus 会被设置为 SIGNAL。

    • 取消节点:如果线程被中断或超时,或者由于其他原因无法继续等待,那么该节点会被标记为 CANCELLED,并且从队列中移除。

    整个CLH队列可以理解为:

    2.2.模板方法的实现

    2.2.1.独占模式

    1.获取资源

    AQS的实现类很多,此处我们以ReentrantLock的FairSync(公平锁)为例。

    lock()其实是去,调用AQS的acquire():

    AQS的acquire()会去调用实现类的tryAcquire()来判断当前线程是否能抢到线程,如果抢不到就入队CLH,并且阻塞:

    至此我们就能体会到了,AQS实现了核心的阻塞+入队以及唤醒+出队的一些列对CLH中线程的操作,但具体的争抢策略作为模板开放出去了,也就是上文我们说的一系列模板方法。

    我们来看看FairSync的tryAcquire()实现:

    1. protected final boolean tryAcquire(int acquires) {
    2.           final Thread current = Thread.currentThread();
    3.           int c = getState();//获取锁的状态
    4.           if (c == 0) {//如果锁没有被持有
    5.           //就去判断CLH队列中还有没有线程,没有的话就去CAS获取锁
    6.               if (!hasQueuedPredecessors() &&
    7.                   compareAndSetState(0, acquires)) {
    8.                   setExclusiveOwnerThread(current);
    9.                   return true;
    10.               }
    11.           }
    12.           //如果当前线程是锁的持有者就去增加锁的持有次数
    13.           else if (current == getExclusiveOwnerThread()) {
    14.               int nextc = c + acquires;
    15.               if (nextc < 0)
    16.                   throw new Error("Maximum lock count exceeded");
    17.               setState(nextc);
    18.               return true;
    19.           }
    20.           return false;
    21.       }
    2.释放资源

    释放资源应该做些什么?这里我们想一下就能想出来:

    • 首先释放锁

    • 然后唤醒后面的线程。

    释放锁:

    可以看到ReentrantLock的unlock调用的AQS的release()去释放锁,AQS的release里会先去tryRelease()尝试释放锁,tryRelease()是交给子类重写的:

    所以调用的就是reentrantLock的释放策略:

    ReentrantLock的尝试释放的过程很简单,就是线程安全的去减少锁的持有次数+清除锁的持有者的信息:

    1. protected final boolean tryRelease(int releases) {
    2.           int c = getState() - releases;
    3.           //检查当前线程是否是锁的所有者,确保只有锁的持有者能释放锁
    4.           if (Thread.currentThread() != getExclusiveOwnerThread())
    5.               throw new IllegalMonitorStateException();
    6.           boolean free = false;
    7.           if (c == 0) {
    8.               free = true;
    9.               //清除锁的持有者
    10.               setExclusiveOwnerThread(null);
    11.           }
    12.           //刷新状态
    13.           setState(c);
    14.           return free;
    15.       }

    唤醒后面的线程:

    继续回到AQS的release,继续往下看,tryRelease()后确定可以释放锁后,unparkSuccessor()唤醒CLH队列中需要被唤醒的第一个线程。为什么说是第一个需要被唤醒的喃?前文已经说过了Node节点有状态,状态用来表示该节点是否需要被唤醒,有些状态是表示节点不需要被唤醒的。

    1. public final boolean release(int arg) {
    2.        if (tryRelease(arg)) {
    3.            Node h = head;
    4.            if (h != null && h.waitStatus != 0)
    5.                unparkSuccessor(h);//唤醒后面的需要被唤醒的线程
    6.            return true;
    7.       }
    8.        return false;
    9.   }

    unparkSuccessor()很简单,修改当前节点状态,唤醒后面需要唤醒的状态:

    1. private void unparkSuccessor(Node node) {
    2.       //将当前的节点从待唤醒状态变为已经唤醒
    3.       int ws = node.waitStatus;
    4.       if (ws < 0)
    5.           compareAndSetWaitStatus(node, ws, 0);
    6.       //找到后续第一个需要被唤醒的节点,然后唤醒它
    7.       Node s = node.next;
    8.       if (s == null || s.waitStatus > 0) {
    9.           s = null;
    10.           for (Node t = tail; t != null && t != node; t = t.prev)
    11.               if (t.waitStatus <= 0)
    12.                   s = t;
    13.       }
    14.       if (s != null)
    15.           LockSupport.unpark(s.thread);
    16.   }

    2.2.2.共享模式

    之所以有共享模式,是因为有时候资源不一定是唯一的,比如除了锁之外,可能要求能有10条线程同时工作,又或者读写之间不互斥等需求。在这种允许多条线程同时工作的情景中就要用到共享模式,标志位state不再表示一个意思(是否被持有),而是用来表示多个意思,有可能是线程的数量,也有可能是高低位分别表示是否允许读或者写。

    共享模式下,资源的获取是通过acquireShared和tryAcquireShared来实现的,释放是通过releaseShared和tryReleaseShared来实现的。具体代码实现的话其实和独占模式大差不差,只是在对标志位state的操作上略有不同,以及可能会有一些位操作。以下是一些常见的使用共享模式的场景:

    • 读写锁 (ReadWriteLock): 读写锁是一种常用的同步工具,它允许多个线程同时读取共享资源,但在写入时需要独占资源。 读锁通常使用共享模式实现,因为多个读线程可以同时访问资源。 写锁则使用独占模式,因为写入操作通常需要独占资源以避免数据不一致。

    • 信号量 (Semaphore): 信号量用于限制可以同时访问共享资源的线程数量。 它可以通过减少或增加一个计数器来管理资源的可用性,这个计数器通常就是 AQS 的 state 字段。 当线程尝试获取信号量时,它会减少 state 的值;当线程释放信号量时,它会增加 state 的值。 因为多个线程可以同时获取信号量,所以信号量通常使用共享模式。

    • 循环屏障 (CyclicBarrier): 循环屏障允许一组线程相互等待,直到所有线程都到达了一个共同的屏障点。 当最后一个线程到达屏障点时,所有线程都会被释放继续执行。 循环屏障通常使用共享模式来管理线程的等待和释放。

    • CountDownLatch: CountDownLatch 是一种同步辅助类,它允许一个或多个线程等待其他线程完成操作。 它维护了一个计数器,每当一个操作完成时,计数器就会减少。 当计数器达到零时,所有等待的线程都会被释放。 CountDownLatch 通常使用共享模式来管理计数器的减少。

    • Exchanger: Exchanger 允许两个线程交换对象。 两个线程各自提供一个对象并等待另一个线程到达交换点。 一旦两个线程都到达交换点,它们就可以交换对象并继续执行。 Exchanger 通常使用共享模式来管理线程的交换过程。 示例代码

    由于这是一个系列文章,后面几篇文章我们就将聊到CountDownLatch、Semaphore、CyclicBarrier之类的同步工具类、ReadWriteLock之类的读写锁,到时候分析源码的时候,再来看看各自的具体实现。

  • 相关阅读:
    前端开发:export 和 export default的区别
    SEBlock | ECABlock | CBAM
    分享使用百度EasyDL实现安全帽智能识别
    研究告诉你,晨练好,还是晚练好?看看你运动的方式是正确的吗
    css line-height属性是什么
    Modbus协议详解2:通信方式、地址规则、主从机通信状态
    简单理解路由重分发(用两路由器来理解)
    【Java快速复习】一.数据类型与结构设计
    windows系统下压力测试工具(cpu使用率,内存使用率,磁盘使用率,磁盘空间)
    Java 插入公式到PPT幻灯片
  • 原文地址:https://blog.csdn.net/Joker_ZJN/article/details/140983419