• 剑指JUC原理-16.读写锁


    • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
    • 📕系列专栏:Spring源码、JUC源码
    • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
    • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
    • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

    ReentrantReadWriteLock

    其定义就是支持冲入的读写锁,本质上也就是基于 ReentrantLock 实现的

    当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。

    类似于数据库中的 select … from … lock in share mode

    提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

    class DataContainer {
        private Object data;
        private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        private ReentrantReadWriteLock.ReadLock r = rw.readLock();
        private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
        public Object read() {
            log.debug("获取读锁...");
            r.lock();
            try {
                log.debug("读取");
                sleep(1);
                return data;
            } finally {
                log.debug("释放读锁...");
                r.unlock();
            }
        }
        public void write() {
            log.debug("获取写锁...");
            w.lock();
            try {
                log.debug("写入");
                sleep(1);
            } finally {
                log.debug("释放写锁...");
                w.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    测试 读锁-读锁 可以并发

    		DataContainer dataContainer = new DataContainer();
            new Thread(() -> {
                dataContainer.read();
            }, "t1").start();
            new Thread(() -> {
                dataContainer.read();
            }, "t2").start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响

    14:05:14.341 c.DataContainer [t2] - 获取读锁... 
    14:05:14.341 c.DataContainer [t1] - 获取读锁... 
    14:05:14.345 c.DataContainer [t1] - 读取
    14:05:14.345 c.DataContainer [t2] - 读取
    14:05:15.365 c.DataContainer [t2] - 释放读锁... 
    14:05:15.386 c.DataContainer [t1] - 释放读锁... 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    测试 读锁-写锁 相互阻塞

    		DataContainer dataContainer = new DataContainer();
            new Thread(() -> {
                dataContainer.read();
            }, "t1").start();
            Thread.sleep(100);
            new Thread(() -> {
                dataContainer.write();
            }, "t2").start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    输出结果

    14:04:21.838 c.DataContainer [t1] - 获取读锁... 
    14:04:21.838 c.DataContainer [t2] - 获取写锁... 
    14:04:21.841 c.DataContainer [t2] - 写入
    14:04:22.843 c.DataContainer [t2] - 释放写锁... 
    14:04:22.843 c.DataContainer [t1] - 读取
    14:04:23.843 c.DataContainer [t1] - 释放读锁... 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    写锁-写锁 也是相互阻塞的,这里就不测试了

    注意事项

    • 读锁不支持条件变量

    ReentrantReadWriteLock 中的读锁不支持条件变量,主要是因为读锁在 ReentrantReadWriteLock 中是共享的,多个线程可以同时持有读锁来访问共享资源。条件变量通常用于在多线程环境下实现线程间的协调和通信,而读锁的共享特性可能导致条件变量的信号在多个线程之间产生歧义或不确定性。

    • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
    		r.lock();
            try {
                // ...
                w.lock();
                try {
                    // ...
                } finally{
                    w.unlock();
                }
            } finally{
                r.unlock();
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 重入时降级支持:即持有写锁的情况下去获取读锁
    // 下面以ReentrantReadWriteLock 的 CachedData 类来说明,这段代码主要是使用读写锁来实现对缓存数据的并发访问,以提高并发读取操作的性能。
    
    
    class CachedData {
        Object data;
        // 是否有效,如果失效,需要重新计算 data
        volatile boolean cacheValid;
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        void processCachedData() {
            // 先加读锁,判断缓存是否失效,如果没有失效,那么可以直接返回即可。使用完了将读锁解开即可
            rwl.readLock().lock();
            if (!cacheValid) {
                // 如果失效了,释放读锁,然后获得写锁,重新对其进行计算
                // 获取写锁前必须释放读锁
                rwl.readLock().unlock();
                rwl.writeLock().lock();
                try {
                    // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
                    
                    /*
                    在上述代码中,两次检查 cacheValid 的作用是为了在获取写锁之前和获取写锁后再次确认缓存的有效性。让我来详细解释一下:
    
    第一次检查 cacheValid 发生在首次获取写锁之前。这是为了避免出现竞态条件(race condition)的情况,即在当前线程释放读锁后,有可能其他线程已经获取了写锁并更新了缓存。如果没有进行第一次检查,当前线程获取写锁后可能会重复更新缓存,造成不必要的计算和数据更新。
    
    第二次检查 cacheValid 发生在获取写锁之后。虽然在第一次检查时缓存无效,但在当前线程获取写锁之前,可能有其他线程已经更新了缓存并将 cacheValid 设置为有效。因此,在获取写锁后再次检查 cacheValid 可以避免重复更新缓存,确保只有一个线程更新缓存数据。
    
    通过这样的双重检查机制,可以有效地避免多个线程同时更新缓存数据,确保在并发环境下对缓存的更新操作是正确且高效的。此外,结合读写锁的降级操作,可以使得在缓存有效时多个线程能够同时读取数据,从而提高系统的并发性能。
                    
                    */
                    
                    
                    if (!cacheValid) {
                        data = ...
                        cacheValid = true;
                    }
                    /*
                    锁的降级,写锁释放开,但是我还想同时持有它的读锁,这是为了释放开的那个瞬间,其他线程的读取权限就ok了
    
    				加读锁的目的也是为了你在读的的时候不受其他的写的线程的干扰
                    */
                    
                    // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
                    rwl.readLock().lock();
                } finally {
                    rwl.writeLock().unlock();
                }
            }
            // 自己用完数据, 释放读锁 
            try {
                use(data);
            } finally {
                rwl.readLock().unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    应用之缓存

    缓存更新策略

    更新时,是先清缓存还是先更新数据库

    先清缓存

    读操作的速度是大于写操作的

    在这里插入图片描述

    先更新数据库

    在这里插入图片描述

    补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询

    在这里插入图片描述

    这种情况的出现几率非常小

    读写锁实现一致性缓存

    使用读写锁实现一个简单的按需加载缓存(核心:写操作 加 写锁;读操作 加 读锁

    class GenericCachedDao<T> {
        // HashMap 作为缓存非线程安全, 需要保护
        HashMap<SqlPair, T> map = new HashMap<>();
    
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        GenericDao genericDao = new GenericDao();
        public int update(String sql, Object... params) {
            SqlPair key = new SqlPair(sql, params);
            // 加写锁, 防止其它线程对缓存读取和更改
            lock.writeLock().lock();
            try {
                int rows = genericDao.update(sql, params);
                map.clear();
                return rows;
            } finally {
                lock.writeLock().unlock();
            }
        }
        public T queryOne(Class<T> beanClass, String sql, Object... params) {
            SqlPair key = new SqlPair(sql, params);
            // 加读锁, 防止其它线程对缓存更改
            lock.readLock().lock();
            try {
                T value = map.get(key);
                if (value != null) {
                    return value;
                }
            } finally {
                lock.readLock().unlock();
            }
            // 加写锁, 防止其它线程对缓存读取和更改
            lock.writeLock().lock();
            try {
                // get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
                // 为防止重复查询数据库, 再次验证
                T value = map.get(key);
                if (value == null) {
                    // 如果没有, 查询数据库
                    value = genericDao.queryOne(beanClass, sql, params);
                    map.put(key, value);
                }
                return value;
            } finally {
                lock.writeLock().unlock();
            }
        }
        // 作为 key 保证其是不可变的
        class SqlPair {
            private String sql;
            private Object[] params;
            public SqlPair(String sql, Object[] params) {
                this.sql = sql;
                this.params = params;
            }
            @Override
            public boolean equals(Object o) {
                if (this == o) {
                    return true;
                }
                if (o == null || getClass() != o.getClass()) {
                    return false;
                }
                SqlPair sqlPair = (SqlPair) o;
                return sql.equals(sqlPair.sql) &&
                        Arrays.equals(params, sqlPair.params);
            }
            @Override
            public int hashCode() {
                int result = Objects.hash(sql);
                result = 31 * result + Arrays.hashCode(params);
                return result;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    注意

    以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑:

    • 适合读多写少,如果写操作比较频繁,以上实现性能低
    • 没有考虑缓存容量
    • 没有考虑缓存过期
    • 只适合单机
    • 并发性还是低,目前只会用一把锁
    • 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)

    读写锁原理

    读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

    t1 w.lock,t2 r.lock

    在这里插入图片描述

    其实该流程和 ReentrantLock 几乎是一样的,但是还是有一些区别的,比如state不太一样,因为state既要给读锁用,也要给写锁用,所以要将state分成两部分。

    t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁
    使用的是 state 的高 16 位。

    其实t1肯定是能加上锁,接下来分析一下源码:

    ctrl + f12 找到 writelock 里面的lock方法:

    public void lock() {
       sync.acquire(1);
    }
    
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    /*
    	首先会调用tryAcquire 尝试加锁,如果成功了那么后续的代码就不执行了,如果加锁失败了,才会进入这个队列
    
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    protected final boolean tryAcquire(int acquires) {
                /*
                 * Walkthrough:
                 * 1. If read count nonzero or write count nonzero
                 *    and owner is a different thread, fail.
                 * 2. If count would saturate, fail. (This can only
                 *    happen if count is already nonzero.)
                 * 3. Otherwise, this thread is eligible for lock if
                 *    it is either a reentrant acquire or
                 *    queue policy allows it. If so, update state
                 *    and set owner.
                 */
                Thread current = Thread.currentThread();
        // 首先拿到整个state状态
                int c = getState();
                int w = exclusiveCount(c);
        // 如果不等于0,意味着既有可能其他线程加了读锁,也有可能是其他线程加了写锁
        // 因为高16位不等于0或者低16位不等于0都有可能导致 不等于0
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    // w == 0 代表 加的读锁部分   而 往后执行代表着 可能加的写锁,但是这个写锁是不是自己加的呢?比如先加的写锁,发生了重入,又加了一次写锁
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    // 如果写锁部分 再加 1超过写锁的最大范围了 65535 2的16次方
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    // 可以理解为发生了可重入
                    setState(c + acquires);
                    return true;
                }
        // 如果能往下走,说明c是等于0的,代表别的线程都没有加锁,首先判断写锁是否需要阻塞,其实就意味着公平非公平,如果是非公平锁 就 总会返回false,公平锁会检查这个队列。然后就接着往后面走 是否能compareAndSetState
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
        
        // 将对应线程设置成 owner
                setExclusiveOwnerThread(current);
                return true;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    接下来看 加 读锁的lock方法

    public void lock() {
                sync.acquireShared(1);
            }
    
    public final void acquireShared(int arg) {
        // 尝试去获取这个读锁
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写
    锁占据,那么 tryAcquireShared 返回 -1 表示失败

    tryAcquireShared 返回值表示

    • -1 表示失败
    • 0 表示成功,但后继节点不会继续唤醒(0或者1会在后面的信号量章节介绍)
    • 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
    protected final int tryAcquireShared(int unused) {
                
                Thread current = Thread.currentThread();
                int c = getState();
        // 检查写锁部分是否不为0 此时t1已经将其变为1了,去检查加写锁的是不是当前线程呢?
        // 这种情况其实就是 t2 已经加了写锁,然后又加了读锁,这里是应该成功的,因为这是锁降级的过程
        // 最终我们当前的情况来看,t2 其实就是返回-1了。
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    在这里插入图片描述

    返回-1,这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

    在这里插入图片描述

    private void doAcquireShared(int arg) {
        // 唤醒的时候,判断的逻辑稍有不同
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    // 死循环,去找t2 有没有前驱节点
                    final Node p = node.predecessor();
                    // 如果前驱节点是 head,那么说明其是 第二个节点,是有资格争抢锁的
                    if (p == head) {
                        // 调用tryAcquireShared  返回-1表示失败,返回0或者1表示成功
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    // 如果返回-1,说明并没有释放锁,那么就会走到这个逻辑,和ReentrantLock逻辑一致,走park	shouldParkAfterFailedAcquire将其前驱节点设置成-1,然后 重新for循环,设置为park
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在这里插入图片描述

    t3 r.lock,t4 w.lock

    这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

    在这里插入图片描述

    t2 t3都是读锁,所以状态都是 SHARED,而t4是写锁,所以状态是EXCLUSIVE

    t1 w.unlock

    这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

    public void unlock() {
                sync.release(1);
            }
            
    public final boolean release(int arg) {
        // 如果return true呢,就会执行后续的逻辑
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
        // 首先在原来的基础上减1
                int nextc = getState() - releases;
        // 然后去查看写锁部分是不是减成0了
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
        // 如果没有减成0 代表着这事一次锁的重入
                setState(nextc);
                return free;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在这里插入图片描述

    接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内
    parkAndCheckInterrupt() 处恢复运行

    这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            
                            // 此时进入到这个条件中
                            
                            // 替换头结点
                            
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        // 此时这里唤醒了,就可以继续运行了,然后再循环
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    
    // 返回-1表示失败,返回0或者1表示成功
    protected final int tryAcquireShared(int unused) {
                
                Thread current = Thread.currentThread();
                int c = getState();
        // 返回-1的之前分析过了,就不再做分析了
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
        // 获取读锁,也就是高16位
                int r = sharedCount(c);
        // 不应该被阻塞住
                if (!readerShouldBlock() &&
                    // 没有超过最大基数
                    r < MAX_COUNT &&
                    // 对于高位来讲,不是加1,是加了65536
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    
                    // 如果都符合,代表后续读锁加锁成功了
                    
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    在这里插入图片描述

    这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

    在这里插入图片描述

    // 替换头结点
    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                // 拿到当前节点的下一个 
                Node s = node.next;
                // 如果节点的状态是 shared的话,
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用
    doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行

    private void doReleaseShared() {
            
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        // 在这里会将节点的状态从 -1 改成 0
    
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        // 对头结点的后继结点又要去唤醒,这就接上了 此时就唤醒t3的park
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    在这里插入图片描述

    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 又再次来到了 tryAcquireShared方法,和前面的流程是一样的
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        // 此时t3 被唤醒了,和之前的流程是一样的
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    protected final int tryAcquireShared(int unused) {
                
                Thread current = Thread.currentThread();
                int c = getState();
        // 返回-1的之前分析过了,就不再做分析了
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
        // 获取读锁,也就是高16位
                int r = sharedCount(c);
        // 不应该被阻塞住
                if (!readerShouldBlock() &&
                    // 没有超过最大基数
                    r < MAX_COUNT &&
                    // 对于高位来讲,不是加1,是加了65536
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    
                    // 如果都符合,代表后续读锁加锁成功了
                    
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

    在这里插入图片描述

    这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

    在这里插入图片描述

    // 替换头结点
    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                // 拿到当前节点的下一个 
                Node s = node.next;
                // 如果节点的状态是 shared的话,
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

    t2 r.unlock,t3 r.unlock

    t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

    在这里插入图片描述

    public void unlock() {
                sync.releaseShared(1);
            }
    
    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    // 获取状态,减去高位的1
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        return nextc == 0;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入
    doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

    在这里插入图片描述

    private void doReleaseShared() {
            
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        // 本质上唤醒t4
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是老二,并且没有其他
    竞争,tryAcquire(1) 成功,修改头结点,流程结束

    在这里插入图片描述

    StampedLock

    该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用

    这里对优化性能部分做了专门的解读:

    之前学读写锁,其实已经看到了读读可以并发,已经很快了,但是还不够快,读读并发的时候,底层还是用cas的方式去修改它的状态,读锁的高16位去修改它的状态,它还是性能上比不上不加锁,如果希望读取的性能达到这种极致,那么就可以使用StampedLock。

    加解读锁

    long stamp = lock.readLock();
    lock.unlockRead(stamp);
    
    • 1
    • 2

    加解写锁

    long stamp = lock.writeLock();
    lock.unlockWrite(stamp);
    
    • 1
    • 2

    乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通
    过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

    // 这里面没有加任何的锁
    long stamp = lock.tryOptimisticRead();
    // 验戳
    if(!lock.validate(stamp)){
     	// 锁升级
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

    class DataContainerStamped {
        private int data;
        private final StampedLock lock = new StampedLock();
        public DataContainerStamped(int data) {
            this.data = data;
        }
        public int read(int readTime) {
            long stamp = lock.tryOptimisticRead();
            log.debug("optimistic read locking...{}", stamp);
            sleep(readTime);
            if (lock.validate(stamp)) {
                log.debug("read finish...{}, data:{}", stamp, data);
                return data;
            }
            // 锁升级 - 读锁
            log.debug("updating to read lock... {}", stamp);
            try {
                stamp = lock.readLock();
                log.debug("read lock {}", stamp);
                sleep(readTime);
                log.debug("read finish...{}, data:{}", stamp, data);
                return data;
            } finally {
                log.debug("read unlock {}", stamp);
                lock.unlockRead(stamp);
            }
        }
        public void write(int newData) {
            long stamp = lock.writeLock();
            log.debug("write lock {}", stamp);
            try {
                sleep(2);
                this.data = newData;
            } finally {
                log.debug("write unlock {}", stamp);
                lock.unlockWrite(stamp);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    测试 读-读 可以优化

    public static void main(String[] args) {
            DataContainerStamped dataContainer = new DataContainerStamped(1);
            new Thread(() -> {
                dataContainer.read(1);
            }, "t1").start();
            sleep(0.5);
            new Thread(() -> {
                dataContainer.read(0);
            }, "t2").start();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    输出结果,可以看到实际没有加读锁

    15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 
    15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 
    15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 
    15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1
    
    • 1
    • 2
    • 3
    • 4

    测试 读-写 时优化读补加读锁

    public static void main(String[] args) {
            DataContainerStamped dataContainer = new DataContainerStamped(1);
            new Thread(() -> {
                dataContainer.read(1);
            }, "t1").start();
            sleep(0.5);
            new Thread(() -> {
                dataContainer.write(100);
            }, "t2").start();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    输出结果

    15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 
    15:57:00.717 c.DataContainerStamped [t2] - write lock 384 
    15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 
    15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 
    15:57:02.719 c.DataContainerStamped [t1] - read lock 513 
    15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 
    15:57:03.719 c.DataContainerStamped [t1] - read unlock 513 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    注意

    • StampedLock 不支持条件变量
    • StampedLock 不支持可重入

    相关推荐博客

    👉👉👉 剑指JUC原理-1.进程与线程-CSDN博客

    👉👉👉 剑指JUC原理-2.线程-CSDN博客

    👉👉👉 剑指JUC原理-3.线程常用方法及状态-CSDN博客

    👉👉👉 剑指JUC原理-4.共享资源和线程安全性-CSDN博客

    👉👉👉 剑指JUC原理-5.synchronized底层原理-CSDN博客

    👉👉👉 剑指JUC原理-6.wait notify-CSDN博客

    👉👉👉 剑指JUC原理-7.线程状态与ReentrantLock-CSDN博客

    👉👉👉 剑指JUC原理-8.Java内存模型-CSDN博客

    👉👉👉 剑指JUC原理-9.Java无锁模型-CSDN博客

    👉👉👉 剑指JUC原理-10.并发编程大师的原子累加器底层优化原理(与人类的优秀灵魂对话)-CSDN博客

    👉👉👉 剑指JUC原理-11.不可变设计-CSDN博客

    👉👉👉 剑指JUC原理-12.手写简易版线程池思路-CSDN博客

    👉👉👉 剑指JUC原理-13.线程池-CSDN博客

    👉👉👉 剑指JUC原理-14.ReentrantLock原理-CSDN博客

    👉👉👉 剑指JUC原理-15.ThreadLocal-CSDN博客

    👉👉👉 剑指JUC原理-17.CompletableFuture-CSDN博客

    👉👉👉 剑指JUC原理-18.同步协作-CSDN博客

    👉👉👉 剑指JUC原理-19.线程安全集合-CSDN博客

    👉👉👉 剑指JUC原理-20.并发编程实践-CSDN博客

  • 相关阅读:
    福伦王梅花代工爱马仕新款自行车售价16.5万售罄,交不了货?
    【Django】4 Django模型
    Mysql 主从复制与读写分离
    minio文件服务器-docker docker-compose 搭建部署以及使用大全
    朴素贝叶斯算法分类
    Linux ./configure sudo make && make install
    光栅阶次分析器
    SpringCloud OpenFeign 服务调用传递 token
    Makefile 总述
    数据结构与算法之Python实现——栈
  • 原文地址:https://blog.csdn.net/qq_40851232/article/details/134319560