• AQS 源码解读之加锁篇


    AQS 基础篇
    AQS 源码解读之解锁篇

    以 ReentrantLock 创建的非公平锁为基础,进行 AQS 全流程的分析。

    分析 demo

    一共有 A、B、C 三个线程。

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    public class AQSDemo { // 带入一个银行办理业务的案例 public static void main(String[] args) { // 创建一个非公平锁 ReentrantLock lock = new ReentrantLock(); // 三个线程模拟3个网点 // A 顾客就是第一个顾客,此时没有没有其他顾客 new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "\t thread come in"); try { TimeUnit.MINUTES.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }, "A").start(); // 第二个线程 --> 由于受理窗口只有一个(只能一个线程持有锁),此时 B 只能等待 // 进入候客区 new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "\t thread come in"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }, "B").start(); // 第三个线程 --> 由于受理窗口只有一个(只能一个线程持有锁),此时 B 只能等待 // 进入候客区 new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "\t thread come in"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }, "C").start(); } }

    线程 A

    lock 方法分析

    第一步:

    调用抽象类 sync 的抽象 lock() 方法

    复制代码
    • 1
    • 2
    • 3
    public void lock() { sync.lock(); }

    第二步:

    抽象类 sync 的具体实现
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
    执行 compareAndSetState(0, 1) 方法
    复制代码
    • 1
    • 2
    • 3
    protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

    这个方法就是将 state 值进行比较修改,由于这个是第一个线程进来,所以通过比较修改,将 state 的值从默认的 0 改成了 1,然后返回 true。

    执行 setExclusiveOwnerThread(Thread.currentThread()) 方法
    复制代码
    • 1
    • 2
    • 3
    protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }

    设置当前拥有独占访问权限的线程,对应 Demo 中的 A 线程。

    总结

    第一个线程的执行逻辑比较简单,直接修改 state 和将当前占有的线程改成自己就可以了。

    线程 B

    lock 方法分析

    第一步:

    和第一个线程执行的是一样的代码。

    第二步:

    2.1 抽象类 sync 的具体实现
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
    2.2 执行 compareAndSetState(0, 1) 方法
    复制代码
    • 1
    • 2
    • 3
    • 4
    // expect = 0 update = 1 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

    此时当线程二再次执行比较并修改方法,想修改 state 的值时,通过比较对比。此时 state 的值已经被线程一修改成了 1,所以此时修改失败。返回 false。

    2.3 执行 acquire(1) 方法 1197
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    // arg = 1 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
    2.4 执行 !tryAcquire(arg) 方法
    2.4.1 此方法是 AQS 中的抽象类,需要查看器具体实现

    通过抛出异常的方式,强制子类必需实现其钩子程序。

    复制代码
    • 1
    • 2
    • 3
    protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
    2.4.2 找到具体实现,在 NonfairSync 类中,上面的代码可以 [点击查看](##### 2.1 抽象类 sync 的具体实现)。
    复制代码
    • 1
    • 2
    • 3
    • 4
    // acquires = 1 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
    2.5 执行了 nonfairTryAcquire(acquires) 方法
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    // acquires = 1 final boolean nonfairTryAcquire(int acquires) { // 此时 Thread = 第二个线程 final Thread current = Thread.currentThread(); // getState() 方法返回 1,因为 state 已经被第一个线程所修改了 int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // getExclusiveOwnerThread:获取当前占用锁的线程,也就是线程一 // 此时如果线程一再次过来获取到锁,就可以直接进去也就是可重入锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

    nonfairTryAcquire 方法首先校验了 state 的值是否等于 0,也就是看看上一个占用锁的线程是不是已经把资源给释放了。

    后续又校验了当前线程是不是和占用锁的线程是同一个,也就是一个可重入锁。

    线程 B 来判断的话都不满足条件,所以返回 false。

    返回 false 后 [!tryAcquire(arg) = true](######2.3 执行 acquire(1) 方法),所以继续执行。

    2.6 执行 addWaiter(Node.EXCLUSIVE) 方法

    Node.EXCLUSIVE:static final Node EXCLUSIVE = null; 也就是排他的意思。

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    // mode = null private Node addWaiter(Node mode) { // 创建一个 Node 节点 Node node = new Node(Thread.currentThread(), mode); // tail:尾节点,此时为 null Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
    2.6.1 new Node(Thread.currentThread(), mode);
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    // thread = 线程二 mode = null Node(Thread thread, Node mode) { // 将当前等待节点谁知为 null this.nextWaiter = mode; // 这个 Node 节点的线程设置为 线程二 this.thread = thread; }
    2.6.2 enq(node)
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    // node 等于 2.6.1 创建的 Node 节点 private Node enq(final Node node) { // 自旋操作 for (;;) { // 此时尾结点 tail 为 null Node t = tail; // t is null 进行初始化操作,这一步也就是意味着此时 CLH 队列中还没有任何一个元素。 if (t == null) { // 成功将队列里面的头节点替换成新创建的 Node 节点 if (compareAndSetHead(new Node())) // 将尾结点也指向新创建 Node 节点 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
    2.6.3 compareAndSetHead(new Node())
    复制代码
    • 1
    • 2
    • 3
    private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); }

    比较并替换掉头节点,如果是 null 的话,直接将头节点替换成新创建的 Node 节点。

    这里需要注意的是:队列中的第一个节点并不是我们线程二这个节点,而是系统自动帮我们创建了一个新的 Node 节点。

    替换成功返回 true 继续执行 if 语句里面的[代码](######2.6.2 enq(node));

    经过第一轮循环,此时 CLH 中的情况:

    qVfJSJ.png

    2.6.4 enq(node) 第二次循环
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    // node = 2.6.1 创建的 Node 节点,也就是线程二的 Node 节点 private Node enq(final Node node) { // 自旋操作 for (;;) { // 此时尾结点 tail 为 新创建的 node 节点 Node t = tail; // t is not null 执行 else if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { // 将 node 节点的前指针指向新创建的头节点 node.prev = t; // 通过比较替换将队列的尾结点替换了线程 B 的 Node 节点 if (compareAndSetTail(t, node)) { // 将系统初始的 Node 的后指针指向线程 B 的 Node 节点 t.next = node; // 然后返回 B 线程的 Node 节点 return t; } } } }

    经过第二次循环后,此时的 CLH 队列的情况如下

    qVfIfg.png

    2.7 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    // node:线程 B 对应的 Node 节点 arg = 1 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋操作 for (;;) { // 获取现在队列中的第一个节点也就是系统创建的 Node 节点 final Node p = node.predecessor(); // P 现在是头节点,true。但是线程 B 尝试获取锁失败,false if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 所以执行这一步,返回 false。所以进行下一次循环 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
    2.7.1 node.predecessor();
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
    2.7.2 tryAcquire(arg)

    再走一遍 2.5 执行了 [!nonfairTryAcquire(acquires) 方法](#####2.5 执行了 nonfairTryAcquire(acquires) 方法)

    2.7.3 shouldParkAfterFailedAcquire(p, node)

    第一次循环

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    // pred 系统创建的 Node 节点,node 线程 B 对应的 Node 节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 此时头节点的 waitStatus = 0 int ws = pred.waitStatus; // Node.SIGNAL = -1 if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 通过比较替换,将头节点的值从 0 调整为 -1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

    现在对应 CLH 队列中的情况:

    qVqgCq.png

    该方法将头节点中 Node 的 waitStatus 的值改成了 -1,并且返回了 false。

    然后再次重复 [2.7 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)](######2.7 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 的操作,

    2.7.4 shouldParkAfterFailedAcquire(p, node)

    第二次循环

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    // node:线程 B 对应的 Node 节点 arg = 1 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋操作 for (;;) { // 获取现在队列中的第一个节点也就是系统创建的 Node 节点 final Node p = node.predecessor(); // P 现在是头节点,true。但是线程 B 尝试获取锁失败,false if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 所以第二次循环后返回 ture,执行下一步 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    // pred 系统创建的 Node 节点,node 线程 B 对应的 Node 节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 经过第一次循环操作,此时头节点的 waitStatus = -1 int ws = pred.waitStatus; // Node.SIGNAL = -1 if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 通过比较替换,将头节点的值从 0 调整为 -1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
    2.7.5 parkAndCheckInterrupt()
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    // this 当前 B 线程 private final boolean parkAndCheckInterrupt() { // 将当前线程 B 进行挂起 LockSupport.park(this); return Thread.interrupted(); }

    此时,线程B相当于已经完成了入队操作,进行了挂起。不会再尝试去获取锁了,安安心心在 CLH 队列中等待唤醒操作。

    获取锁小总结

    线程 B 在锁已经被占用的情况下,会先去尝试抢占锁。如果抢占失败,AQS 回将线程 B 进行入队操作。但是在入队之前会先进行初始化操作,也就是先创建一个傀儡节点,由其充当头节点和尾结节点。

    队列初始化完成后,再将线程 B 对应的 Node 节点与傀儡节点进行连接,也就是傀儡节点的尾指针指向线程 B 的 Node 节点,线程 B 的 Node 节点的指针指向傀儡节点。最后将 CLH 队列的尾指针指向线程 B 的 Node 节点。

    将acquire线程 B 的 Node 节点加入到 CLH 队列中后又调用了 acquireQueued 方法,这里通过自旋使得线程 B 又抢占了两次锁,抢占到了的就进行后面的操作,没有抢占到便执行 parkAndCheckInterrupt 方法,将自己挂起,等待前面的线程执行完释放锁后将自己唤醒。

    线程 C

    lock 方法分析

    线程 C 和线程 B 前面执行的逻辑是一样,直到执行 addWaiter(Node.EXCLUSIVE) 方法时才有所出入,所以就直接分析 addWaiter(Node.EXCLUSIVE) 方法的执行流程。

    addWaiter(Node.EXCLUSIVE)

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    // mode 还是等于 null 排他 private Node addWaiter(Node mode) { // 创建线程 C 的 Node 节点 Node node = new Node(Thread.currentThread(), mode); // 此时尾指针指向的是线程 B 的 Node 节点 Node pred = tail; if (pred != null) { // 将线程 C 的 Node 节点的前指针指向线程 B 的 Node 节点 node.prev = pred; // 通过比较替换将线程的尾指针指向线程 C 的 Node 节点 if (compareAndSetTail(pred, node)) { // 将线程 B 的 Node 节点的后指针指向线程 C 的 Node 节点 pred.next = node; // 返回线程 C 的 Node 节点 return node; } } enq(node); return node; }

    线程 C 执行完 addWaiter 方法后此时的 CLH 队列的情况如下:

    qV4wGD.png

    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    // node 线程 c 对应的 Node arg = 1 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取线程 c 对应的 Node 节点的前指针对应的 Node(线程 B 的 Node 节点线程 B 的 Node 节点) final Node p = node.predecessor(); // 因为此时 head 头节点还是傀儡节点,所以不匹配,直接执行下面的代码 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

    shouldParkAfterFailedAcquire(p, node)

    第一次执行
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    // pred 线程 B 对应的 Node node 线程 c 对应的 Node private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 此时线程 B 的 waitStatus = 0 int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 通过比较替换将线程 B 的 waitStatus 改成 -1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
    第二次执行
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    // pred 线程 B 对应的 Node node 线程 c 对应的 Node private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 此时线程 B 的 waitStatus = -1 int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

    parkAndCheckInterrupt()

    复制代码
    • 1
    • 2
    • 3
    • 4
    private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

    线程 C 执行 parkAndCheckInterrupt 方法将自己挂起。

    此时 CHL 队列的情况如下:

    qZKXS1.png

    总结

    线程 C 执行的流程和线程 B 大致是差不太多的,但是线程 C 和线程与线程 B 最显著的区别就是少了两次锁的抢占 。在方法 acquireQueued 中,由于线程 C 的前指针指向的 Node 节点与头节点的 Node 不一样,所以就直接跳过了,不会执行后续的尝试抢占锁的方法。

    后面线程 C 执行 shouldParkAfterFailedAcquire 方法将其前指针指向的 Node 节点中的 waitStatus 的值从 0 改成了 -1。最后就是执行 parkAndCheckInterrupt 方法,将自己挂起。

  • 相关阅读:
    社区论坛小程序系统源码+自定义设置+活动奖励 自带流量主 带完整的搭建教程
    MySQL——慢查询日志分析
    窥一斑而知全豹,从五大厂商看MCU国产化的机遇和挑战
    pandas笔记:显示中间的省略号
    Mybatis的生命周期和作用域
    Spring之引入外部的属性配置文件
    借秋说愁卖产品
    从色情直播到直播电商
    计算机网络第6章应用层 单元测试(习题+答案+图文解析)
    盲目跟风考PMP认证?PMP还剩多少含金量?
  • 原文地址:https://www.cnblogs.com/lhnstart/p/16029767.html