• JUC并发编程--------AQS以及各类锁


    AQS

    什么是AQS

    java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队
    列、独占获取、共享获取等,而这些行为的抽象就是基于 AbstractQueuedSynchronizer(简称
    AQS) 实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。
    JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的
    一般是通过一个内部类Sync继承 AQS
    将同步器所有调用都映射到Sync对应的方法

    AQS具备的特性:

    • 阻塞等待队列
    • 共享/独占
    • 公平/非公平
    • 可重入
    • 允许中断

    AQS核心结构

    AQS内部维护属性  volatile int state
    state表示资源的可用状态
    State三种访问方式:
    • getState()
    • setState()
    • compareAndSetState()
    定义了两种资源访问方式:
    • Exclusive-独占,只有一个线程能执行,如ReentrantLock
    • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
    AQS实现时主要实现以下几种方法:
    isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int):独占方式。 尝试获取资源,成功则返回true,失败则返回false。
    tryRelease(int):独占方式。 尝试释放资源,成功则返回true,失败则返回false。
    tryAcquireShared(int):共享方式。 尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int):共享方式。 尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
    AQS定义两种队列
    同步等待队列: 主要用于维护获取锁失败时入队的线程。
    条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁
    AQS 定义了5个队列中节点状态:
    • 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
    • CANCELLED,值为1,表示当前的线程被取消;
    • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
    • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
    • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

    ReentrantLock

    ReentrantLock是一种可重入的独占锁,它允许同一个线程多次获取同一个锁而不会被阻塞。
    它的功能类似于synchronized是一种互斥锁,可以保证线程安全。相对于 synchronized,
    ReentrantLock具备如下特点:
    • 可中断
    • 可以设置超时时间
    • 可以设置为公平锁
    • 支持多个条件变量
    • 与 synchronized 一样,都支持可重入
    它的主要应用场景是 在多线程环境下对共享资源进行独占式访问,以保证数据的一致性和安全性。

    常用API

    ReentrantLock实现了Lock接口规范,常见API如下:

    void lock()
    获取锁,调用该方法当前线程会获取锁,当锁获
    得后,该方法返回
    void lockInterruptibly() throws InterruptedException
    可中断的获取锁,和lock()方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线
    boolean tryLock()
    尝试非阻塞的获取锁,调用该方法后立即返回。
    如果能够获取到返回true,否则返回false
    boolean tryLock(long time, TimeUnit unit)
    throws InterruptedException
    超时获取锁,当前线程在以下三种情况下会被返回:
    当前线程在超时时间内获取了锁
    当前线程在超时时间内被中断
    超时时间结束,返回false
    void unlock()
    释放锁
    Condition newCondition()
    获取等待通知组件,该组件和当前的锁绑定,当
    前线程只有获取了锁,才能调用该组件的await()
    方法,而调用后,当前线程将释放锁
    在使用时要注意 4 个问题:
    • 默认情况下 ReentrantLock 为非公平锁而非公平锁;
    • 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
    • 加锁操作一定要放在 try 代码之前,这样可以避免未加锁成功又释放锁的异常;
    • 释放锁一定要放在 finally 中,否则会导致线程阻塞。

    公平锁与非公平锁使用

    ReentrantLock支持公平锁和非公平锁两种模式:
    公平锁:线程在获取锁时,按照等待的先后顺序获取锁。
    非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock默认是非公平锁
    1. 1 ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁
    2. 2 ReentrantLock lock = new ReentrantLock(true); //公平锁
    3. //非公平锁
    4. final void lock() {
    5. if (compareAndSetState(0, 1))
    6. setExclusiveOwnerThread(Thread.currentThread());
    7. else
    8. acquire(1);
    9. }
    10. //公平锁
    11. final void lock() {
    12. acquire(1);
    13. }
    14. //两者的区别在于非公平锁刚进来则会先用当前线程去尝试获取锁,如果获取失败再走acquire流程

    可重入锁

    1. protected final boolean tryAcquire(int acquires) {
    2. final Thread current = Thread.currentThread();
    3. int c = getState();
    4. if (c == 0) {
    5. if (!hasQueuedPredecessors() &&
    6. compareAndSetState(0, acquires)) {
    7. setExclusiveOwnerThread(current);
    8. return true;
    9. }
    10. }
    11. else if (current == getExclusiveOwnerThread()) {
    12. //如果当前线程是获得锁的线程,表示重入的过程,这时候则需要将锁状态进行增加即可
    13. int nextc = c + acquires;
    14. if (nextc < 0)
    15. throw new Error("Maximum lock count exceeded");
    16. setState(nextc);
    17. return true;
    18. }
    19. return false;
    20. }

    源码解析

    公平锁:

    首先先看公平锁类FairSync,其中包含了lock() 以及tryAcquire(int acquires) 方法

    1. static final class FairSync extends Sync {
    2. private static final long serialVersionUID = -3000897897090466540L;
    3. final void lock() {
    4. acquire(1);
    5. }
    6. /**
    7. * Fair version of tryAcquire. Don't grant access unless
    8. * recursive call or no waiters or is first.
    9. */
    10. protected final boolean tryAcquire(int acquires) {
    11. final Thread current = Thread.currentThread();
    12. int c = getState();
    13. if (c == 0) {
    14. if (!hasQueuedPredecessors() &&
    15. compareAndSetState(0, acquires)) {
    16. setExclusiveOwnerThread(current);
    17. return true;
    18. }
    19. }
    20. else if (current == getExclusiveOwnerThread()) {
    21. int nextc = c + acquires;
    22. if (nextc < 0)
    23. throw new Error("Maximum lock count exceeded");
    24. setState(nextc);
    25. return true;
    26. }
    27. return false;
    28. }
    29. }

    要加锁的时候我们首先调用lock方法, 其中会调用acquire(1)方法

    1. public final void acquire(int arg) {
    2. if (!tryAcquire(arg) &&
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }

    调用acquire方法之后,由于我们使用的是公平锁,紧接着就会先调用公平锁的tryAcquire方法

    1. protected final boolean tryAcquire(int acquires) {
    2. final Thread current = Thread.currentThread();
    3. int c = getState();
    4. if (c == 0) {
    5. if (!hasQueuedPredecessors() &&
    6. compareAndSetState(0, acquires)) {
    7. setExclusiveOwnerThread(current);
    8. return true;
    9. }
    10. }
    11. else if (current == getExclusiveOwnerThread()) {
    12. int nextc = c + acquires;
    13. if (nextc < 0)
    14. throw new Error("Maximum lock count exceeded");
    15. setState(nextc);
    16. return true;
    17. }
    18. return false;
    19. }
    20. }

    进入tryAcquire方法之后

    首先先通过getState获取状态,当状态值为0的时候,表示锁未被持有,于是进入

    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current)

    的判断,在hasQueuedPredecessors()方法中,会判断线程是否有等待阻塞队列,如果有等待阻塞队列,并且头节点是当前线程,则返回false,加上'!'的非判断,就可以进入 compareAndSetState(0, acquires),尝试修改锁的状态为持有,如果成功则调用 setExclusiveOwnerThread(current);方法将当前线程设为锁持有线程。

    如果当前状态不为0的时候,表示锁已经被持有,这时候通过if (current == getExclusiveOwnerThread())方法判断锁的持有则是不是当前线程,如果是当前线程,则将锁的持有状态+1,代表重入锁

    1. public final boolean hasQueuedPredecessors() {
    2. // The correctness of this depends on head being initialized
    3. // before tail and on head.next being accurate if the current
    4. // thread is first in queue.
    5. Node t = tail; // Read fields in reverse initialization order
    6. Node h = head;
    7. Node s;
    8. return h != t &&
    9. ((s = h.next) == null || s.thread != Thread.currentThread());
    10. }

    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current)

    只有该方法返回false的情况,才能进入下一步通过cas获取锁。

    在该方法中:

    判断h!=t,表示头节点不等于尾节点,才能继续判断,否则就是头节点等于尾节点表示阻塞队列为空或则阻塞队列只有一个线程,那么可以直接返回false

    当h!=t的时候,表示存在阻塞队列,并且存在多个线程在等待,此时需要继续判断,当s=h.next)==null的时候,表示还不存在阻塞队列,则返回true,需要先创建阻塞队列

    当s=h.next)!=null的时候,表示已经存在阻塞队列了,这时候需要进行下一步判断

    当s.thread==Thread.currentThread()的时候,表示头节点的下一个节点是当前线程,则返回false准备尝试获取锁

    1. public final void acquire(int arg) {
    2. if (!tryAcquire(arg) &&
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }

    掉调用tryAcquire方法之后,如果没有成功获取锁,就会将进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法

    进入acquireQueued方法之前,先通过addWaiter(Node.EXCLUSIVE)用当前线程入参创建一个节点。

    1. private Node addWaiter(Node mode) {
    2. Node node = new Node(Thread.currentThread(), mode);
    3. // Try the fast path of enq; backup to full enq on failure
    4. Node pred = tail;
    5. if (pred != null) {
    6. node.prev = pred;
    7. if (compareAndSetTail(pred, node)) {
    8. pred.next = node;
    9. return node;
    10. }
    11. }
    12. enq(node);
    13. return node;
    14. }

    通过 Node node = new Node(Thread.currentThread(), mode);将当前线程放入新创的节点中,从上一个方法可以看出传入的mode是Node.EXCLUSIVE,表示独占锁

    创建好节点之后,首先判断tail尾节点是否为为空,如果不为空,则将当前节点的前驱节点设置为尾节点,然后通过compareAndSetTail(pred, node)方法,尝试将aqs维护的尾节点换成刚创建的界定啊,如果cas成功,则将原先的尾节点的后序节点设置成当前节点,至此新创建的节点变为尾节点。

    如果tail==null,表示还没存在尾节点,这时候需要调用enq(node);方法

    1. private Node enq(final Node node) {
    2. for (;;) {
    3. Node t = tail;
    4. if (t == null) { // Must initialize
    5. if (compareAndSetHead(new Node()))
    6. tail = head;
    7. } else {
    8. node.prev = t;
    9. if (compareAndSetTail(t, node)) {
    10. t.next = node;
    11. return t;
    12. }
    13. }
    14. }
    15. }

    进入enq方法之后,会循环创建尾节点,继续通过tail==null进行二次判断,如果当前尾节点还是null,则cas新建一个节点作为头节点(初始化),完成之后再设置尾节点等于头节点。

    然后进入第二次循环,此时tail肯定!=null,则将传入进来的新节点的前驱节点设置为尾节点,然后尝试交换尾节点,成功之后继续将原先的尾节点的后序节点设置为传入的新节点,至此,新线程入队成功,此时addWaiter方法调用完毕

    1. public final void acquire(int arg) {
    2. if (!tryAcquire(arg) &&
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }

    执行完addwaiter方法之后,接着执行acquireQueued方法

    1. final boolean acquireQueued(final Node node, int arg) {
    2. boolean failed = true;
    3. try {
    4. boolean interrupted = false;
    5. for (;;) {
    6. final Node p = node.predecessor();
    7. if (p == head && tryAcquire(arg)) {
    8. setHead(node);
    9. p.next = null; // help GC
    10. failed = false;
    11. return interrupted;
    12. }
    13. if (shouldParkAfterFailedAcquire(p, node) &&
    14. parkAndCheckInterrupt())
    15. interrupted = true;
    16. }
    17. } finally {
    18. if (failed)
    19. cancelAcquire(node);
    20. }
    21. }

    在该方法中,通过 final Node p = node.predecessor();获取当前线程节点的前驱节点,

    如果前驱节点是头节点,表示当前节点排第二,可以通过tryAcquire获取锁,获取锁成功,则通过setHead(node);方法,将节点线程设置为null(因为之后不在需要阻塞与唤醒了),

    并且返回fase(该返回值有特别的意义,如果返回true,方法结束之后会接着调用selfInterrupt();方法,将中断标记位设置为true,后面不会在被park中断),如果获取锁失败,就会进入shouldParkAfterFailedAcquire(p, node)方法判断是否需要进行中断,如果需要中断则调用parkAndCheckInterrupt()方法中断,中断结束之后就会将interrupted设置为true并返回。

    1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    2. int ws = pred.waitStatus;
    3. if (ws == Node.SIGNAL)
    4. /*
    5. * This node has already set status asking a release
    6. * to signal it, so it can safely park.
    7. */
    8. return true;
    9. if (ws > 0) {
    10. /*
    11. * Predecessor was cancelled. Skip over predecessors and
    12. * indicate retry.
    13. */
    14. do {
    15. node.prev = pred = pred.prev;
    16. } while (pred.waitStatus > 0);
    17. pred.next = node;
    18. } else {
    19. /*
    20. * waitStatus must be 0 or PROPAGATE. Indicate that we
    21. * need a signal, but don't park yet. Caller will need to
    22. * retry to make sure it cannot acquire before parking.
    23. */
    24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    25. }
    26. return false;
    27. }

    该方法是为了判断是否需要进行中断,比较巧妙,

    首先进来会先判断前驱节点的状态,默认都是0,而该方法的目的就是要即将前驱节点状态设置成-1,在-1状态下,如果前驱节点释放锁,就会激活该节点,然后重新尝试获取锁。

    从源码上可以看到,当线程调用lock的时候,进来这个方法,前驱节点都是0,然后设置完前驱节点的状态为-1之后,会返回一个false,在外层又会进行一次自旋,如果前驱节点是头节点,则可以尝试通过cas尝试获取锁,当获取锁失败再次进来的时候,通过 if (ws == Node.SIGNAL)判断成功,直接返回ture,表示需要中断

    1. /**
    2. * Convenience method to park and then check if interrupted
    3. *
    4. * @return {@code true} if interrupted
    5. */
    6. private final boolean parkAndCheckInterrupt() {
    7. LockSupport.park(this);
    8. return Thread.interrupted();
    9. }

    线程park然后被中断之后,执行Thread.interrupted();清空中断标记位,避免下次无法中断造成不停的自旋不断消耗cpu,至此,lock全过程就完成了

    获取锁完成业务之后开始释放锁

    1. public void unlock() {
    2. sync.release(1);
    3. }
    4. public final boolean release(int arg) {
    5. if (tryRelease(arg)) {
    6. Node h = head;
    7. if (h != null && h.waitStatus != 0)
    8. unparkSuccessor(h);
    9. return true;
    10. }
    11. return false;
    12. }

    调用unlock方法,然后调用release方法,传入状态值为1

    1. protected final boolean tryRelease(int releases) {
    2. int c = getState() - releases;
    3. if (Thread.currentThread() != getExclusiveOwnerThread())
    4. throw new IllegalMonitorStateException();
    5. boolean free = false;
    6. if (c == 0) {
    7. free = true;
    8. setExclusiveOwnerThread(null);
    9. }
    10. setState(c);
    11. return free;
    12. }

    先通过tryRelease方法将state-1(加锁的时候是+1,释放锁则需要-1还原),如果状态值为0,则表示释放锁成功,此时通过   setExclusiveOwnerThread(null);将锁持有线程设置为null

    如果不为0,则及继续返回一个false,表示锁还没有释放完成,仍然被持有(重入锁状态);

    1. public final boolean release(int arg) {
    2. if (tryRelease(arg)) {
    3. Node h = head;
    4. if (h != null && h.waitStatus != 0)
    5. unparkSuccessor(h);
    6. return true;
    7. }
    8. return false;
    9. }

    如果释放锁成功,返回ture之后,进入方法体,判断头节点是否为为空,此时头节点就是自己,肯定不为空。并且waitStatus在下一个线程进入阻塞队列之前就已经被设置为-1,所以顺利执行unparkSuccessor(h);方法

    1. private void unparkSuccessor(Node node) {
    2. /*
    3. * If status is negative (i.e., possibly needing signal) try
    4. * to clear in anticipation of signalling. It is OK if this
    5. * fails or if status is changed by waiting thread.
    6. */
    7. int ws = node.waitStatus;
    8. if (ws < 0)
    9. compareAndSetWaitStatus(node, ws, 0);
    10. /*
    11. * Thread to unpark is held in successor, which is normally
    12. * just the next node. But if cancelled or apparently null,
    13. * traverse backwards from tail to find the actual
    14. * non-cancelled successor.
    15. */
    16. Node s = node.next;
    17. if (s == null || s.waitStatus > 0) {
    18. s = null;
    19. for (Node t = tail; t != null && t != node; t = t.prev)
    20. if (t.waitStatus <= 0)
    21. s = t;
    22. }
    23. if (s != null)
    24. LockSupport.unpark(s.thread);
    25. }

    进入该方法后,waitStatus的值为-1,小于0,于是开始通过cas将waitStatus设置为0。

    紧接着尝试找到当前节点的下一个节点,如果不为null,则通过unpark唤醒该线程

  • 相关阅读:
    Python——案例
    计算一组Tensor的直方图C算法实现
    StableAudio-大模型创作音乐的工具
    unity工具类篇 时间间隔帮助类(天、时、分、秒) 下篇
    讲讲团队工程化内的规范化
    设置Linux CentOS7桥接模式连网
    反序列化中_wakeup的绕过
    2022年RHCE认证考题解析最新版—RH294环境
    浅谈压力测试的重要目标及意义
    Alphalens使用方法细节判断
  • 原文地址:https://blog.csdn.net/Promise_J_Z/article/details/133052601