• AQS之ReentrantLock分析 (五)


    1.案例说明

       public static void main(String[] args){
            ReentrantLock lock = new ReentrantLock();
    
            new Thread(()->{
               lock.lock();
               try {
                   System.out.println("A");
                   try { TimeUnit.SECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
               }finally {
                   lock.unlock();
               }
            }, "A").start();
    
            new Thread(()->{
               lock.lock();
               try {
                   System.out.println("B");
               }finally {
                   lock.unlock();
               }
            }, "B").start();
    
    
            new Thread(()->{
                lock.lock();
                try {
                    System.out.println("C");
                }finally {
                    lock.unlock();
                }
            }, "C").start();
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    相当于3个客户访问一个线程。

    2.代码过程

    2.1 初始过程一

    在这里插入图片描述

    AQS 中的 state表示信号灯, 0表示没有人占用此线程

    2.2 线程A的改变
    final void lock() {
        //通过 CAS 的方式尝试将 state 从0改为1,
        //如果返回 true,代表修改成功,获得锁资源;
        //如果返回false,代表修改失败,未获取锁资源
        if (compareAndSetState(0, 1))
            // 将属性exclusiveOwnerThread设置为当前线程,该属性是AQS的父类提供的
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 通过CAS尝试将state从0变为1, 如果成功的话, 则获得资源, 失败的话, 进入到else中, 尝试获得资源。
    • compareAndSetState():底层调用的是unsafe的compareAndSwapInt,该方法是原子操作。

    在这里插入图片描述

    ThreadA进入窗口, state变为1

    2.3 线程B的改变

    ThreadB 进入窗口, 走下面的else的acquire()方法

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    tryAcquire() -> addWaiter() -> acquireQueued() 的顺序执行。

    tryAcquire(): 存在公平锁FairSync的实现也存在NofaireSync非公平锁的实现。

    走非公平锁:

    在这里插入图片描述

    1.tryAcquire():

    final boolean nonfairTryAcquire(int acquires) {
     // 获取当前线程
     final Thread current = Thread.currentThread();
     //获取AQS 的 state 
     int c = getState();
     // 如果 state 为0,代表尝试再次获取锁资源
     if (c == 0) {
      // 步骤同上:通过 CAS 的方式尝试将 state 从0改为1,
      //如果返回 true,代表修改成功,获得锁资源;
      //如果返回false,代表修改失败,未获取锁资源
      if (compareAndSetState(0, acquires)) {
       //设置属性为当前线程
       setExclusiveOwnerThread(current);
       return true;
      }
     }
     //当前占有锁资源的线程是否是当前线程,如果是则证明是可重入操作
     else if (current == getExclusiveOwnerThread()) {
      //将 state + 1
      int nextc = c + acquires;
      //为什么会小于 0 呢?因为最大值 + 1 后会将符号位的0改为1 会变成负数(可参考Integer.MAX_VALUE + 1)
      if (nextc < 0) // overflow
       //加1后小于0,超出锁可重入的最大值,抛异常
       throw new Error("Maximum lock count exceeded");
      //设置 state 状态
      setState(nextc);
      return true;
     }
     return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    因为ThreadA 获取到窗口, 所以state = 1, 不走if(c == 0), 又因为此时是ThreadB, 不是ThreadA, 那么返回false。

    2.addWaiter() -> enq()

    根据上一个方法的失败走addWaiter(), 可以将没有获取到锁的线程node甩到队列的尾部

    private Node addWaiter(Node mode) {
     //创建 Node 类,并且设置 thread 为当前线程,设置为排它锁
     Node node = new Node(Thread.currentThread(), mode);
     // 获取 AQS 中队列的尾部节点
     Node pred = tail;
     // 如果 tail == null,说明是空队列,
     // 不为 null,说明现在队列中有数据,
     if (pred != null) {
      // 将当前节点的 prev 指向刚才的尾部节点,那么当前节点应该设置为尾部节点
      node.prev = pred;
      // CAS 将 tail 节点设置为当前节点
      if (compareAndSetTail(pred, node)) {
       // 将之前尾节点的 next 设置为当前节点
       pred.next = node;
       // 返回当前节点
       return node;
      }
     }
     enq(node);
     return node;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    当tail不为空的时候, 即队列有数据的时候, 会将Node放入队列中, 本来此时数列无数据, 那么走下面的enq()

    enq():

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    先判断尾结点如果为null, 那么会初始化一个头结点, 尾结点也指向这个初始化的节点, 这个节点为傀儡节点。

    在这里插入图片描述

    所以说第一个节点为傀儡节点, thread = null, waitStatus = 0

    在这里插入图片描述

    于是线程B 进入enq()会进入enq()方法中的else, 这个else会使得线程B 进入队列中。

    先是使得ThreadB的前指针指向傀儡节点, 然后将FIFO的tail指向ThreadB, 最后傀儡节点的next指向ThreadB节点。

    在这里插入图片描述
    3.acquireQueued()

    将已经在队列中的node尝试去获取锁否则挂起: 在队列中的线程node 会去尝试获取锁。

    final boolean acquireQueued(final Node node, int arg) {
     // 获取锁资源的标识,失败为 true,成功为 false
     boolean failed = true;
     try {
      // 线程中断的标识,中断为 true,不中断为 false
      boolean interrupted = false;
      for (;;) {
       // 获取当前节点的上一个节点
       final Node p = node.predecessor();
       //p为头节点,尝试获取锁操作
       if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null;
        // 将获取锁失败标识置为false
        failed = false;
        // 获取到锁资源,不会被中断
        return interrupted;
       }
       // p 不是 head 或者 没拿到锁资源,
       if (shouldParkAfterFailedAcquire(p, node) &&
        // 基于 Unsafe 的 park方法,挂起线程
        parkAndCheckInterrupt())
        interrupted = true;
      }
     } finally {
      if (failed)
       cancelAcquire(node);
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    获取到当前节点的上一个节点, 如果是头节点的话, 会尝试获取锁, 如果不是头节点, 那么表示没有拿到锁资源

    这里当前线程是ThreadB, 那么上一个接点是头节点, 那么会尝试获取锁。

    在这里插入图片描述

    c = 1, 但是, 当前线程是ThreadA节点, 那么直接返回false。

    在这里插入图片描述

    走if, 那么进入shouldParkAfterFailedAcquire方法, 根据waitStatus去进行不同的操作。

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    ws = 0, 走下面的else, cas修改哨兵节点的waitStatus = -1

    因为是一个死循环, 所以再次进入shouldParkAfterFailedAcquire方法, 返回true, 进入parkAndCheckInterrupt()进行阻塞ThreadB。

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    • 1
    • 2
    • 3
    • 4
    2.4 释放线程A

    在这里插入图片描述

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    c=0, 判断当前线程是否为窗口的线程, 是的话, 进行下面的代码逻辑, 因为c=0, 所以free=true, 并且设置窗口线程为null, 设置AQS的state为0

    在这里插入图片描述
    于是进入release方法中的if逻辑, 这里head不为null, head.waitStatus=-1, 所以进入方法unparkSuccessor

        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    ws=-1, 通过cas修改为0, s为ThreadB节点, 因为不是null, 所以进入下面的LockSupport.unpark()进行唤醒

    2.5 线程B上位

    之前分析, 线程A节点退出了窗口, 而且线程B节点也唤醒, 于是通过acquire()尝试获取资源

    在这里插入图片描述

    tryAcquire() -> addWaiter() -> acquireQueued() 的顺序执行。

    在这里插入图片描述

    当前线程为线程B, c=0, 那么通过cas改变c为1, 设置当前窗口线程为B, 并且返回true

    acquireQueued()

    在这里插入图片描述

    当前的窗口为ThreadB, 那么tryAcquire()返回为true, 进入if, 这里的if 是将队列中的ThreadB节点去除, 并且将头节点设置为傀儡节点。

    3.问题解析

    3.1 问题一

    QA: 为什么要从尾结点往前查找呢

    因为在addWaiter方法中是先给prev指针赋值,最后才将上一个节点的next指针赋值,为了避免防止丢失节点或者跳过节点,必须从后往前找。

    我们举例中head节点的状态为-1,通过CAS的方式将head节点的waitStatus设置为0。

    在这里插入图片描述

    头结点的后继节点是线程2所在的节点,不为null,所以这边会执行unpark操作,从上边的acquireQueued()内的parkAndCheckInterrupt()方法继续执行。

    
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //返回目标线程是否中断的布尔值:中断返回true,不中断返回false,且返回后会重置中断状态为未中断
        return Thread.interrupted();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    因为线程2未中断,所以返回false。继续执行acquireQueued()中的死循环

    for (;;) {
        // 获取当前节点的上一个节点
        final Node p = node.predecessor();
        //p为头节点,尝试获取锁操作
        if (p == head && tryAcquire(arg)) {
            setHead(node);
            p.next = null;
            // 将获取锁失败标识置为false
            failed = false;
            // 获取到锁资源,不会被中断
            return interrupted;
        }
        // p 不是 head 或者 没拿到锁资源,
        if (shouldParkAfterFailedAcquire(p, node) &&
            // 基于 Unsafe 的 park方法,挂起线程
            parkAndCheckInterrupt())
            interrupted = true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    此时p是头节点,且能获取锁成功,将exclusiveOwnerThread设置为线程2,即线程2 获取锁资源。

    将node节点设置为head节点,并将node节点的pre和thread设置为null。因为拿到锁资源了,node节点就不需要排队了。

    将头节点p的next置为null,此时p节点就不在队列中存在了,可以帮助GC回收(可达性分析)。failed设置为false,表明获取锁成功;interrupted为false,则线程不会中断。

    在这里插入图片描述

    3.2 问题二

    QA2: 为什么被唤醒的线程要调用Thread.interrupted()清除中断标记

    从上边的方法可以看出,当parkAndCheckInterrupt()方法返回true时,即Thread.interrupted()方法返回了true,也就是该线程被中断了。为了让被唤醒的线程继续执行后续获取锁的操作,就需要让中断的线程像没有被中断过一样继续往下执行,所以在返回中断标记的同时要清除中断标记,将其设置为false。

    清除中断标记之后不代表该线程不需要中断了,所以在parkAndCheckInterrupt()方法返回true时,要自己设置一个中断标志interrupted = true,为的就是当获取到锁资源执行完相关的操作之后进行中断补偿,故而需要执行selfInterrupt()方法中断线程。

    3.3 问题三

    QA: 公平锁和非公平锁的区别

    hasQueuedPredecessors()返回false时才会尝试获取锁资源。

    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    h==t时,队列为空,表示没人排队,可以获取锁资源;
    队列不为空,头结点有后继节点不为空且s节点获取锁的线程是当前线程也可以获取锁资源,代表锁重入操作;

  • 相关阅读:
    2022年软件测试人员必读的经典书籍推荐(附电子版)
    将Word中的表格以图片形式复制到微信发送
    20220726汇承科技的蓝牙模块HC-05的AT命令测试
    Flink watermark与乱序消息处理机制
    不使用脚手架构建vue项目
    八、【React-Router5】路由组件传参
    Net 编译器平台--- Roslyn Scripting APIs
    程序员面试金典 - 面试题 17.17. 多次搜索
    第四章:数据操作Ⅰ 第二节:读写CSV文件
    [附源码]java毕业设计高校社团信息管理系统
  • 原文地址:https://blog.csdn.net/qq_43141726/article/details/127731397