• ReentrantReadWriteLock(可重入读写锁)


    基于AQS的互斥锁(写)和共享锁(读)实现的可重入读写锁

    属性

    public interface ReadWriteLock {
        Lock readLock();
        Lock writeLock();
    }
    
    public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
        private static final long serialVersionUID = -6992448646407690164L;
        //读锁
        private final ReentrantReadWriteLock.ReadLock readerLock;
    	//写锁
        private final ReentrantReadWriteLock.WriteLock writerLock;
        //同步器
        final Sync sync;
    	//默认创建非公平锁同步器
        public ReentrantReadWriteLock() {
            this(false);
        }
    	
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    
        public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
        public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    Sync同步器

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;
    
        /*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         */
    	//将32位int拆分成2部分,高16位表示读锁线程数,低16位表示写锁线程数
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 用于获取低16位值的掩码值
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        
        //获取当前持有读锁的线程/资源数
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    
        //获取当前持有写锁的线程/资源数
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
        //单个线程读锁重入的次数,通过ThreadLocal来维护这个计数器
        static final class HoldCounter {
            int count = 0;
          
            final long tid = getThreadId(Thread.currentThread());
        }
    
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
    	//创建ThreadLocal
        private transient ThreadLocalHoldCounter readHolds;
    
        // 缓存最后一个线程获取的读锁数量
        private transient HoldCounter cachedHoldCounter;
    
        /**
         * firstReader is the first thread to have acquired the read lock.
         * firstReaderHoldCount is firstReader's hold count.
         *
         * 

    More precisely, firstReader is the unique thread that last * changed the shared count from 0 to 1, and has not released the * read lock since then; null if there is no such thread. * *

    Cannot cause garbage retention unless the thread terminated * without relinquishing its read locks, since tryReleaseShared * sets it to null. * *

    Accessed via a benign data race; relies on the memory * model's out-of-thin-air guarantees for references. * *

    This allows tracking of read holds for uncontended read * locks to be very cheap. */ private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { //初始化ThreadLocal对象 readHolds = new ThreadLocalHoldCounter(); //使用state变量的volatile,用于保证可见性 setState(getState()); } abstract boolean readerShouldBlock(); abstract boolean writerShouldBlock(); private IllegalMonitorStateException unmatchedUnlockException() { return new IllegalMonitorStateException( "attempt to unlock read lock, not locked by current thread"); } /** * Performs tryLock for write, enabling barging in both modes. * This is identical in effect to tryAcquire except for lack * of calls to writerShouldBlock. */ final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; } /** * Performs tryLock for read, enabling barging in both modes. * This is identical in effect to tryAcquireShared except for * lack of calls to readerShouldBlock. */ final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } // Methods relayed to outer class final ConditionObject newCondition() { return new ConditionObject(); } final Thread getOwner() { // Must read state before owner to ensure memory consistency return ((exclusiveCount(getState()) == 0) ? null : getExclusiveOwnerThread()); } final int getReadLockCount() { return sharedCount(getState()); } final boolean isWriteLocked() { return exclusiveCount(getState()) != 0; } final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0; } final int getReadHoldCount() { if (getReadLockCount() == 0) return 0; Thread current = Thread.currentThread(); if (firstReader == current) return firstReaderHoldCount; HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == getThreadId(current)) return rh.count; int count = readHolds.get().count; if (count == 0) readHolds.remove(); return count; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); readHolds = new ThreadLocalHoldCounter(); setState(0); // reset to unlocked state } final int getCount() { return getState(); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    tryAcquire(写锁获取子类实现)
    protected final boolean tryAcquire(int acquires) {
        //获取当前线程
        Thread current = Thread.currentThread();
        int c = getState();
        //获取当前持有写锁的线程数量
        int w = exclusiveCount(c);
        if (c != 0) {  //已经有线程获取锁,但此时不区分读/写锁,在下一步w == 0时区分
            if (w == 0 //没有线程获取写锁,但是有线程获取到读锁了
                || current != getExclusiveOwnerThread())  //如果有,判断获取写锁的线程是不是当前线程
                //获取锁失败,执行AQS排队
                return false;
            //写锁重入,最大值不得大于MAX_COUNT
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // 可重入,更新state值
            setState(c + acquires);
            return true;
        }
        //既没有读锁,也没有写锁
        if (writerShouldBlock() ||  //判断是否当前线程获取写锁(由子类(公平/非公平方式)实现)
            !compareAndSetState(c, c + acquires))  //CAS抢写锁,失败后走AQS排队
            return false;
        setExclusiveOwnerThread(current);  //获取写锁成功,将当前线程标识为获取互斥锁的线程对象
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    writerShouldBlock
    //非公平锁实现
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // 非公平锁实现方式,写锁总是去抢,抢完再说
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }
    
    //公平锁实现
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();  //需要先判断队列中有没有人排队
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    tryRelease(释放写锁)
    //该方法也是线程安全(因为锁还没释放完成)
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively()) //依旧先判断线程释放是否合法
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;  //判断锁是否完全释放成功(锁重入)
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    tryAcquireShared(读锁获取)
    protected final int tryAcquireShared(int unused) {
        /*
         * Walkthrough:
         * 1. If write lock held by another thread, fail.
         * 2. Otherwise, this thread is eligible for
         *    lock wrt state, so ask if it should block
         *    because of queue policy. If not, try
         *    to grant by CASing state and updating count.
         *    Note that step does not check for reentrant
         *    acquires, which is postponed to full version
         *    to avoid having to check hold count in
         *    the more typical non-reentrant case.
         * 3. If step 2 fails either because thread
         *    apparently not eligible or CAS fails or count
         *    saturated, chain to version with full retry loop.
         */
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&  //有线程获取写锁
            getExclusiveOwnerThread() != current)  //获取写锁的线程不是当前线程,返回-1,告知AQS获取读锁失败
            return -1;
        // 获取读锁的持有数
        int r = sharedCount(c);
        if (!readerShouldBlock() &&  //子类实现,判定当前获取读锁的线程是否应该被阻塞
            r < MAX_COUNT &&  //线程持有数必须小于MAX_COUNT
            compareAndSetState(c, c + SHARED_UNIT)) { //CAS增加state的高16位的读锁持有数
            //计数为0,表示当前线程就是第一个获取读锁的线程
            if (r == 0) {
                //设置第1个获取读锁的线程和该线程持有state数(重入)
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                //当前获取读锁的线程就是第一个线程,也就是锁重入,直接加1计数位
                firstReaderHoldCount++;
            } else {
                //当前线程不是第一个读线程,此时将其获取读锁的次数保存在ThreadLocal中
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                     /*这里要说明一下,当rh == null时,其实整套代码看下来,没有任何地方有对rh的创建和初始化,实际上				 			  *readHolds.get()这个方法默认会创建一个空值的rh对象,详细实现可以看ThreadLocal.get()源码
                      */
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        //这个地方需要说明一下,看fullTryAcquireShared方法实际跟上面做的事情是一样的,那为什么还要做两次呢,实际上是对性能做的优化,就是把一些常见的场景前置
        return fullTryAcquireShared(current);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    fullTryAcquireShared
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
             //已经有线程获取到写锁且不是当前线程
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            } else if (readerShouldBlock()) {
                //子类实现判断当前线程应该阻塞
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    tryReleaseShared(释放共享锁
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //当前线程就是第1个获取读锁的线程
        if (firstReader == current) {
            //如果为1,证明没有锁重入,则直接释放firstReader
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;  //有锁重入,做--处理
        } else {
            //不是第1个获取读锁线程,从ThreadLocal中获取当前线程持有的数量
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            //释放高16位数
            if (compareAndSetState(c, nextc))
                //释放成功,返回true后,通知AQS唤醒阻塞队列节点
                return nextc == 0;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    readerShouldBlock和writerShouldBlock

    读/写锁是否应该阻塞分非公平和公平两种实现方式

    //非公平实现
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // 非公平锁实现方式,写锁总是去抢,抢完再说
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }
    //第1个排队的线程节点是互斥的
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
    
    //公平实现
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors(); // //需要先判断队列中有没有人排队
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
    //这个方法是AQS中实现的,判断阻塞队列中是否还有前驱节点,只要有那么就排队去,不管读/写锁
    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    WriteLock写锁

    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164L;
        private final Sync sync;
    	//sync同步器由ReentrantReadWriteLock决定并传入
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
       	//获取锁,调用ReentrantReadWriteLock中同步器的acquire实现方法
        public void lock() {
            sync.acquire(1);
        }
       //可响应中断的锁方法
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        //尝试获取锁,如果获取不到,立马返回,不做排队
        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }
        //带超时时间的尝试获取锁操作
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
        //释放锁操作
        public void unlock() {
            sync.release(1);
        }
        //获取条件对象
        public Condition newCondition() {
            return sync.newCondition();
        }
    
        public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }
    
        public int getHoldCount() {
            return sync.getWriteHoldCount();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

  • 相关阅读:
    cocos游戏引擎制作的滚动框地图防止误点操作的简单方法
    vue视图响应-watch
    如何使用 ABAP 代码解析 XML 文件
    Python-函数
    Kubernetes客户端认证(一)—— 基于CA证书的双向认证方式
    俄罗斯卢布对美元接近60 卢布今年一直是全球表现最好的货币?
    计算机毕业设计java+ssm的高校科研仪器共享平台-计算机毕业设计
    ctfshow-web入门-文件上传(web166、web167)&(web168-web170)免杀绕过
    unity 烘焙的时候出现模型没有光影的情况
    MybatisPlus高级特性
  • 原文地址:https://blog.csdn.net/zhang527294844/article/details/133925619