• JUC——ReentrantReadWriteLock


    ReentrantReadWriteLock是读写锁,写写互斥,读写互斥。它实现了ReadWriteLock接口,接口内定义了readLock和writeLock接口;
    ReentrantReadWriteLock是基于AQS实现的加锁功能,支持公平式抢占和非公平式抢占;

    1. 构造器

    // 默认是非公平锁
    public ReentrantReadWriteLock() {
            this(false);
    }
    public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
    }
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    2. WriteLock实现

    在创建WriteLock的时候,会将sync传递到WriteLock内,WirteLock的操作是基于sync(AQS)实现的;

    写锁在加锁时,需要注意:

    1. 如果有读锁,则写锁需要等待;
    2. 如果有写锁,则写锁需要等待;

    2.1 writeLock.lock

    public void lock() {
        sync.acquire(1);
    }
    // 重写AQS的 tryAcquire
    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程
        Thread current = Thread.currentThread();
        // 获取当前的state值
        int c = getState();
    
        // c与 00000000 00000000 11111111 11111111 进行&运算
        // 其目地就是判断c低16位是否有值;
        // 高16位表示读锁的获取次数,低16位表示写锁的次数
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            // w=0只有两种情况:1.c=0,2.c的高16位有1,低16位全0
            // w=0 说明c高16位有二进制1,因为前面判断了c!=0
            // 当前线程 != 持有锁线程,则返回false,则会进行AQS的入队操作;
            // w=0:有读锁,当前线程阻塞,返回false
            // w!=0 有写锁,判断持有写锁的线程是否是当前线程;,如果不是,返回false
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            // 走到这说明 w!=0且持有锁线程是当前线程,则锁重入
            // 判断是否超过加锁的最大限制
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        // c=0时
        // 写锁抢占,默认非公平式,公平式会判断是否需要排队,如果需要排队writerShouldBlock返回true
        // 非公平式会直接返回false,表示进行CAS抢占
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        // 抢占成功,设置当前线程为持有锁线程
        setExclusiveOwnerThread(current);
        return true;
    }
    
    // 简单理解就是c & (1 << ) 
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
    // 它将AQS的state拆分成两段,高16位和低16位;高16位表示:读涣   低16位表示:写锁
    
    // 公平式抢占
    static final class FairSync extends Sync {
            private static final long serialVersionUID = -2274990926593161451L;
            final boolean writerShouldBlock() {
            // 返回true,表示要排队,否则不排队
                return hasQueuedPredecessors();
            }
    }
    
    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -8159625535654395037L;
            // 直接返回false
            final boolean writerShouldBlock() {
                return false; // writers can always barge
            }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    2.2 writeLock.unlock

    protected final boolean tryRelease(int releases) {
        // 判断当前线程是否为持有锁线程
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        // 如果free==0说明,低16位全为0,表示没有写锁
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3. ReadLock实现

    3.1 readLock.lock

    public void lock() {
        sync.acquireShared(1);
    }
    public final void acquireShared(int arg) {
        // 返回<0,则等待
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // 如果exclusiveCount(c) != 0 表示 有写锁并且持有写锁的线程还不是当前线程,则进行排队
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
    
        // c无符号左移16位,获取读锁数量
        int r = sharedCount(c);
        // 当前读请求是否需要阻塞;又分为公平式和非公平式;
        // 公平式:判断是否需要排队,true需要排队,false不需要排队
        // 非公平式:
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&  // 并且读锁不超过最大限制
            // 读锁次数+1,前面提到过高16位表示读锁的次数
            compareAndSetState(c, c + SHARED_UNIT)) {
            // 表示第一次加读锁
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 第一次获取锁的线程重入次数+1
                firstReaderHoldCount++;
            } else { 
                // 线程第一次进到这rh=null
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    // 获取当前线程的重入次数,默认0
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
               // 线程每次重入将threadlocal内的count+1
                rh.count++;
            }
            return 1;
        }
        // 表示没有获取锁成功,也就是readerShouldBlock返回true  1.需要排除或者是头节点的next是独占节点
        return fullTryAcquireShared(current);
    }
    
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    
    FairSync:
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
    
    NonfairSync:
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
    // 判断第一个节点是否是独占锁
    // true:head不为空,head.next不为空,head.next是独占模式并且独占锁是有线程的
    // 也说明在排除的节点是一个写锁的node
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null && // 头节点不为空
            (s = h.next)  != null && // 头节点的下一个节点不为空
            !s.isShared()         && // 下个节点是独占模式
            s.thread != null;        // 下个节点的thread为null
    }
    Sync:
    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }
    static final class HoldCounter {
        int count = 0;  // 记录读锁的重入次数
        final long tid = getThreadId(Thread.currentThread());
    }
    
    // 每个读锁(除第一次来的)都拥有一个HoldCounter来记录读锁和重入次数
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    
    private transient ThreadLocalHoldCounter readHolds;
    // HoldCounter记录上一次获取读锁的线程
    private transient HoldCounter cachedHoldCounter;
    
    
    
    final int fullTryAcquireShared(Thread current) {
        
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            // 判断有没有线程持有写锁,如果有,再判断是否为当前线程,否则,返回-1进入阻塞
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) {  // 再次判断是否需要读阻塞,如果需要,进入else if
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // 判断当前线程是否是firstReader,如果是,则直接获取锁;
                    // assert firstReaderHoldCount > 0;
                } else {
    
                    if (rh == null) {
                        // 获取上一次获取到读锁的线程
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            // 将当前获取读锁的线程的HoldCounter赋值给rh
                            rh = readHolds.get();
                            // rh.count 默认就是为0,因为需要去排除,所以将它的HoldCounter删除掉;
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    // rh.count=0去排队
                    if (rh.count == 0)
                        return -1;
                }
            }
            // 判断是否超过最大读锁限制
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // 获取读锁
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 获取到读锁之后,跟前面的流程一样
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154

    3.2 readLock.unlock

    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        // 判断线程是否为firstReader,如果是,则进行count--
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            // 获取上一个读锁的线程的HoldCounter
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            // 获取当前线程的重入次数进行--,如果减0时,则删除threadLocal值;
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
    
        for (;;) {
            int c = getState();
            // 获取读锁的个数
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                // 如果读锁为0了,则进入释放资源,进行通知队列中阻塞的线程
                return nextc == 0;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
  • 相关阅读:
    for-in与不可枚举
    go gin ShouldBind 绑定参数到结构体struct 数据校验
    039Node.js后端路由封装与独立拆分解决方案
    小程序A跳转小程序B并且自动登录
    Rancher 系列文章-在腾讯云的 K3S 上安装高可用 Rancher 集群
    学习路之PHP--laravel postman 提交表单出现419错误
    解决Spring Boot 2.7.16 在服务器显示启动成功无法访问问题:从本地到服务器的部署坑
    「优选算法刷题」:删除字符串中的所有相邻重复项
    图像识别与处理学习笔记(五)人工神经网络和深度学习
    Excel导入导出,增删改查的实现
  • 原文地址:https://blog.csdn.net/shixiaoling123/article/details/125462585