• 多线程与高并发(7)——从ReentrantLock到AQS源码(两万字大章,一篇理解AQS)


    一、可重入锁、公平锁、非公平锁

    可重入锁: 指的是以线程为单位,当一个线程获取对象锁之后,这个线程可以再次获取本对象上的锁。(防止死锁,自己锁住自己)
    可重入锁的实现原理是可以看Synchronized实现原理,JVM会记下锁的持有线程,并且将计数器置为 1,进入时+1,退出时就-1,为0即释放锁。
    公平锁: 就是很公平,在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列中的第⼀个,就占⽤锁,否者就会加⼊到等待队列中。
    非公平锁: 谁抢了算谁的。一上来就尝试占有锁,如果占有失败,则按照公平锁的方式等待。

    二、ReentrantLock和Synchronized对比

    在这里插入图片描述
    Synchronized我们看之前文章:多线程与高并发(2)——synchronized用法详解
    上表可以说在很多文章中都有,对于ReentrantLock使用方法呢,看以下代码:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class LockTest {
    
        public static void main(String[] args) {
            // 1.初始化选择公平锁、非公平锁。默认非公平锁
            Lock lock = new ReentrantLock();
            //2.可用于代码块
            lock.lock();
            try {
                int i = 1;
                i++;
                try {
                    // 3.支持多种加锁方式,比较灵活; 具有可重入特性
                    if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 4.手动释放锁
                    lock.unlock();
                }
            } finally {
                // 4.手动释放锁
                lock.unlock();
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    三、ReentrantLock和AQS源码分析

    1、ReentrantLock类结构

    我们看ReentrantLock源码,如下:

    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
        /** Synchronizer providing all implementation mechanics */
        private final Sync sync;
    	//1、内部类Sync 继承了AbstractQueuedSynchronizer 
        abstract static class Sync extends AbstractQueuedSynchronizer {...}
        //2、内部类NonfairSync 
        static final class NonfairSync extends Sync {...}
        //3、内部类FairSync 
        static final class FairSync extends Sync {...}
        //4、构造函数,默认非公平锁
         public ReentrantLock() {
            sync = new NonfairSync();
        }
        //5、构造函数,传参决定公平非公平
         public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
        //6、上锁
         public void lock() {
            sync.lock();
        }
        //7、响应中断
         public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        ...
        //8、尝试上锁
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }
        //9、生成一个条件,条件变量很大一个程度上是为了解决
        //Object.wait/notify/notifyAll难以使用的问题。
        //一个ReentrantLock对象可以同时绑定对个对象。ReenTrantLock提供了一个Condition(条件)类,
        //用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
         public Condition newCondition() {
            return sync.newCondition();
        }
         protected Collection<Thread> getQueuedThreads() {
            return sync.getQueuedThreads();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    上面截取了ReentrantLock 的主要源码,在java.util.concurrent包中,其结构如下所示:
    在这里插入图片描述

    2、从ReentrantLock关联到AQS

    ReentrantLock是怎么调用到AQS的呢,我们应用debug模式跑一下上面的代码,对于非公平锁来说,代码调用如下:
    lock.lock()方法调用到了NonfairSync内部类中的lock()方法,如下所示:

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    上述代码中,compareAndSetState()、setExclusiveOwnerThread()、acquire()均为AQS中的方法。
    compareAndSetState()源码如下:

     protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    
    • 1
    • 2
    • 3
    • 4

    上述代码的大概意思如下:

    1、通过compareAndSetState也就是CAS设置锁的状态(stateOffset),如果锁状态设为1成功,那就获取到了锁,就调用setExclusiveOwnerThread(Thread.currentThread());将当前线程设为独占线程。
    2、如果上锁失败,就调用 acquire(1)方法。

    对于公平锁来说,加锁代码如下:

     static final class FairSync extends Sync {
    		...
            final void lock() {
                acquire(1);
            }
            ...
          }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这里同样调用了acquire(1)方法。
    我们结合了开头公平锁和非公平锁的介绍,大致可以猜到,acquire()方法中会让线程进入一个等待队列,然后进行加锁。但是加锁是怎么实现的呢,acquire()方法是AQS中的一个核心方法,我们要先了解AQS。

    3、AQS解析

    AQS是什么呢,简单来说,AQS 为构建锁和同步器提供了一些通用功能的是实现。
    我们从它的源码开始看:

    public abstract class AbstractQueuedSynchronizer extends
        AbstractOwnableSynchronizer implements java.io.Serializable { 
        //等待队列的头节点
        private transient volatile Node head;
        //等待队列的尾节点
        private transient volatile Node tail;
        //同步状态
        private volatile int state;
        //设置同步状态的方式
        protected final int getState() { return state;}
        protected final void setState(int newState) { state = newState;}
        protected final boolean compareAndSetState(int expect, int update) {
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    可以看出,AQS的核心是volatile修饰的state和等待队列。
    从上面代码可以看到,state是用volatile修饰的用来表示同步状态。
    当一个线程过来的时候,如果被请求的共享资源是空闲的,那么就将此线程设置为工作线程,将共享资源设置为锁定状态。如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配,将暂时获取不到锁的线程加入到队列中。
    AQS用到了模板方法模式,可参考java(面向对象)的23种设计模式(10)——模板方法模式

    3.1 AQS框架

    AQS的架构图如下所示,图片来源:美团技术团队
    在这里插入图片描述
    上图中,head、tail、state、spinForTimeoutThreshold皆为属性值,其他为方法。

    1、从上图中可以看出,AQS结构由浅入深,从API到数据提供层。所谓AQS用到了模板方法模式,此模式基本定义了算法实现的流程及部分抽象和基本方法,AQS中定义了加锁解锁的基本流程,提供了加锁解锁的API,具体如何改变加锁状态(state的值),则由具体子类或者叫自定义同步器(比如ReentrantLock)来实现。
    2、当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API 进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

    3.2 AQS原理

    从上面代码中可以看出,state代表共享资源的状态,由Node组成一个等待队列FIFO,多线程争用资源被阻塞时会进入此队列。 然后通过CAS来完成对state的修改。
    CLH:Craig、Landin and Hagersten 队列,是单向链表。
    虚拟双向队列(FIFO)是CLH的变体,AQS 是通过将请求共享资源的每条线程封装成一个节点 Node来实现锁的分配。
    原理图(网上找的哦)如下:
    在这里插入图片描述

    3.2.1 Node节点

    AQS中最基本的数据结构就是Node,等待队列是FIFO先进先出,只有前一个节点的状态为SIGNAL时,当前节点的线程才能被挂起。
    其源码如下:

    static final class Node {
            /** 表示线程以共享的模式等待锁 */
            static final Node SHARED = new Node();
            /** 表示线程正在以独占的方式等待锁 */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus 为 1,表示线程获取锁的请求已经取消了 */
            static final int CANCELLED =  1;
            /** 为-1,表示线程已经准备好了,就等资源释放了*/
            static final int SIGNAL    = -1;
            /** 为-2,表示节点在等待队列中,节点线程等待唤醒(本文不过多讲解) */
            static final int CONDITION = -2;
            /** 
            * 为-3,当前线程处在 SHARED 情况下,该字段才会使用;
            * 前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
            */
            static final int PROPAGATE = -3;
            /** 0:当一个 Node 被初始化的时候的默认值 */
            volatile int waitStatus;
            /**前驱指针 */
            volatile Node prev;
            /**后继指针 */
            volatile Node next;
            /**
             * 表示处于该节点的线程;构造时初始化,使用后清空。
             */
            volatile Thread thread;
            /**
             * 指向下一个处于 CONDITION 状态的节点
             */
            Node nextWaiter;
            /**
             * 共享模式下是否处于等待状态
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
            /**
             * 返回前驱节点,没有的话抛出空指针
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // 用于建立初始头部或共享标记}
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    对于waitStatus,分为0,1,-1,-2,-3五种值,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。
    AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch,这些我们下篇文章来总结)。

    3.2.2 state状态

    state表示共享资源的状态,即同步状态。访问这个字段的方法如下:

    final getState();//获取 State 的值
    final setState();//设置 State 的值
    final compareAndSetState();//使用 CAS 方式更新 State
    
    • 1
    • 2
    • 3

    final修饰的方法,子类无法重写,说明在此模板方法模式里,这些具体方法不能被重写;但是,如何改变加锁状态,是子类去实现的。
    因此,可以通过修改state的状态,来设置同步,也就是加锁解锁。 这也是自定义同步器实现的基本思路。

    3.2.3 同步的主要方法

    AQS源码中,提供了以下方法进行同步(上图框架中也有标出):

    //该线程是否正在独占资源。只有用到condition才需要去实现它。
    isHeldExclusively();
    //独占方式。尝试获取资源,成功则返回true,失败则返回false。
    tryAcquire(int);
    //独占方式。尝试释放资源,成功则返回true,失败则返回false。
    tryRelease(int);
    //共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryAcquireShared(int);
    //共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
    tryReleaseShared(int)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。
    AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。

    ReentrantLock 是独占锁,所以实现了 tryAcquire-tryRelease。state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
    再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

    独占模式和共享模式,加锁过程是不同的,对于自定义同步器来说,过程如下所示,图片来源:美团技术团队
    在这里插入图片描述
    在这里插入图片描述

    4、ReentrantLock同步过程源码详解

    4.1 加锁流程

    我编写下面内容,边总结了一张图,先把图放上来有助于理解,加锁流程(自己画的哦)如下:
    在这里插入图片描述

    4.1.1 非公平锁加锁

    我们还用debug模式走ReentrantLock.lock()方法;这里我们从默认锁也就是非公平锁来看,且边走代码边制图,整个流程就清晰了。
    首先,进入lock()方法,其调用的是NonfairSync的lock()方法,这里我们最开始已经写过了:

     static final class NonfairSync extends Sync {
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    通过CAS设置锁的状态,如果锁状态设为1成功,那就获取到了锁,就调用setExclusiveOwnerThread()将当前线程设为独占线程。如果获取锁失败,就调用acquire(1)方法;acquire方法如下:

    public final void acquire(int arg) {
    		//尝试获取资源否则放入等待队列,模式为独占模式
            if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    上面代码中,会先调用tryAcquire()与acquireQueued()方法尝试获取资源,获取成功则返回,失败则调用selfInterrupt()方法。
    tryAcquire()方法源码如下:

      protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
    • 1
    • 2
    • 3

    nonfairTryAcquire代码如下:

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //当前线程就是持有线程,state+1
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    从上面的代码我们可以看出,ReentrantLock的可重入特性。具体如下:

    1. State 初始化的时候为 0,表示没有任何线程持有锁。如果为0时,会再次CAS锁定资源。
    2. 当前线程再次持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
    3. 解锁也是对这个字段-1,一直到 0,此线程对锁释放。
    4.1.2 队列

    如果tryAcquire获取锁失败,就很调用addWaiter加入到等待队列中。addWaiter()源码如下,看注释哦:

     private Node addWaiter(Node mode) {
     		以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
            Node node = new Node(Thread.currentThread(), mode);
            // 尝试快速方式直接放到队列尾部
            //1、Pred 指针指向尾节点 Tail。
            Node pred = tail;
            //尾结点不为空
            if (pred != null) {
            	//node的前置节点指向pred
                node.prev = pred;
                //通过 compareAndSetTail 方法,完成尾节点的设置。
                //这个方法主要是对 tailOffset 和 Expect 进行比较,
                //如果 tailOffset 的 Node 和 Expect 的 Node 地址是相同的,
                //那么设置 Tail 的值为 Update 的值。
                //这里的Expect 值是pred;tailOffset 的值为当前节点的尾结点的值
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //上一步失败则通过enq入队。
            enq(node);
            return node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    此方法通过CAS将当前线程加入等待队列的队尾。
    enq()方法源码如下:

     private Node enq(final Node node) {
     		//死循环,直到cas成功
            for (;;) {
                Node t = tail;
                // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                //正常流程
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    addWaiter 就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
    CAS中,offset的值定义如下:

     static {
            try {
                stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
                headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
                waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
                nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
    
            } catch (Exception ex) { throw new Error(ex); }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    从 AQS 的静态代码块可以看出,Offset都是获取一个对象的属性相对于该对象在内存当中的偏移量,这样我们就可以根据这个偏移量在对象内存当中找到这个属性,然后用于CAS对比操作。

    4.1.3 公平锁加锁

    上面讲了非公平锁加锁,我们现在讲一下公平锁加锁,对于公平锁来说,和非公平的区别就是一上来就先加入队列排队,tryAcquire()方法也有一点区别,源码如下:

    static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
            final void lock() {
                acquire(1);
            }
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    可以看出和非公平锁的区别在于:
    1、lock()方法一开始就acquire()获取资源;也就是说,一上来要先去触发加入队列并尝试获取资源。
    2、tryAcquire()中加入了hasQueuedPredecessors()判断。
    hasQueuedPredecessors()源码如下:

     public final boolean hasQueuedPredecessors() {
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这段代码什么意思呢?

    1、双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。
    2、当 h != t时(不止一个节点): 如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了 Tail 指向Head,没有将 Head 指向 Tail,此时队列中有元素,需要返回 True(这块具体见下边代码分析)。
    3、 如果(s =h.next) != null,说明此时队列中至少有一个有效节点。如果此时 s.thread ==Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;如果 s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。

    对于下面的CAS代码来说:

    Node t = tail;
    if (t == null) { // Must initialize
    	if (compareAndSetHead(new Node()))
    		tail = head;
    } else {
    	node.prev = t;
    	if (compareAndSetTail(t, node)) {
    		t.next = node;
    		return t;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    tail指向了head,节点入队不是原子操作,所以会出现短暂的 head != tail,此时 Tail 指向最后一个节点,而且 Tail 指向 Head。如果 Head 没有指向 Tail(可见 5、6、7 行),这种情况下也需要将相关线程加入队列中。所以这块代码是为了解决极端情况下的并发问题。

    4.1.4 线程出队列

    通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。接下来呢,就是
    进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后进行工作。 那怎么轮到自己拿到资源呢,也就是怎么出队呢,此时,就是acquireQueued()方法了,源码如下:

     final boolean acquireQueued(final Node node, int arg) {
     		//标记是否拿到资源
            boolean failed = true;
            try {
            	//标记等待过程中是否被中断过
                boolean interrupted = false;
                //死循环,直到return为止
                for (;;) {
                	//当前节点的前驱节点
                    final Node p = node.predecessor();
                    //如果前驱节点是头结点,说明当前节点在真实数据队列的首部(头结点是虚节点),老二节点
                    //就尝试获取锁(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)
                    if (p == head && tryAcquire(arg)) {
                    //拿到资源后,将head指向该结点。
                    //所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
                        setHead(node);
                    //setHead中node.prev已置为null,此处再将head.next置为null,
                    //就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
                        p.next = null; // help GC
                        //拿到资源
                        failed = false;
                        //返回等待过程中是否被中断过
                        return interrupted;
                    }
                    //p节点部署头节点或者没有获得锁(可能是非公平锁被抢占了)
                    //判断node节点是否被阻塞(被阻塞条件:前驱节点的waitStatus为-1)
                    //防止无限循环浪费资源。
                    if (shouldParkAfterFailedAcquire(p, node) &&
                 //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。
                 //如果不可中断的情况下被中断了,那么会从park()醒过来,拿不到资源,继续进入park()等待。
                        parkAndCheckInterrupt())
                        //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
                        interrupted = true;
                }
            } finally {
                if (failed)
                // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),
                // 取消结点在队列中的等待。
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    setHead(node)是把当前节点设置为虚节点(head指向当前节点),并不修改waitStatus。源码如下:

      private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    我们总结一下上述过程,acquireQueued()过程如下:
    在这里插入图片描述

    我们在这里再回顾一下waitStatus状态的意义(懒得往上翻了):

    CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
    SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。 0:新结点入队时的默认状态。

    这时,我们再看一下shouldParkAfterFailedAcquire()源码:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    		//前一结点的状态
            int ws = pred.waitStatus;
            //状态为-1
            if (ws == Node.SIGNAL)
            	//如果已经告诉前驱(后面代码操作)拿完号后通知自己一下,那就可以安心休息(park)了
                return true;
            //状态为取消状态
            if (ws > 0) {
                /*
                 * 循环向前查找取消节点,把取消节点从队列中剔除,然后自己排到正常节点后面
                 * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,
                 * 稍后就会被保安大叔赶走了(GC回收)!
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * 如果前驱正常,那就把前驱的状态设置成SIGNAL,
                 * 告诉它拿完号后通知自己一下。
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    parkAndCheckInterrupt()源码如下:

     private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);//调用park()使线程进入waiting状态
            return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
        }
    
    • 1
    • 2
    • 3
    • 4

    为了防止因死循环导致 CPU 资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起 ,具体挂起流程(shouldParkAfterFailedAcquire 流程):
    在这里插入图片描述

    如果线程获取锁失败且被中断过,acquire()方法中会调用selfInterrupt()方法,把当前线程中断。

     static void selfInterrupt() {
            Thread.currentThread().interrupt();
        }
    
    • 1
    • 2
    • 3

    这里是为什么呢?为什么可能获得锁了,但是中断状态变了,还要中断呢,看一下解释,来源于:美团技术团队

    1、当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过 Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为 False),并记录下来,如果发现该线程被中断过,就再中断一次。
    2、线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
    这里的处理方式主要是运用线程池中基本运作单元 Worder 中的 runWorker,通过Thread.interrupted()进行额外的判断处理,感兴趣的同学可以看下 ThreadPoolExecutor 源码。

    总结:
    1、结点进入队尾后,检查状态,找到park()点;
    2、调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
    3、被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。

    4.1.5 CANCEL状态节点

    acquireQueued 方法中的 finally 代码, 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),取消结点在队列中的等待。cancelAcquire(node)源码如下:

     private void cancelAcquire(Node node) {
            // 无效节点
            if (node == null)
                return;
    		//当前节点设置为虚节点
            node.thread = null;
            // 循环跳过过滤取消过的节点
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            // 过滤后的节点的后置节点
            Node predNext = pred.next;
            // 当前节点的状态改掉
            node.waitStatus = Node.CANCELLED;
    
            // 如果当前节点是tail,将pred设置为尾节点
            if (node == tail && compareAndSetTail(node, pred)) {
            //更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
                compareAndSetNext(pred, predNext, null);
            } else {
                int ws;
                 // 如果当前节点不是head的后继节点,
                if (pred != head &&
                //1:判断当前节点前驱节点的是否为SIGNAL
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                //2:如果不是,则把前驱节点设置为SINGAL看是否成功
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                     // 如果1和2中有一个为true,再判断前一节点的线程是否为null
                    pred.thread != null) {
                    // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    过程如下:
    1、当前节点是尾节点。如图所示,将pred设置为尾节点,将pred的后一节点指向为null。当前Node状态设为CANCELLED。
    在这里插入图片描述
    2、当前节点是 Head 的后继节点。或者上述条件不满足,那就唤醒当前节点的后继节点。
    在这里插入图片描述
    3、当前节点不是 Head 的后继节点,也不是尾节点。前驱节点设置为SINGAL或者前驱节点就是SINGAL,过程如下:
    在这里插入图片描述
    为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?

    执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 方法了),如果此时修改 Prev 指针,有可能会导致 Prev指向另一个已经移除队列的 Node,因此这块变化 Prev 指针不安全。 shouldParkAfterFailedAcquire
    方法中,会执行下面的代码,其实就是在处理 Prev 指针。shouldParkAfterFailedAcquire
    是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 Prev指针比较安全。

    do {
    	node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    
    • 1
    • 2
    • 3

    4.2 解锁流程

    ReentrantLock 在解锁的时候,并不区分公平锁和非公平锁,解锁源码如下:

    public void unlock() {
            sync.release(1);
        }
    
    
    • 1
    • 2
    • 3
    • 4

    继续往下看,其调用了AQS的release()方法:

    public final boolean release(int arg) {
    //上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
            if (tryRelease(arg)) {
                Node h = head;
               /*
                * h == null Head 还没初始化。初始情况下,head == null,第一个节点入队,
                * Head 会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现 head == null。
    			* h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
    			* h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
                */
                if (h != null && h.waitStatus != 0)
                //头结点不为空并且头结点的waitStatus不是初始化节点情况,解除后继线程挂起状态
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    ReentrantLock 的tryRelease如下所示:

    //方法返回当前锁是不是没有被线程持有
     protected final boolean tryRelease(int releases) {
     			//减少可重入次数
                int c = getState() - releases;
                //当前线程不是持有锁的线程,抛出异常
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                //如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    unparkSuccessor()方法如下:

     private void unparkSuccessor(Node node) {
            /*
             * 获取头结点waitStatus
             */
            int ws = node.waitStatus;
            //置零当前线程所在的结点状态,允许失败
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * 头节点的下一个节点
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                //下个节点是null或者下个节点被cancelled,就从后往前找到队列最开始的非cancelled的节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
            	//把下一个不为空的节点unpark
                LockSupport.unpark(s.thread);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    为什么要从后往前找第一个非 Cancelled 的节点呢?原因如下。

    如果是从前往后找,由于极端情况下入队的非原子操作和 CANCELLED 节点产生过程中断开 Next
    指针的操作,可能会导致无法遍历所有的节点。
    由于并发问题,addWaiter()入队操作和cancelAcquire()取消排队操作都会造成next链的不一致,而prev链是强一致的,所以这时从后往前找是最安全的。

    我们总结一下解锁的流程,流程图如下:
    在这里插入图片描述
    总结
    好了,以上就是独占加锁解锁的全部流程。对于共享模式呢,这里就不过多描述了(篇幅太长!!!)。

  • 相关阅读:
    再立云计算“昆仑”,联想混合云Lenovo xCloud凭什么?
    跨境电商平台自养号测评防关联技巧!
    构建工具 Vite、Webpack、Rollup对比
    springBoot 项目中的静态资源文件夹static和模版文件文件夹templates
    如何快速开发一个简单实用的MES系统?
    Seata AT模式TransactionHook会被莫名删除?
    Google Earth Engine(GEE)——快速建立一个10km的格网
    史上最强 Java 学习路线图!
    Spring+spring mvc+mybatis整合的框架
    [算法刷题笔记]二叉树练习(2):对称二叉树有关的练习
  • 原文地址:https://blog.csdn.net/liwangcuihua/article/details/125387255