• 22.读写锁ReetrantReadWriteLock


    前面我们介绍过读写锁的用法,本节我们详细分析读写锁的实现过程。ReetrantReadWriteLock由于要实现读读不互斥、读写互斥、写写互斥,根据前面对AQS的分析,貌似读读场景使用共享锁能搞定,而后两者基于独占锁能搞定。事实也确实如此,两个就是基于AQS实现的。具体来说为了实现读写锁,ReetrantReadWriteLock类的内部分别提供了两个实现类:

    ReadLock:读锁

    WriteLock:写锁。

    分析ReetrantReadWriteLock类的关系图,会发现其的结构比较复杂:

    图中除了AQS和两个接口Lock和ReadWriteLock,其他的类,例如Sync、ReadLock和Write类以及公平和非公平锁都是在ReetrantReadWriteLock类里实现的。

    而ReadLock和WriteLock依赖于Sych同步类,从类的关系看,Sync中重写了AQS的独占和共享四个方法。

    如果当前线程调用ReadLock.lock()方法,则实际会调用Sync中的tryAcquireShared()方法来实现共享锁竞争。如果当前线程调用WriteLock.lock(),则实际会调用到Sync中的tryAcquire()方法竞争独占锁。

    1 WriteLock锁

    接下来,我们就看一下写锁和读锁的实现原理,首先看写锁。参考ReadWriteLockExample例子,当我们执行writeLock.lock();方法时,最终也是会执行到AQS的如下位置:

    1. public final void acquire(int arg) {
    2. if (!tryAcquire(arg) &&
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }

    这个代码与我们前面介绍的重入锁是一致的,但是里面的几个方法的实现有不少差异,因此还是需要再看一次。

    1.1 锁竞争过程

    tryAcquire()方法的实现是在ReadWriteLock类里:

    1. protected final boolean tryAcquire(int acquires) {
    2. Thread current = Thread.currentThread();
    3. int c = getState();//获取当前的互斥变量的值
    4. //获得写线程的数量
    5. int w = exclusiveCount(c);
    6. if (c != 0) {
    7. // 如果 if c != 0 and w == 0 说明有其他线程获得了共享锁,此时写操作不能执行
    8. if (w == 0 || current != getExclusiveOwnerThread())
    9. return false;
    10. if (w + exclusiveCount(acquires) > MAX_COUNT)
    11. throw new Error("Maximum lock count exceeded");
    12. // 线程重入
    13. setState(c + acquires);
    14. return true;
    15. }
    16. //如果c==0,说明读锁和写锁都没有被线程获取
    17. if (writerShouldBlock() ||
    18. !compareAndSetState(c, c + acquires))
    19. return false;
    20. setExclusiveOwnerThread(current);
    21. return true;
    22. }

    这里的核心思想与重入锁一致,都是通过state互斥量来竞争锁资源,但是具体实现过程有很大区别,具体过程是:

    • 1.通过getState()方法获取当前的互斥变量的值。 如果 c != 0 and w == 0 说明有其他线程获得了共享锁,此时写操作不能执行 。

    • 2.通过exclusiveCount()方法查找当前获得写锁的线程数量,由于写锁是互斥的,所以如果能够获得多个,就说明只能是重入的情况。则通过w+exclusiveCount(acquires)进行次数的累加,这里有一个最大重入允许次数65535。

    • 3.如果c==0,说明读锁和写锁都没有被线程获取,通过writeShouldBlock()方法判断写锁是否应该阻塞,在非公平模式下,写锁不需要先阻塞,而是通过compareAndSetState()方法竞争锁。

     1.2 写锁数量如何获得的

    在上面的代码中,判断写锁的数量是通过这一行进行的:

    if (w + exclusiveCount(acquires) > MAX_COUNT)

    他的实现与其他的貌似不太一样:

    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    EXCLUSIVE_MASK又是什么,其定义是这样的:

    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    这里其实就是定义EXCLUSIVE_MASK=2^16-1=01111111111111(二进制)= 65535。之所以使用移位,主要还是为了计算效率。

    这里是如何获得的呢?这是因为state采用高低位分开存储读锁和写锁,如下图所示。

     在上面的图中,高16位存储读锁,当前读锁状态为10,表示有两个线程获得了读锁。低16位存储写锁状态,当前锁状态为100,表示有一个线程重入了4次。

    exclusiveCount()方法采用位运算得到state低16位的值,并以该值来判断当前state的重入次数,代码如下:

    static int exclusiveCount(int c){return c&EXCLUSIVE_MASK;}

    这种计数方式在很多代码中都能看到,是个很常用的技巧。

    1.3释放锁的过程

    我们继续看锁释放的过程:

    1. protected final boolean tryRelease(int releases) {
    2. if (!isHeldExclusively())
    3. throw new IllegalMonitorStateException();
    4. int nextc = getState() - releases;
    5. boolean free = exclusiveCount(nextc) == 0;
    6. if (free)
    7. setExclusiveOwnerThread(null);
    8. setState(nextc);
    9. return free;
    10. }

    释放过程与重入锁的基本一致,通过getState()-release来递减锁的次数,由于写锁的重入次数保存在低位,所以直接按十进制计算即可。通过exclusiveCount()方法计算写锁的重入次数,如果为0,则说明锁释放成功

    WriteLock锁竞争失败的逻辑,和前文分析的ReetrantLock锁竞争逻辑也是一致,这里不再赘述。

    2 ReadLock锁

    分析完了写锁,我们再来看一下读锁是如何工作的。允许多个线程同时读,通过AQS的共享锁来实现。

    2.1 锁竞争过程

    共享锁的竞争会调用tryAcquireShared()方法,tryAcquireShared()方法如果返回-1,则表示需要等待其他写锁释放,否则表示当前没有线程持有锁,可以直接获得读锁。

    1. protected final int tryAcquireShared(int unused) {
    2. Thread current = Thread.currentThread();//获得当前线程
    3. int c = getState();//获取当前线程
    4. //如果写锁或独占锁的持有者不是当前线程,则直接阻塞
    5. if (exclusiveCount(c) != 0 &&
    6. getExclusiveOwnerThread() != current)
    7. return -1;
    8. int r = sharedCount(c);//读锁数量
    9. if (!readerShouldBlock() &&
    10. r < MAX_COUNT &&
    11. compareAndSetState(c, c + SHARED_UNIT)) {
    12. if (r == 0) {//表示第一次获取读锁
    13. //保存第一个获取读锁的线程
    14. firstReader = current;
    15. firstReaderHoldCount = 1;
    16. //表示读锁的重入
    17. } else if (firstReader == current) {
    18. firstReaderHoldCount++;
    19. } else {//保存每个线程读锁的重入次数
    20. HoldCounter rh = cachedHoldCounter;
    21. if (rh == null || rh.tid != getThreadId(current))
    22. cachedHoldCounter = rh = readHolds.get();
    23. else if (rh.count == 0)
    24. readHolds.set(rh);
    25. rh.count++;
    26. }
    27. return 1;
    28. }
    29. return fullTryAcquireShared(current);
    30. }

    我们来分析一下上面的逻辑过程:

    1.首先,(exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)的作用是判断是否有其他线程获得写锁。如果有写操作则读线程直接进入等待状态。

    2.通过sharedCount()方法获得读锁的数量。其实现也是通过位移,详见前面小节“写锁是如何获得的”。

    3.接下来,如果当前读锁(通过readerShouldBlock()实现)不需要等待,并且读线程数量没超过65535,就直接通过compareAndSetState()方法如果readerShouldBlock返回false,表示当前锁不需要等待。

    4.读锁获取成功后,处理还有些麻烦,这里会根据不同的条件进行处理:

    • r==0,表示第一次获得锁。此时保存第一次获得读锁的线程。

    • firstReader==current,表示当前读线程是第一个获得读锁的,此时需要增加重入次数。

    • 如果是其他情况, 则用readHolds来保存每个线程获得锁的次数。如果查看readHolds的定义,这里使用的是ThreadLocal 来保存的,其功能是每个读线程可以独立管理自己的重入次数。该问题我们在ThreadLocal部分介绍。

    5.如果CAS失败,则调用fullTryAcquireShared()方法尝试获取共享锁。

    在ReetrantReadWriteLock读锁和写锁的获取过程中,在通过CAS修改互斥变量的状态之前,会分别调用readerShouldBlock()方法和writedShouldBlock()方法来判断是否可以通过CAS尝试获得锁,这两个方法的实现再公平和非公平模式中的实现不同。

    对公平锁来说,readShouldBlock和writeShouldBlock都会通过hasQueuedPredecessor判断当前同步队列中年是否还有排队的线程,也就是对公平锁而言,只有同步队列中当前结点之前没有等待的线程 ,才会先尝试抢占锁,代码如下:

    1. static final class FairSync extends Sync {
    2. private static final long serialVersionUID = -2274990926593161451L;
    3. final boolean writerShouldBlock() {
    4. return hasQueuedPredecessors();
    5. }
    6. final boolean readerShouldBlock() {
    7. return hasQueuedPredecessors();
    8. }
    9. }

    hasQueuedPredecessors()方法就是一个简单的链表判断,代码如下:

    1. public final boolean hasQueuedPredecessors() {
    2. Node t = tail; // Read fields in reverse initialization order
    3. Node h = head;
    4. Node s;
    5. return h != t &&
    6. ((s = h.next) == null || s.thread != Thread.currentThread());
    7. }

    对非公平锁来说,writerShouldBlock()方法直接返回false,也就是说在默认情况下都会先去抢占写锁,代码如下:

    1. static final class NonfairSync extends Sync {
    2. private static final long serialVersionUID = -8159625535654395037L;
    3. final boolean writerShouldBlock() {
    4. return false; // writers can always barge
    5. }
    6. final boolean readerShouldBlock() {
    7. return apparentlyFirstQueuedIsExclusive();
    8. }
    9. }

    而readerShouldBlock()方法则使用apparentlyFirstQueuedIsExclusive()来判断:

    1. final boolean apparentlyFirstQueuedIsExclusive() {
    2. Node h, s;
    3. return (h = head) != null &&
    4. (s = h.next) != null &&
    5. !s.isShared() &&
    6. s.thread != null;
    7. }

    这个方法的目的是避免写锁无限等待的问题,试想一下,如果一直有线程在获得读锁,那意味着写锁将一直无法获得,极端情况下将会一直导致写锁一直等待下去。为了避免这个问题,apparentlyFirstQueuedIsExclusive是这么做的:

    • 如果当前同步队列head节点的下一个结点是独占锁结点,那么该方法会返回true,表示当前来获取锁的线程需要排队。

    • 如果当前同步队列head结点的下一个结点是共享锁结点,那么该方法会返回false,表示当前来获得读锁的线程允许通过CAS修改互斥锁的状态。

    这种设计在一定程度上避免了写锁无限等待的情况。

    另外,在tryAcquireShard()方法中,当通过CAS抢占到读锁时,出了通过state变量记录总的读锁次数,还使用HoldCounter以线程为单位记录每个线程获得读锁的次数。之所以要这样设计,是因为state只能记录读线程和写线程的总数,但是无法记录每个线程获得读锁的重入次数,代码如下:

    1. protected final int tryAcquireShared(int unused) {
    2. ....
    3. HoldCounter rh = cachedHoldCounter;
    4. if (rh == null || rh.tid != getThreadId(current))
    5. cachedHoldCounter = rh = readHolds.get();
    6. else if (rh.count == 0)
    7. readHolds.set(rh);
    8. rh.count++;
    9. }
    10. ...
    11. }

    HoldCounter保存了count和tid,其中count用来记录数量,而tid用来记录当前线程的id,所以一个HoldCounter可以表示某个线程对应的重入次数,代码如下:

    1. static final class HoldCounter {
    2. int count = 0;
    3. final long tid = getThreadId(Thread.currentThread());
    4. }

    但是如果要实现线程的隔离,也就是说每个线程都有一个独立的HoldCounter实例,那么要怎么样实现呢?我们看到再记录重入数量的代码中有这样一行代码:

     cachedHoldCounter = rh = readHolds.get();

    每个线程进行数量统计时,都是从readHolds.get()一个HoldCounter实例,代码如下:

    1. static final class ThreadLocalHoldCounter
    2. extends ThreadLocal<HoldCounter> {
    3. public HoldCounter initialValue() {
    4. return new HoldCounter();
    5. }
    6. }

    这里采用ThreadLocal来进行线程隔离,也就是每个线程调用readHolds.get()方法,都会得到一个和当前绑定的HoldCounter对象实例,也就能实现针对每个线程记录读锁的重入次数的功能。

    如果通过tryAcquireShared尝试抢占读锁失败,则还需要调用fullTryAcquireShared()方法,该方法的整体逻辑和tryAcquireShared()类似,只不过增加了自旋等待来保证读锁成功,代码如下:

    1. final int fullTryAcquireShared(Thread current) {
    2. HoldCounter rh = null;
    3. for (;;) {//自旋
    4. int c = getState();
    5. //情况1:如果当前有其他线程获得写锁,并且获得写锁的咸亨不是当前线程,则返回-1
    6. if (exclusiveCount(c) != 0) {
    7. if (getExclusiveOwnerThread() != current)
    8. return -1;
    9. //情况2:如果返回true,则表示当前抢占读锁的线程要等待
    10. } else if (readerShouldBlock()) {
    11. // Make sure we're not acquiring read lock reentrantly
    12. if (firstReader == current) {
    13. // assert firstReaderHoldCount > 0;
    14. } else {
    15. if (rh == null) {
    16. rh = cachedHoldCounter;
    17. if (rh == null || rh.tid != getThreadId(current)) {
    18. rh = readHolds.get();
    19. if (rh.count == 0)
    20. readHolds.remove();
    21. }
    22. }
    23. if (rh.count == 0)
    24. return -1;
    25. }
    26. }
    27. //情况3:判断读锁的总数是否大于最大值
    28. if (sharedCount(c) == MAX_COUNT)
    29. throw new Error("Maximum lock count exceeded");
    30. //情况4
    31. if (compareAndSetState(c, c + SHARED_UNIT)) {
    32. if (sharedCount(c) == 0) {
    33. firstReader = current;
    34. firstReaderHoldCount = 1;
    35. } else if (firstReader == current) {
    36. firstReaderHoldCount++;
    37. } else {
    38. if (rh == null)
    39. rh = cachedHoldCounter;
    40. if (rh == null || rh.tid != getThreadId(current))
    41. rh = readHolds.get();
    42. else if (rh.count == 0)
    43. readHolds.set(rh);
    44. rh.count++;
    45. cachedHoldCounter = rh; // cache for release
    46. }
    47. return 1;
    48. }
    49. }
    50. }

    上述代码中,

    • 情况1里的getExclusiveOwnerThread() != current是为了避免死锁的,如果一个线程先获得锁,在没释放写锁之前再舱室获得锁,并且直接返回-1,那么获得写锁的线程会一直无法唤醒,从而进入死锁状态。

    • 情况2位置表示当前读锁应该先阻塞,再判断当前读锁是否是重入,如果是则直接抢占锁,否则阻塞。

    • 情况4位置,通过cas抢占读读锁资源,成功后使用HoldCounter记录当前线程的重入次数,其代码实现与tryAcquireShared()方法相同。

    上述代码中,我们要注意firstReader和firstReaderHoldCount两个字段,这两个字段会记录第一个获得读锁的先生以及该线程的重入次数,如果是第一个先生就没有必要添加到HoldCounter了,为什么要这样呢?这是一种优化,如果获得读锁的线程只有一个,就没必要从HoldCounter中查找了,这可以提供性能。

    但是这里的firstReader并不是全局的第一个线程,当原本的第一个线程释放了锁之后,后序来获得读锁的线程会占用这个firstReader属性,firstReader和firstReaderHoldCount可以在读锁不产生竞争的情况下快速记录读锁的重入次数。

    最后,在fullTryAcquireShared()方法中,在如下两个情况下需要加入到同步队列进行等待:当前有其他线程获得了写锁并且当前线程不是重入。readerShouldBlock()方法返回true并不是重入。

    3 读写锁过程总结

    我们接下来从整体上看一下读写锁的基本过程。假设两个线程ThreadA和ThreadB先去获得读锁。此时使用firstReader和firstReaderHoldCount分别记录第一个获得读锁的线程以及线程重入的次数。ThreadB获得读锁,用HoldCounter记录当前线程的重入次数。

     接着ThreadC来抢占写锁,由于此时有ThreadA和ThreadB持有读锁,因此ThreadC抢占写锁失败,直接加到同步阻塞队列中。此时假如又来了D和E来抢占写线程。

    假如此时再有两个线程ThreadD和ThreadE来抢占读锁,由于不满足直接读锁的条件,所以需要加入到同步队列。注意读锁加入队列中的节点类型是SHARED,表示共享节点。SHARED节点有个特点:如果其中一个被唤醒,那么其他类型的SHARED结点也都会被唤醒,也就是允许多个线程同时抢读锁。这也是读写锁的特性,当写锁释放之后,要唤醒所有等待的读锁来读取最新的数据。

    如果ThreadA和ThreadB这两个获得读锁的线程释放了锁,就会从AQS队列中唤醒头部结点的下一个结点,也就是ThreadC线程,该线程是来抢占锁的,当该线程获得锁之后的结构如下。ThreadC这个Node结点变成Head结点,原来的head结点从链表中移除,然后ThreadC竞争到互斥锁,所以state的低位为1,exclusiveOwnerThread=ThreadC。

    最后,如果ThreadC的读锁也释放了,那么需要从AQS的同步队列中继续唤醒head结点的next结点,也就是ThreadD线程所在的结点,在唤醒的过程中发现该结点类型是SHARED,由于共享锁的结点允许多个线程竞争到锁,所以继续往后查找类型为SHARED的结点进行唤醒,如果下一个结点的类型是EXCLUSIVE(独占锁),则中断共享锁的传递,不再往后继续唤醒SHARED结点。

  • 相关阅读:
    Dirac delta function (狄拉克 delta 函数)
    django的csrf跨站请求伪造
    Flask数据库_query函数的使用
    java-php-python-ssm-预约健身私教网站-计算机毕业设计
    postgresql pgsql 连接池 pgBouncer(详细)
    MyBatis的配置文件
    一文汇总VSCode多光标用法
    【python】屈小原求三峡大学面积(CTGU百年校庆)
    测试过程中印象最深刻的bug?| 万能回答必杀技
    ComfyUI进阶:Comfyroll插件 (一)
  • 原文地址:https://blog.csdn.net/xueyushenzhou/article/details/126924862