• ReentrantLock源码解析


    ReentrantLock源码解析

    ReentrantLock

    参考文档

    ReentrantLock介绍

    ReentrantLock,意思是“可重入锁”,关于可重入锁的概念在下一节讲述。ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。

    基本语法上,ReentrantLock与synchronized很相似,它们都具备一样的线程重入特性,只是代码写法上有点区别而已。一个表现为API层面互斥锁(Lock),一个表现为原生语法层面的互斥锁(synchronized)。

    ReentrantLock相对synchronized而言还是增加了一些高级功能,主要有以下三项:

    • 等待可中断:当持有锁的线程长期不释放锁时,正在等待的线程可以选择放弃等待,改为处理其他事情,它对处理执行时间非常长的同步块很有帮助。而在等待由synchronized产生的互斥锁时,会一直阻塞,是不能被中断的。
    • 可实现公平锁:多个线程赶在等待同一个锁时,必须按照申请锁的时间顺序排队等待,而非公平锁则不保证这点,在锁释放时,任何一个等待锁的线程都有机会获得锁。synchronized中的锁是非公平锁,ReentrantLock默认情况下也是非公平锁,但可以通过构造方法ReentrantLock(true)来要求使用公平锁。
    • 锁可以绑定多个条件:ReentrantLock对象可以同时绑定多个Condition对象(条件变量或条件队列),而在synchronized中,锁对象的 wait()和notify()或notifyAll()方法可以实现一个隐含条件,但如果要和多于一个的条件关联的时候,就不得不额外地添加一个锁,而ReentrantLock则无需这么做,只需要多次调用newCondition()方法即可。而且我们还可以通过绑定Condition对象来判断当前线程通知的是哪些线程(即与Condition对象绑定在一起的其它线程)。

    ReentrantLock加锁成功流程

    非公平锁

    reentrantlock 主要有两个构造器 :

     	public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    分别由非公平锁NofairSync(默认) 和 公平锁 FairSync 来实现的

    非公平锁 :

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    非公平锁的代码如上面所示, NonfairSync 是 ReentrantLock的内部静态类, 并且添加了 final 关键字, 防止其他类去继承 NofairSync

    **如果一个类要被声明为static的,只有一种情况,就是静态内部类。**如果在外部类声明为static,程序会编译都不会过 :

    1.静态内部类跟静态方法一样,只能访问静态的成员变量和方法,不能访问非静态的方法和属性,但是普通内部类可以访问任意外部类的成员变量和方法

    2.静态内部类可以声明普通成员变量和方法,而普通内部类不能声明static成员变量和方法。

    3.静态内部类可以单独初始化

    image-20220916094334082

    NofairSync的继承关系如上图所示, 可以看到 NofairSync 继承了 Sync , Sync 继承了AQS, 所以可以使用AQS的提供的加锁/解锁方法

    加锁流程

    初始状态是没有竞争的时候 :
    image-20220917113437493

    如图所示 :

    • exclusizeOwnerThread = currentThread , state =0 => state = 1
     	protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    
    • 1
    • 2
    • 3

    exclusiveOwnerThreadReentrantLock的一个字段, 对象引用指向获取锁的线程


    当出现竞争的时候 :
    image-20220917113557632

    Thread-1 :

    1. 新的线程过来尝试修改 state 的值C
    2. AS尝试将state由0改为1, 失败 !
    3. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
    4. 获取锁失败 , 接下来进入 addWaiter 逻辑,构造 Node 队列
    addWaiter 逻辑 :
    image-20220917114319124

    addwaiter的逻辑是创建一个Node类对象, 然后插入到队列中, 队列使用双向链表实现

    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
    		...
    }
    
    • 1
    • 2
    • 3
    • 4
    当前线程进入 acquireQueued 逻辑 :

    源码如下 :

    	// ReentrantLock -> lock()
    	final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
    	// AbstractQueuedSynchronizer ->  acquire
    	public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    	// AbstractQueuedSynchronizer ->  acquireQueued
    	final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
    2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
    3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
    image-20220917115732233
    1. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
    2. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true
    3. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)阻塞
    image-20220917115537382

    解锁流程

    多次竞争失败 :
    image-20220917115858932
    开始进入解锁流程 :

    找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1

    如果加锁成功(没有竞争),会设置 :

    1. exclusiveOwnerThread 为 Thread-1,state = 1
    2. head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread, 相当于把原本的Thread-1所在的Node设置为虚拟头结点
    image-20220917115943506

    非公平锁的体现

    • 哪个线程能获取锁, 并不是根据队列顺序, 如果是公平锁的情况下, 新来的 Thread-4 线程需要往后面拍, 让Thread-1 先获取锁
    • 非公平锁的话, 队列中排第二的Node中的线程(第一是虚拟头节点)要和新来的线程竞争
    image-20220917120327591

    ReentrantLock加锁源码

    NonfairSync

    
    // Sync 继承自 AQS
    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            // 加锁实现
            final void lock() {
                // 尝试获取锁
                if (compareAndSetState(0, 1))
                    // 获取锁成功. 设置当前线程为锁拥有的线程
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    //  如果尝试失败,进入 acquire [1]
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    AQS

    **[1]acquire **:
    	
    	// [1]AQS 继承过来的方法 
    	public final void acquire(int arg) {
            // [2]tryAcquire 再次尝试获取锁
            if (!tryAcquire(arg) &&
                // 如果 tryAcquire失败, 就执行 addWaiter[4] , 再执行 acquireQueued[5]
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    1. 先调用 tryAcquire() , 如果失败
    2. 执行 addWaiter , 然后执行 acquireQueued, 如果成功
    3. 调用 selfInterrupt(); 阻塞
    [2]tryAcquire :

    选中方法, ctrl + alt + b 即可查看 这个方法的所有实现的类 NonfairSync -> tryAcquire

    		// NonfairSync -> 	tryAcquire [2] -> [3]
    		protected final boolean tryAcquire(int acquires) {
                //  Sync -> nonfairTryAcquire
                return nonfairTryAcquire(acquires);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    [3]nonfairTryAcquire :

    AbstractQueuedSynchronizer ==> Sync -> nonfairTryAcquire

    // [3] Sync 继承过来的方法, 方便阅读, 放在此处
    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
        		// 如果还没有获得锁
                if (c == 0) {
                    // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
        		// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
                else if (current == getExclusiveOwnerThread()) {
                    // state++ 
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
        		// 获取失败, 回到调用处
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    1. 获取锁的状态字段 state
    2. 如果还没有获取锁, 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
    3. 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入, state++, 重新设置 state
    4. state 获取失败, 回到调用处
    [4]addWaiter :
    // [4]AQS 继承过来的方法, 方便阅读, 放在此处
    private Node addWaiter(Node mode) {
            // 将当前线程关联到一个 Node 对象上, 模式为独占模式
        	Node node = new Node(Thread.currentThread(), mode);
            // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    // 双向链表
                    pred.next = node;
                    return node;
                }
            }
        	// 尝试将 Node 加入 AQS, 进入 [6]
            enq(node);
            return node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    [6]enq
    // AQS 继承过来的方法, 方便阅读, 放在此处
    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                // 判断尾部是否为 null ,如果是 就创建一个虚拟节点, head -> node(null)
                if (t == null) { // Must initialize
                    // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    // cas 尝试将 Node 对象加入 AQS 队列尾部
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    [5]acquireQueued :
    //  AQS 继承过来的方法, 方便阅读, 放在此处
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取锁
                    if (p == head && tryAcquire(arg)) {
                        // 初始状态 head -> node(null) -> node(thred = thread-1) -> node(thred = thread-2)
                        // head -> node(thred = thread-1) -> node(thred = thread-2) , node(null) 被 GC 回收
                        // head -> node(thred = null) -> node(thred = thread-2) , thread-1 已经获取锁了, 对应的node 被设置为 null
                        // 获取成功, 设置自己(当前线程对应的 node)为 head
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        // 返回中断标记 false
                        return interrupted;
                    }
                    // 判断是否应当 park, 进入 [7]
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        // park 等待, 此时 Node 的状态被置为 Node.SIGNAL [8]
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    1. 将头节点 head 指向 第二个节点 node(thread-1) , 虚拟头结点node(null)失去引用就会被垃圾回收
    2. 原本第二个节点 node(thread-1) 会被作为新的虚拟节点, 并且 node(thread = null) 的 thread 会被重置为 null , 返回中断标记
    3. 判断是否应该进入 park => [7]shouldParkAfterFailedAcquire
    [7]shouldParkAfterFailedAcquire
    //  AQS 继承过来的方法, 方便阅读, 放在此处   
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        	// 获取上一个节点的状态    
        	int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
             	// 上一个节点都在阻塞, 那么自己也阻塞好了
                return true;
        	//  > 0 表示取消状态 , 取消状态就是当前线程不再等待锁, 取消等待锁的状态了
            if (ws > 0) {
    			// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
              	// 这次还没有阻塞
                // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 获取前一个节点的 waitStatus
    2. 如果前一个节点是 -1 (Node.SIGNAL = 1) , 说明 这个节点正在阻塞, 那么就直接返回 true , 这样就会执行 parkAndCheckInterrupt()
    3. 如果上一个节点的状态是取消状态, 那么就会删除前面所有的取消的节点, 取消的意思是, 对应的线程不再选择在队列里等待锁, 而是取消等待
    4. 如果前一节点还灭有阻塞, 就设置 waitStatus 为 -1 ( Node.SIGNAL)
    [8]shouldParkAfterFailedAcquire
    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
    }
    
    • 1
    • 2
    • 3
    • 4

    是否需要 unpark 是由当前节点的前驱节点waitStatus == Node.SIGNAL 来决定,而不是本节点的waitStatus 决定

    ReentrantLock解锁源码

    unlock

    Sync 继承自 AQS

    public class ReentrantLock implements Lock, java.io.Serializable {
        // 解锁实现
        public void unlock() {
            sync.release(1);
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    release

    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final boolean release(int arg) {
       		// 尝试释放锁, 进入 [1]
            if (tryRelease(arg)) {
                //  从队列头节点 开始 unpark
                Node h = head;
                //  队列不为 null 并且  waitStatus == Node.SIGNAL(-1) 才需要 unpark
                if (h != null && h.waitStatus != 0)
                    // unpark AQS 中等待的线程, 进入[2]
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    1. 调用 tryRelease 尝试释放锁
    2. 如果释放成功的情况下, 尝试唤醒(unpark)队列中其他阻塞( waitStatus == Node.SIGNAL(-1))的线程 , 最后返回 true
    3. 释放锁失败,
    [1]tryRelease :
    	
    //  Sync 继承过来的方法, 方便阅读, 放在此处
    	protected final boolean tryRelease(int releases) {
            	// state--
                int c = getState() - releases;
            	// 判断当前线程是否是获取锁的那个线程
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
            	// c == 0 是为了
            	// 支持锁重入, 只有 state 减为 0, 才释放成功
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    [2]unparkSuccessor :
     private void unparkSuccessor(Node node) {
    		// 如果状态为 Node.SIGNAL 尝试重置状态为 0
         	// 不成功也可以
            int ws = node.waitStatus;
            if (ws < 0) // ws = Node.SIGNAL(-1)
                // 重置状态state
                compareAndSetWaitStatus(node, ws, 0);
    		// 找到需要 unpark 的节点(state==-1), 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
            Node s = node.next; // head-> node(null) -> node(thread-1)
         	// waitStatus > 0 表示取消状态
            if (s == null || s.waitStatus > 0) {
                s = null;
                // 不考虑已取消的节点(), 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    // waitStatus <= 0 阻塞
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 获取当前节点的 waitSate , 如果当前 waitSate < 0, 当前节点对应的 Thread 已经阻塞, 所以要重置当前节点的state为0
    2. 如果 阻塞队列为空 (s == null) , 或者 s 对应的线程等待状态已经被取消了 s.waitStatus > 0
    3. 不考虑已取消的节点(s.waitStatus > 0), 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
    4. 找到需要unpark的节点以后, 执行 LockSupport.unpark(s.thread);

    可重入原理

    nonfairTryAcquire

    static final class NonfairSync extends Sync {
     	
        // Sync 继承过来的方法, 方便阅读, 放在此处
        final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
            	// 没有其他人获取锁的情况
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
            	// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    1. 没有其他人获取锁的情况 , 修改状态state, 获取锁
    2. 已经获取锁的情况下, 去判断 当前线程是否是获取锁的线程
    3. 然后对 state 进行累加 int nextc = c + acquires;, 表示重入锁的次数 setState(nextc)

    tryRelease

    	
    	//  Sync 继承过来的方法, 方便阅读, 放在此处
    	protected final boolean tryRelease(int releases) {
            	// state--
                int c = getState() - releases;
            	// 判断当前线程是否是获取锁的那个线程
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
            	// c == 0 是为了
            	// 支持锁重入, 只有 state 减为 0, 才释放成功
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    1. 让 state - 1 , 然后返回 false , 相当于是把锁重入的计数减一
    2. 当 c == 0, 也就是计数减到0的时候, 说明已经释放成功了, 然后会把 free 设置为 true , 设置 setExclusiveOwnerThread(null); 为 null
    3. 返回释放锁成功的标志 true

    流程图解

    reentrant源码解析-可重入原理

    可打断原理

    不可打断模式

    在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

    // Sync 继承自 AQS
    static final class NonfairSync extends Sync {
        
        // 从AQS继承来的方法
        private final boolean parkAndCheckInterrupt() {
            //  如果打断标记已经是 true, 则 park 会失效
            LockSupport.park(this);
            // interrupted 会清除打断标记
            return Thread.interrupted();
        }
        
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    // 1. 先获取锁 
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        // 2. 返回打断状态
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    	
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                // 返回的 interrupted = true
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                // 如果打断的状态为 true
                selfInterrupt();
        }
        
        static void selfInterrupt() {
            // 重新产生一次中断
            Thread.currentThread().interrupt();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    可打断模式

    static final class NonfairSync extends Sync {
        
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            // 如果没有获得到锁, 进入 [1]
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    [1]可打断的获取锁流程:
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        // 在 park 过程中如果被 interrupt 会进入此
                        parkAndCheckInterrupt())
                        //  这时候抛出异常, 而不会再次进入 for (;;)
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    注意 , 与不可打断模式, 这里是设置 打断标记为true, 会继续进行for循环

    	if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
    
    • 1
    • 2
    • 3

    可打断模式 : 会直接抛出异常

    if (shouldParkAfterFailedAcquire(p, node) &&
                        // 在 park 过程中如果被 interrupt 会进入此
                        parkAndCheckInterrupt())
                        //  这时候抛出异常, 而不会再次进入 for (;;)
                        throw new InterruptedException();
    
    • 1
    • 2
    • 3
    • 4
    • 5

    公平锁原理

    static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
         	// 与非公平锁主要区别在于 tryAcquire 方法的实现
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        
        	
        	// AQS 继承过来的方法, 方便阅读, 放在此处
        	public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        	}
        
        	
        	public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            // h != t 说明 队列中有 等待的线程
            return h != t &&
                // h.next == null 说明没有 等待的线程
                // h = head -> node(null) -> node(thread1)
                // 判断当前线程是否是第二个节点(第一个为head)对应的线程
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    1. 判断队列中是否有线程等待 (如果有 = true)
    2. 判断当前线程是是否是排第一个等待的线程, 如果不是, 返回true, 如果是返回false
    3. 当前线程是排队第一等待的线程 hasQueuedPredecessors() = false , !hasQueuedPredecessors() = true
    4. 尝试修改状态state获取锁

    公平锁

    acquire(1) -> tryAcquire(acquires = 1)

    // 与非公平锁主要区别在于 tryAcquire 方法的实现
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 获取当前锁的状态, 如果没有人持有锁
    2. 先检查AQS队列中是否有前驱节点, 也就是说, 判断下当前节点是否是优先级最高的 (排在队列第一的线程优先级肯定是最高的)
    3. 如果自己是优先级最高的情况下, 就尝试获取锁

    hasQueuedPredecessors :

    public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            // h != t 说明 队列中有 等待的线程
            return h != t &&
                // h.next == null 说明没有 等待的线程
                // h = head -> node(null) -> node(thread1)
                // 判断当前线程是否是第二个节点(第一个为head)对应的线程
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    1. 判断队列中是否有线程等待 (如果有 = true)
    2. 判断当前线程是是否是排第一个等待的线程, 如果不是, 返回true, 如果是返回false
    3. 当前线程是排队第一等待的线程 hasQueuedPredecessors() = false , !hasQueuedPredecessors() = true
    4. 尝试修改状态state获取锁

    非公平锁

    acquire(1) -> tryAcquire(acquires = 1)

    	protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
    	final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    1. 非公平锁不会去考虑AQS等待队列中是否有 线程正在等待的, 无论是新来的, 还是等待的(锁释放的时候就会唤醒这些等待的线程) 一块竞争.

    条件变量实现原理

    await流程

    1. 让当前持有锁的线程 thread-1 wait , 进入Condition等待队列

    2. 让NonfairSync同步器释放锁 :

      1. owner = null
      2. state = 0
    3. 唤醒 AQS等待队列中优先级最高的节点 , 让其竞争锁

    4. 阻塞 thread-1 (unpark)

    await源码
    	public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
            	// 进入 fullyRelease 流程
            	// fullRelease 
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    // 是当前线程处于等待状态
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    addConditionWaiter流程
    image-20220918183430436
    fullRelease流程
    image-20220918203026157
    竞争锁流程
    image-20220918204301406
    1. fullRelease 之后 唤醒(unpark) AQS 队列中的下一个节点(排名最靠前的)来竞争锁,
    2. 假设没有其他竞争线程,那么 Thread-1 竞争成功
    3. park 阻塞 Thread-0

    signal流程

    初始状态

    假设 Thread-1 要来唤醒 Thread-0

    image-20220918210546346
    doSignal流程
    image-20220918210721637
    transferForSignal流程
    image-20220918210932397
    1. 执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,
    2. 将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1
    3. Thread-1 释放锁,进入 unlock 流程
  • 相关阅读:
    时间空间复杂度分析--选择排序算法
    三农数据(1996-2020)八:农林牧渔业总产值、增加值构成及增加值率、中间消耗
    【无标题】
    基于AT89C51流水花样灯proteus仿真设计
    安利一波C2工具
    大学生想做兼职应该怎么找,适合大学生的线上线下靠谱兼职推荐
    postman环境变量的设置
    plantuml画图
    自动化测试框架有哪几种?搭建的思路是什么?完整指南奉上!
    java版工程管理系统Spring Cloud+Spring Boot+Mybatis实现工程管理系统源码
  • 原文地址:https://blog.csdn.net/weixin_40040107/article/details/126923220