• 【JUC系列-09】深入理解ReentrantReadWriteLock的底层实现


    JUC系列整体栏目


    内容链接地址
    【一】深入理解JMM内存模型的底层实现原理https://zhenghuisheng.blog.csdn.net/article/details/132400429
    【二】深入理解CAS底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132478786
    【三】熟练掌握Atomic原子系列基本使用https://blog.csdn.net/zhenghuishengq/article/details/132543379
    【四】精通Synchronized底层的实现原理https://blog.csdn.net/zhenghuishengq/article/details/132740980
    【五】通过源码分析AQS和ReentrantLock的底层原理https://blog.csdn.net/zhenghuishengq/article/details/132857564
    【六】深入理解Semaphore底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132908068
    【七】深入理解CountDownLatch底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133343440
    【八】深入理解CyclicBarrier底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133378623
    【九】深入理解ReentrantReadWriteLock 读写锁的底层实现https://blog.csdn.net/zhenghuishengq/article/details/133629550

    一,读写锁

    前面几篇讲解了AQS的一些具体的实现,在对共享资源进行操作时,要么就是只能使用独占锁,对每一个线程都进行一个加锁的操作,要么就是单独使用共享锁,同时存在对多个资源的操作,但是在遇到类似缓存这种读多写少,读写同时存在的时候,那么就需要考虑独占锁和共享锁同时存在的需求了。

    在AQS的具体实现类中,就存在同时对共享资源读和写的操作的实现类,就是并发包中提供的ReentrantReadWriteLock 类,在该类内部,就实现了读锁和一个写锁。总而言之就是在多个线程只需要读取数据的时候,可以使用共享结点实现,如果存在一个线程需要去写数据的时候,那么就需要通过独占结点实现,用一句话概况就是:读读共享、写写互斥

    public class ReentrantReadWriteLock implements ReadWriteLock{
        private static final long serialVersionUID = -6992448646407690164L;
        /** Inner class providing readlock */
        private final ReentrantReadWriteLock.ReadLock readerLock;
        /** Inner class providing writelock */
        private final ReentrantReadWriteLock.WriteLock writerLock;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1,读写锁的基本使用

    在深入地了解这个类的底层原理之前,先了解一下这个类的基本使用,在这个 ReentrantReadWriteLock 类的注释中,已经有一个实例来方便我们了解这个类的基本使用,不得不说AQS里面所有的类的注释是非常的详细的

    在这里插入图片描述

    接下来参考他给的注释,仿写一个用例,用hashMap作为一级缓存,实现数据的缓存和读取。首先先定义一个线程池的工具类

    /**
     * 线程池工具
     * @author zhenghuisheng
     * @date : 2023/10/06
     */
    public class ThreadPoolUtil {
        /**
         * io密集型:最大核心线程数为2N,可以给cpu更好的轮换,
         *           核心线程数不超过2N即可,可以适当留点空间
         * cpu密集型:最大核心线程数为N或者N+1,N可以充分利用cpu资源,N加1是为了防止缺页造成cpu空闲,
         *           核心线程数不超过N+1即可
         * 使用线程池的时机:1,单个任务处理时间比较短 2,需要处理的任务数量很大
         * 参考:https://blog.csdn.net/yuyan_jia/article/details/120298564
         */
        public static synchronized ThreadPoolExecutor getThreadPool() {
            if (pool == null) {
                //获取当前机器的cpu
                int cpuNum = Runtime.getRuntime().availableProcessors();
                log.info("当前机器的cpu的个数为:" + cpuNum);
                int maximumPoolSize = cpuNum * 2 ;
                pool = new ThreadPoolExecutor(
                        maximumPoolSize - 2,
                        maximumPoolSize,
                        5L,   //5s
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(),  //数组有界队列
                        Executors.defaultThreadFactory(), //默认的线程工厂
                        new ThreadPoolExecutor.AbortPolicy());  //直接抛异常,默认异常
            }
            return pool;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    接下来创建一个 PutHashMapTask 线程的任务类,用于添加数据,实现一个callAble接口,可以将添加后的hashMap给返回

    /**
     * @Author: zhenghuisheng
     * @Date: 2023/10/6 22:41
     */
    @Data
    public class PutHashMapTask implements Callable {
    
        Lock writeLock;
        Map<String,Object> hashMap;
    
        public PutHashMapTask(Lock writeLock,Map<String,Object> hashMap){
            this.writeLock = writeLock;
            this.hashMap = hashMap;
        }
    
        @Override
        public Object call() throws Exception {
            //上锁
            writeLock.lock();
            for (int i = 0; i < 1000; i++) {
                hashMap.put(i+"",i);
            }
            //解锁
            writeLock.unlock();
            return hashMap;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    随后创建一个 GetHashMapTask 的任务类,用于获取数据,不需要返回,直接打印出即可

    /**
     * @Author: zhenghuisheng
     * @Date: 2023/10/6 22:41
     */
    @Data
    public class GetHashMapTask implements Runnable {
        Lock readLock;
        Map<String,Object> hashMap;
    
        public GetHashMapTask(Lock readLock, Map<String,Object> hashMap){
            this.readLock = readLock;
            this.hashMap = hashMap;
        }
    
        @Override
        public void run() {
            //上锁
            readLock.lock();
            System.out.println(hashMap.get(new Random(1000)));
            //解锁
            readLock.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    随后创建一个主线程类,内部定义一把 ReentrantReadWriteLock 读写锁,然后创建线程池一级创建一个hashMap作为一级缓存,随后读写都操作这个hash桶

    /**
     * @Author: zhenghuisheng
     * @Date: 2023/10/6 22:36
     */
    public class ReentrantReadWriteLockDemo {
        //定义一把读写锁
        private static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private static Lock readLock = readWriteLock.readLock();
        private static Lock writeLock = readWriteLock.writeLock();
        //初始化一个线程池
        static ThreadPoolExecutor threadPool = ThreadPoolUtil.getThreadPool();
        //创建一个hashMap作为一级缓存
        static Map<String,Object> hashMap = new HashMap<>();
    
        public static void main(String[] args) throws Exception {
            //写入数据任务线程
            PutHashMapTask putHashMapTask = new PutHashMapTask(readLock,hashMap);
            //向线程池中添加任务
            Future<?> submit = threadPool.submit(putHashMapTask);
            hashMap =  (Map<String,Object>)submit.get();
            System.out.println(hashMap.size());
            //读取数据任务线程
            GetHashMapTask getHashMapTask = new GetHashMapTask(writeLock,hashMap);
            threadPool.execute(getHashMapTask);
            //主线程休眠
            Thread.sleep(100);
            System.exit(0);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    上面只是讲解了这个基本使用,还有一些读读共享,写写互斥以及写锁降级成读锁在这个类中都有详细的注释。

    在写锁降级成读锁时,会在写锁释放锁之前,给读锁进行一个加锁的操作,主要是为了解决在高并发的场景下,在修改状态时为了保证所有线程的可见性问题,因为在工作线程将变量刷新到主内存时,会有一定的延迟,这样做得好处就是提前将值刷回到缓存,从而解决可见性的延迟问题。总而言之,写锁到读锁降级是为了保证可见性的问题,通过提前刷盘,保证缓存一致性,防止来不及刷盘而读取到脏数据

    {	
    	rwl.readLock().lock();
    	if (!cacheValid) {
         	rwl.readLock().unlock();
         	rwl.writeLock().lock();
         	try {
                //读锁在写锁释放锁之前先加锁
           		rwl.readLock().lock();
            }
         } finally {
           rwl.writeLock().unlock();
         }
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在这个读写锁中,目前还是只支持锁的降级,并没有支持锁的升级的,主要还是因为这个线程可见性的问题,就是假设在同一时刻多个线程来读取数据,其中一个线程修改了数据,那么此时读取线程的数据就变成了脏数据,这就是为啥有延迟双删这个实现的原因,尤其是在缓存中,网络问题等等不可避免的因素,以及最主要的可见性的问题,因此在读写锁中,并没有这个读锁到写锁升级的过程

    2,读写锁的底层实现原理

    在看底层源码之前,先看这个ReentrantReadWriteLock的类图,如下图所示,其顶层接口是一个readWriteLock,内部定义读锁和写锁的的抽象方法,内部同时引入了Sync,该接口是一个AQS的子类,具有AQS的所有特性,一次内部具有公平锁和非公平锁,可重入等特性

    请添加图片描述

    而不管是读锁还是写锁,内部都实现了这个Lock的接口,Lock内部定义了所有的加解锁的规范,整个AQS中,Lock是作为一个全局的规范类来使用的

    public static class WriteLock implements Lock{}
    public static class ReadLock implements Lock{}
    
    • 1
    • 2

    2.1,读写状态state的设计

    在之前的AQS系列中,都是通过cas + 同步等待队列来实现这个加锁的方式,不管是共享锁还是独占锁,同步状态器state的值都是只用于表示独占状态或者共享状态,就是一个单一的职责,那么读写锁是如何通过这个state来表示两种状态的呢,直接看源码吧

    在这个ReentrantReadWriteLock的内部类中,有一个Sync的抽象的静态内部类,在中间有一段注释,其翻译结果如下

    读取与写入计数提取常量和函数。锁状态在逻辑上分为两个无符号短路:较低的一个表示独占(写入器)锁保持计数,而较高的是共享(读取器)锁保持数

    请添加图片描述

    也就是说,读写锁也是通过这个state这个关键字来区分的,state是int的数据类型,占4个字节,就是32位,那么高16位表示的就是共享节点的数据,低16位表示的就是独占节点的数据

    //高16位,共享节点	  低16位,独占节点	
    00000000 00000000 00000000 00000000
    
    • 1
    • 2

    因此就定义了变量 SHARED_SHIFT ,值为16,SHARED_UNIT 表示的是共享节点的单位,1右移16位;MAX_COUNT 表示的是最大线程的数据,不管是共享节点还是独占节点的最大值;EXCLUSIVE_MASK 表示的是独占节点的数据

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
    • 1
    • 2
    • 3
    • 4

    而且在获取数据时,内部已经定义好了返回值,如获取共享结点的线程数,独占结点的重入数等

    //获取多少个线程占有这把共享锁,通过右移的方式获取
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //获取独占锁的总线程数,通过位运算
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
    • 1
    • 2
    • 3
    • 4

    在共享锁中,通过 HoldCounter 方法和 ThreadLocalHoldCounter 来作为计数器的,内部主要是通过ThreadLocal 变量来记录可重入锁的次数。

    static final class HoldCounter {
    	int count = 0;
        // Use id, not reference, to avoid garbage retention
        final long tid = getThreadId(Thread.currentThread());
    }
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.2,写锁的底层实现

    研究完这个state的底层设计之后,再来研究这个 writeLock 写锁底层的实现原理

    private static Lock writeLock = readWriteLock.writeLock();
    writeLock.lock();
    writeLock.unLock();
    
    • 1
    • 2
    • 3
    2.2.1,lock加锁逻辑

    首先依旧是通过这个lock方法作为入口,进去会进入这个 acquire 方法,首先会进入这个 tryAcquire 方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&	//尝试加锁
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))	//加锁失败则进入队列,独占节点
            selfInterrupt();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    随后查看这个方法,根据方法名称可知这是一个获取锁的方法。

    由于是写锁加锁的逻辑,首先会获取state值,如果是0,那么高位肯定是没有值得,那么肯定是不存在读锁,后续直接操作这个写锁的逻辑即可。

    如果state值不为0,那么就需要判断这个state是读锁还是写锁,首先会获取 exclusiveCount 写锁的总个数,如果个数为0,那么表示这个state的值都是读锁的,那么直接返回false,如果这个线程不是重入的,那么也会直接返回false;如果当前线程满足上面的任意条件,那么会判断当前线程重入的次数,不能超过2的32次方-1。总而言之可以总结成两句话:写写互斥,写读互斥

    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();	//获取当前线程 
        int c = getState();		//获取状态,state位32位的值
        int w = exclusiveCount(c);	//获取独占的总线程个数
        //如果state值不为0
        if (c != 0) {	
            //如果独占的写锁个数为0并且不是重入锁,那么整个状态为读锁状态
            if (w == 0	//写读互斥 
                || current != getExclusiveOwnerThread())	//写写互斥
                return false;	//则直接返回false
            //重入,个数不能大于2的16-1次方
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //state的值+1,因为是写锁,只需要在低位加即可,不需要考虑左移
            setState(c + acquires);
            return true;
        }
    	//如果获取的state值为0,那么高16位肯定是全为0,那么就进入独占锁逻辑 
        if (writerShouldBlock() ||	//阻塞操作,没有队列则创建
                !compareAndSetState(c, c + acquires))	//cas抢锁
            return false;
        //同步状态器设置当前线程为独占锁线程
        setExclusiveOwnerThread(current);
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    如果获取锁失败,则会执行入队的操作,前面几篇AQS的文章在写入队,阻塞的这些方法已经写了很多次了,因此这里不做详细的解释

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    上面的是入队的操作,下面的是阻塞的逻辑,可以查看前面几篇AQS的文章,写的很详细了

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    2.2.2,unlock解锁逻辑

    上面讲解了加锁的逻辑,下面的是writeLock解锁的逻辑,比较简单

    public final boolean release(int arg) {
    	if (tryRelease(arg)) {
    		Node h = head;
    		if (h != null && h.waitStatus != 0)
    			unparkSuccessor(h);
    		return true;
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    首先会进入 tryRelease 方法中释放锁,就是将state的状态减1,由于是写锁,因此对低位操作刚好就是减少写锁的状态值

    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;	//状态减1
        boolean free = exclusiveCount(nextc) == 0;	//判断写锁状态是否为0
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    最后会调用这个 unparkSuccessor 进行出队和唤醒的操作,最终是调用这个LockSupport.unpark方法

    LockSupport.unpark(s.thread);
    
    • 1

    2.3,读锁的底层实现

    写锁的逻辑相对而言是比较简单的,接下来查看读锁的底层逻辑实现

    private static Lock readLock = readWriteLock.readLock();
    readLock.lock();
    readLock.unLock();
    
    • 1
    • 2
    • 3
    2.3.1,读锁的加锁逻辑

    依旧先查看这个readLock的lock加锁方法,很明显,操作的结点是一个共享结点,上面的写锁是一个独占结点。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    • 1
    • 2
    • 3
    • 4

    继续查看内部tryAcquireShared方法的加锁逻辑,很明显代码比写锁的更多。

    首先依旧是会先判断是否存在写结点,如果存在直接返回-1,并且会判断在写结点降级成读结点时,结点是否为同一个结点,如果不是同一个结点则返回-1

    如果只存在读结点,则会进行尝试加锁的操作,如果是第一次加这个读锁,则会单独记录,如果不是第一次加,则会将这些可重入锁存储在ThreadLocal里面,从而记录可重入锁的总数。最后如果加锁失败,则会自旋重试加锁

    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();		//获取同步状态器的值
        if (exclusiveCount(c) != 0 &&	//低位写结点的值不为0,读写互斥
                getExclusiveOwnerThread() != current)	//判断当前线程是否写线程,降级操作
            return -1;
        int r = sharedCount(c);		//读锁的总数
        if (!readerShouldBlock() &&	
                r < MAX_COUNT &&	//小于65535
                compareAndSetState(c, c + SHARED_UNIT)) {	//尝试加锁,加65535
            if (r == 0) {	//第一次进来获取读锁
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {	//代表重入
                firstReaderHoldCount++;	
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh); 
                rh.count++; //threadLocl + 1
            }
            return 1;
        }
        return fullTryAcquireShared(current);	//	加锁失败再次尝试获取
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    2.3.1,读锁的解锁逻辑

    接下来继续查看这个unlock方法,首先会进入这个 releaseShared 方法

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    接下来查看这个 tryReleaseShared 释放锁的方法,其内部主要就是做了两件事,第一件就是将重入锁的值降低为0,第二件事就是讲高位的值进行减1的操作

    protected final boolean tryReleaseShared(int unused) {   
    	Thread current = Thread.currentThread();
        //如果有可重入锁,则清0
        if (firstReader == current) {	
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1) firstReader = null;
            else firstReaderHoldCount--;
        } else {
            ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0) throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;	//减高位的值
            if (compareAndSetState(c, nextc)) return nextc == 0;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    3,总结

    ReentrantReadWriteLock读写锁底层也是通过AQS实现,通过state的高低位来区分读锁和写锁,高位表示读锁,低位表示写锁,高位读锁采用的是共享结点,低位写锁采用的是独占节点,共享结点中采用ThreadLocal来保存重入锁的次数,在整个读写锁中,主要是满足以下的规则:读读共享、读写互斥、写写互斥、写读降级

    内部的ReadLock和WriteLock满足AQS的所有特性,内部也全部实现了CAS抢锁、失败入队、队列不存在则先创建队列,线程阻塞、修改结点状态、结点出队、线程唤醒等操作

  • 相关阅读:
    ZPLPrinter Emulator SDK v4.0.22.722 Crack
    spring5.3 十一:spring启动过程源码分析
    全屋智能:鱼很大,但水更深
    pyflink map 字典写入ES
    计算机毕业设计springboot家具销售系统tj2lo源码+系统+程序+lw文档+部署
    数据库基本增删改查语法和多表联查的方式
    吉他&乐理
    要不是家里穷,我也不想当码农
    第11章 增删改数据&第12章 MySQL数据类型
    使用whistle抓包实战
  • 原文地址:https://blog.csdn.net/zhenghuishengq/article/details/133629550