• ReentrantLock 实现原理


    多线程情况下,sum++,因线程切换导致并发问题,因此需给导致并发问题的代码加锁

    使用 synchronized 方式

    public class MultiThreadAddDemo {
      static int sum;
      public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[2];
        for (int i = 0; i < 2; i++) {
          threads[i] = new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
              synchronized (MultiThreadAddDemo.class) {
                sum++;
              }
            }
          });
          threads[i].start();
        }
        threads[0].join();
        threads[1].join();
        System.out.println(sum);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    synchronized 为悲观锁,它是一种思想,即认为如果不给资源加锁,则一定会发生错误,所以其加锁方式是重量级的

    乐观锁与之相反,它是一种无锁的加锁方式,ReentrantLock 就是其中一种

    使用 ReentrantLock 方式

    public class MultiThreadAddDemo {
      static int sum = 0;
      public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[2];
        ReentrantLock lock = new ReentrantLock();
        for (int i = 0; i < 2; i++) {
          threads[i] = new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
              lock.lock();
              try {
                sum++;
              } finally {
                lock.unlock();
              }
            }
          });
          threads[i].start();
        }
        threads[0].join();
        threads[1].join();
        System.out.println(MultiThreadAddDemo.sum);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    上述代码中使用到了 new ReentrantLock() 、lock.lock()
    在这里插入图片描述
    ReentrantLock 继承关系大致如下

    class ReentrantLock implements Lock {
      class NonfairSync extends Sync {  }
      class Sync extends AbstractQueuedSynchronizer {  }
    }
    class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {  }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    AbstractQueuedSynchronizer,简称 AQS,其继承 AbstractOwnableSynchronizer

    AbstractOwnableSynchronizer中有个 exclusiveOwnerThread 字段,记录那个线程持有当前独占锁,尝试获取资源(tryAcquire)时会用到

    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
      private static final long serialVersionUID = 3737899427754241961L;
      /**
       * The current owner of exclusive mode synchronization. 记录那个线程持有当前独占锁
       */
      private transient Thread exclusiveOwnerThread;
      // exclusiveOwnerThread 的 setter、getter 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    AQS 内部有个 state 字段,尝试获取资源(tryAcquire)时会用到;以及一个双向链表,用于存储修改 state 失败的线程

    先来看看 AQS 中的两个模板方法,继承 AQS 需实现该方法,否则使用该方法会抛出异常

    protected boolean tryAcquire(int arg) {    // <----- 独占方式:尝试获取资源
      throw new UnsupportedOperationException();
    }
    
    protected boolean tryRelease(int arg) {    // <----- 独占方式:尝试释放资源
      throw new UnsupportedOperationException();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    NonfairSync - lock 逻辑

    public class ReentrantLock implements Lock, java.io.Serializable {
    
      public ReentrantLock() {
        sync = new NonfairSync();
      }
    
      public void lock() {
        sync.lock();
      }
    
      static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        final void lock() {
          if (compareAndSetState(0, 1))    // <----- 通过 CAS 检查锁资源(不会检测 AQS 队列)
            setExclusiveOwnerThread(Thread.currentThread());  // <----- 设置 exclusiveOwnerThread(AbstractOwnableSynchronizer 类中除序列化字段外唯一字段)
          else  // <----- 获取锁失败
            acquire(1);    // <----- 调用 AQS 中的 acquire、表示尝试获取锁(独占锁)
        }
        /** AQS 中的 acquire 方法
        public final void acquire(int arg) {
          if (!tryAcquire(arg) &&    // <----- tryAcquire 为模板方法,调用子类实现的 tryAcquire,及下面的代码
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
        }
        */
        protected final boolean tryAcquire(int acquires) {  // <----- NonfairSync 中实现了 tryAcquire 方法,尝试获取资源(独占方式)
          return nonfairTryAcquire(acquires);    // <----- Sync 中的 nonfairTryAcquire,大致逻辑,根据 AQS 中的 state,设置 exclusiveOwnerThread
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    tryAcquire 获取独占锁

    tryAcquire(1)一下,名字上就知道,这个只是试一试

    通过 AQS 中的 state,可以知道当前的锁是否存在线程

    • 为 0,表示无锁,设置 exclusiveOwnerThread

    • 不为0:判断当前锁线程是否与当前线程相等

      • 相等:增加重入次数 setState(getState() + acquires) ( 根据acquire(1)可以知道,acquires 参数为 1)
      • 不相等:返回 false,表示尝试获取锁失败
    final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
        if (compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current);
          return true;
        }
      } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    addWaiter - 添加 Node 到 AQS 队列

    AQS 内维护了一个双向链表,用于存储修改 state 失败的线程

    尝试获取锁失败,则会执行 addWaiteracquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
      public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
      }
    
      private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {    // <----- 队列不为空,插入并返回节点
          node.prev = pred;    // <----- 与队列中最后一个节点进行关联
          if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
          }
        }
        enq(node);    // <----- 队列为空
        return node;
      }
    
      private Node enq(final Node node) {
        for (; ; ) {
          Node t = tail;
          if (t == null) { // Must initialize
            // 创建 head(初始节点):初始节点为 Dummy(哑元或哨兵),用来占位,并不关联线程,因此 thread 为 null
            // compareAndSetHead -> unsafe.compareAndSwapObject(this, headOffset, null, update);
            if (compareAndSetHead(new Node()))    // <----- 设置传入对象的 headOffset 位置的值,即 head 的值
              tail = head;    // <----- 将 head 赋值给 tail 之后,进入第二次的 for 循环
          } else {
            node.prev = t;    // <----- 与 tail 进行关联
            if (compareAndSetTail(t, node)) {
              t.next = node;    // <----- tail 的 next 指向当前节点
              return t;
            }
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    第一次会进入 enq,创建一个 Dummy,之后将当前节点加入到 Dummy 之后

    在这里插入图片描述

    acquireQueued

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
      public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
      }
    
      private Node addWaiter(Node mode) {
        // ......
      }
    
      private Node enq(final Node node) {
        // ......
      }
    
      final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
          boolean interrupted = false;
          for (;;) {
            // 通过头节点去判断当前节点是否有资格获取锁
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {    // <----- 前置节点为 head,则再次尝试获取锁
              setHead(node);
              p.next = null; // help GC
              failed = false;
              return interrupted;
            }
            // 前置节点不为 head,或 tryAcquire(尝试获取锁)失败
            if (shouldParkAfterFailedAcquire(p, node) &&    // <----- 根据节点状态,决定当前线程是否应被挂起?
                parkAndCheckInterrupt())    // <----- 调用 park 方法挂起当前线程,等待以后被唤醒
              interrupted = true;    // <----- 重新设置中断标记:interrupted 会清除中断标记
          }
        } finally {
          // try 执行结果的 failed 永远是 false,只有抛出异常,才会进入
          if (failed)
            // 将 Node 的 waitStatus 置为 CANCEL
            cancelAcquire(node);
        }
      }
    
      private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        // 当前线程在这里阻塞
        // 如果其他线程调用了该线程的 interrupt 方法(使用 park ,调用 interrupt 时不会抛出异常,只会改变中断状态值),于是需通过一个变量记录该值
        // 返回 true,将中断标记传到外层,使得获取锁成功时,acquireQueued 返回值为 interrupted 的状态
        return Thread.interrupted();
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    shouldParkAfterFailedAcquire - 修改节点状态

    Node 有 5 中状态,默认为 0

    static final class Node {
      /** waitStatus value to indicate thread has cancelled */
      static final int CANCELLED =  1;
      /** waitStatus value to indicate successor's thread needs unparking */
      static final int SIGNAL    = -1;
      /** waitStatus value to indicate thread is waiting on condition */
      static final int CONDITION = -2;
      /**
       * waitStatus value to indicate the next acquireShared should
       * unconditionally propagate
       */
      static final int PROPAGATE = -3;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    shouldParkAfterFailedAcquire 函数会确保在同步队列中,每个等待的线程状态都是正常的

    每个 node 在入队的时候,都会将其前置节点的状态改为 SIGNAL,然后阻塞,等待被前置唤醒

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
          /*
           * This node has already set status asking a release
           * to signal it, so it can safely park.
           * 当前节点前置节点为 SIGNAL,说明前置节点也在等待拿锁,所以当前节点可以挂起休息
           * SIGNAL 表示:只要释放锁,就有责任通知标志位为 SIGNAL 的后继节点(进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前置节点完成的)
           */
          return true;
        if (ws > 0) {
          /*
           * Predecessor was cancelled. Skip over predecessors and
           * indicate retry.
           * 状态只可能是 CANCEL
           * 前置节点取消了排队,需移除当前前置节点,然后一直向前寻找,直到找到前置节点 pred.waitStatus > 0
           */
          do {
            node.prev = pred = pred.prev;
          } while (pred.waitStatus > 0);
          pred.next = node;
        } else {
          /*
           * waitStatus must be 0 or PROPAGATE.  Indicate that we
           * need a signal, but don't park yet.  Caller will need to
           * retry to make sure it cannot acquire before parking.
           * waitStatus 只可能是0,-2,-3
           * 用 CAS 将前置节点的 waitStatus 设置为 Node.SIGNAL(也就是-1)
           * !!!第一次进来的时候,waitStatus 为默认值 0,于是设置当前 pred 为 -1
           * 之后会通过 acquireQueued 内的 for 循环,第二次进入,waitStatus 为 -1,返回 true,调用 parkAndCheckInterrupt 进行阻塞
           */
          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    通过对 acquireQueued 分析,可以得出

    • 如果当前节点为head 后一个节点:尝试获取锁,失败挂起
    • 如果当前节点不为head 后一个节点:刚加入时的 waitStatus 为 0,之后在 shouldParkAfterFailedAcquire 方法内变为 -1,然后经过 parkAndCheckInterrupt 方法挂起

    也就是说,只有在 head 节点之后的节点可以通过 CAS 获取锁

    在这里插入图片描述

    其他线程何时被唤醒?
    猜想:当一个线程使用完共享资源,并且要释放锁的时候,去唤醒其他正在等待锁的线程

    释放锁

    public class ReentrantLock implements Lock, java.io.Serializable {
    
      public void unlock() {
        sync.release(1);    // <----- 调用 AQS 中的 release
      }
    
      abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final boolean tryRelease(int releases) {    // <----- 根据 state,设置 exclusiveOwnerThread
          int c = getState() - releases;
          if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
          boolean free = false;
          if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
          }
          setState(c);
          return free;
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
      public final boolean release(int arg) {
        if (tryRelease(arg)) {
          Node h = head;
          if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
          return true;
        }
        return false;
      }
      
      private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        AbstractQueuedSynchronizer.Node s = node.next;
        // 如果 head 下一个节点为 null,或者 waitStatus 为 -1(表示 CANCELLED:线程取消了排队)
        // 从队列 tail 开始,从后往前遍历,找到离 head 最近的 waitStatus <= 0 节点(不是最近,会被覆盖)
        if (s == null || s.waitStatus > 0) {
          s = null;
          for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
              s = t;
        }
        if (s != null)
          LockSupport.unpark(s.thread);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    当释放锁时,将 head 的 waitStatus 设置为 0,避免影响其他函数的判断
    并且从队列 tail 开始,从后往前遍历,找到离 head 最近的 waitStatus <= 0 节点,然后调用 unpark
    被唤醒的节点获取锁成功,则将当前节点设为 head

    在这里插入图片描述

    唤醒时为何从后往前遍历

    当 enq 一个新的节点的时候, 会使新节点的 prev 等于 tail:node.prev = t;

    然后通过 CAS,使 tail.next = 新节点:if (compareAndSetTail(t, node)) { t.next = node; }

    如果在 CAS 操作之后,t.next = node 还未执行到时,其他线程调用 unlock(),而链表关系没有建立完整,如果从前往后遍历,导致遍历失败
    在这里插入图片描述

    参考资料及推荐阅读

  • 相关阅读:
    【PHP】PHP7中的引用计数
    关于产品MVP的定义与实践
    HTML+CSS大作业 环境网页设计与实现(垃圾分类) web前端开发技术 web课程设计 网页规划与设计
    数据治理:元数据管理篇
    事务、存储引擎
    嵌入式分享合集87
    如何让 Go 反射变快,你学会了吗?
    传输层 SACK与选择性重传算法
    浮点数2^7*29/32中/32是什么意思?
    ubuntu 18.04 sudo apt-get update 出现:ld.google.com(114.250.65.33) 连接超时
  • 原文地址:https://blog.csdn.net/qq_30763385/article/details/126805623