• 4、AQS之ReentrantReadWriteLock



    ReentrantReadWriteLock详解

    简介

    现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁(读多写少)。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源(读读可以并发);但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了(读写,写读,写写互斥)。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量
    针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它内部,维护了一对相关的锁,一个用于只读操作,称为读锁;一个用于写入操作,称为写锁,描述如下:
    线程进入读锁的前提条件:

    • 没有其他线程的写锁
    • 没有写请求或者有写请求,但调用线程和持有锁的线程是同一个

    线程进入写锁的前提条件:

    • 没有其他线程的读锁
    • 没有其他线程的写锁

    三个重要的特性:

    • 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
    • 可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁
    • 锁降级:遵循获取写锁、再获取读锁最后释放写锁的次序,写锁能够降级成为读锁

    使用

    读写锁接口ReadWriteLock

    一对方法,分别获得读锁和写锁 Lock 对象

    在这里插入图片描述

    类结构

    ReentrantReadWriteLock是可重入的读写锁实现类。在它内部,维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 Writer 线程,读锁可以由多个 Reader 线程同时持有。也就是说,写锁是独占的,读锁是共享的

    在这里插入图片描述

    使用读写锁

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock r = readWriteLock.readLock();
    private final Lock w = readWriteLock.writeLock();
    // 读操作上读锁
    public Data get(String key) {
        r.lock();
        try {
            // TODO 业务逻辑
        } finally {
            r.unlock();
        }
    }
    
    // 写操作上写锁
    public Data put(String key, Data value) {
        w.lock();
        try {
            // TODO 业务逻辑
        } finally {
            w.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    注意事项

    • 读锁不支持条件变量
    • 重入时升级不支持:持有读锁的情况下去获取写锁,会导致获取永久等待
    • 重入时支持降级: 持有写锁的情况下可以去获取读锁

    示例Demo

    public class Cache {
        static Map<String, Object> map = new HashMap<String, Object>();
        static ReadWriteLock rwl = new ReentrantReadWriteLock();
        static Lock r = rwl.readLock();
        static Lock w = rwl.writeLock();
        // 获取一个key对应的value
        public static Object get(String key) {
            r.lock();
            try {
                return map.get(key);
            } finally {
                r.unlock();
            }
        }
        // 设置key对应的value,并返回旧的value
        public static Object put(String key, Object value) {
            w.lock();
            try {
                return map.put(key, value);
            } finally {
                w.unlock();
            }
        }
        // 清空所有的内容
        public static void clear() {
            w.lock();
            try {
                map.clear();
            } finally {
                w.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    上述示例中,Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证Cache是线程安全的。在读操作get(String key)方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞。写操作put(String key,Object value)方法和clear()方法,在更新HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而只有写锁被释放之后,其他读写操作才能继续。Cache使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性,同时简化了编程方式

    锁降级

    锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。锁降级可以帮助我们拿到当前线程修改后的结果而不被其他线程所破坏,防止更新丢失

    锁降级的使用示例

    因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作

    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();
    private volatile boolean update = false;
    
    public void processData() {
        r.lock();
        if (!update) {
            // 必须先释放读锁
            r.unlock();
            // 锁降级从写锁获取到开始
            w.lock();
            try {
                if (!update) {
                    // TODO 准备数据的流程(略)
                    update = true;
                }
                r.lock();
            } finally {
                w.unlock();
            }
            // 锁降级完成,写锁降级为读锁
        }
        try {
            //TODO 使用数据的流程(略)
        } finally {
            r.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新
    ReentrantReadWriteLock不支持锁升级(持有读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的

    源码分析

    思考

    1. 读写锁是怎样实现分别记录读写状态的?
    2. 写锁是怎样获取和释放的?
    3. 读锁是怎样获取和释放的?

    结构

    继承结构

    在这里插入图片描述


    逻辑分类

    在这里插入图片描述

    读写状态的设计

    设计的精髓:用一个变量如何维护多种状态
    在 ReentrantLock 中,使用 Sync ( 实际是 AQS )的 int 类型的 state 来表示同步状态,表示锁被一个线程重复获取的次数。但是,读写锁 ReentrantReadWriteLock 内部维护着一对读写锁,如果要用一个变量维护多种状态,需要采用“按位切割使用”的方式来维护这个变量,将其切分为两部分:高16为表示读,低16为表示写
    分割之后,通过位运算,迅速确定读锁和写锁的状态。假如当前同步状态为S,那么:

    • 写状态,等于 S & 0x0000FFFF(将高 16 位全部抹去)。 当写状态加1,等于S+1.
    • 读状态,等于 S >>> 16 (无符号补 0 右移 16 位)。当读状态加1,等于S+(1<<16),也就是S+0x00010000

    根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取

    在这里插入图片描述


    代码实现:java.util.concurrent.locks.ReentrantReadWriteLock.Sync

    在这里插入图片描述

    • exclusiveCount(int c) 静态方法,获得持有写状态的锁的次数,也就是写锁的重入计数
    • sharedCount(int c) 静态方法,获得持有读状态的锁的线程数量。不同于写锁,读锁可以同时被多个线程持有。而每个线程持有的读锁支持重入的特性,所以需要对每个线程持有的读锁的数量单独计数,这就需要用到 HoldCounter 计数器

    HoldCounter 计数器

    读锁的内在机制其实就是一个共享锁。一次共享锁的操作就相当于对HoldCounter 计数器的操作。**获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。**只有当线程获取共享锁后才能对共享锁进行释放、重入操作

    在这里插入图片描述


    通过 ThreadLocalHoldCounter 类,HoldCounter 与线程进行绑定。HoldCounter 是绑定线程的一个计数器,而 ThreadLocalHoldCounter 则是线程绑定的 ThreadLocal

    • HoldCounter是用来记录读锁重入数的对象
    • ThreadLocalHoldCounter是ThreadLocal变量,用来存放不是第一个获取读锁的线程的其他线程的读锁重入数对象

    写锁的获取

    **写锁是一个支持重进入的排它锁。**如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态
    写锁的获取是通过重写AQS中的tryAcquire方法实现的

    protected final boolean tryAcquire(int acquires) {
        //当前线程
        Thread current = Thread.currentThread();
        //获取state状态 存在读锁或者写锁,状态就不为0
        int c = getState();
        //获取写锁的重入数
        int w = exclusiveCount(c);
        //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
        if (c != 0) {
            // c!=0 && w==0 表示存在读锁
            // 当前存在读锁或者写锁已经被其他写线程获取,则写锁获取失败
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            // 超出最大范围 65535
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //同步state状态
            setState(c + acquires);
            return true;
        }
        // writerShouldBlock有公平与非公平的实现, 非公平返回false,会尝试通过cas加锁
        //c==0 写锁未被任何线程获取,当前线程是否阻塞或者cas尝试获取锁
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
    
        //设置写锁为当前线程所有
        setExclusiveOwnerThread(current);
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    通过源码我们可以知道:

    • 读写互斥
    • 写写互斥
    • 写锁支持同一个线程重入
    • writerShouldBlock写锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)

    在这里插入图片描述

    写锁的释放

    写锁释放通过重写AQS的tryRelease方法实现

    protected final boolean tryRelease(int releases) {
        //若锁的持有者不是当前线程,抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() ‐ releases;
        //当前写状态是否为0,为0则释放写锁
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    读锁的获取

    实现共享式同步组件的同步语义需要通过重写AQS的tryAcquireShared方法和tryReleaseShared方法。读锁的获取实现方法为:

    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // 如果写锁已经被获取并且获取写锁的线程不是当前线程,当前线程获取读锁失败返回‐1 判断锁降级
        if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
            return1;
        //计算出读锁的数量
        int r = sharedCount(c);
        /**
         * 读锁是否阻塞 readerShouldBlock()公平与非公平的实现
         * r < MAX_COUNT: 持有读锁的线程小于最大数(65535)
         * compareAndSetState(c, c + SHARED_UNIT) cas设置获取读锁线程的数量
         */
        if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) { //当前线程获取读锁
    
            if (r == 0) { //设置第一个获取读锁的线程
                firstReader = current;
                firstReaderHoldCount = 1; //设置第一个获取读锁线程的重入数
            } else if (firstReader == current) { // 表示第一个获取读锁的线程重入
                firstReaderHoldCount++;
            } else { // 非第一个获取读锁的线程
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                28 else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++; //记录其他获取读锁的线程的重入次数
            }
            return 1;
        }
        // 尝试通过自旋的方式获取读锁,实现了重入逻辑
        return fullTryAcquireShared(current);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    由源码可知:

    • 读锁共享,读读不互斥
    • 读锁可重入,每个获取读锁的线程都会记录对应的重入数
    • 读写互斥,锁降级场景除外
    • 支持锁降级,持有写锁的线程,可以获取读锁,但是后续要记得把读锁和写锁读释放
    • readerShouldBlock读锁是否阻塞实现取决公平与非公平的策略(FairSync和NonfairSync)

    在这里插入图片描述

    读锁的释放

    获取到读锁,执行完临界区后,要记得释放读锁(如果重入多次要释放对应的次数),不然会阻塞其他线程的写操作
    读锁释放的实现主要通过方法tryReleaseShared:

    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //如果当前线程是第一个获取读锁的线程
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount‐‐; //重入次数减1
        } else { //不是第一个获取读锁的线程
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            ‐‐rh.count; //重入次数减1
        }
        for (;;) { //cas更新同步状态
            int c = getState();
            int nextc = c ‐ SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    在这里插入图片描述

    StampedLock详解

    简介

    StampedLock是比ReentrantReadWriteLock更快的一种锁,支持乐观读、悲观读锁和写锁。和ReentrantReadWriteLock不同的是,StampedLock支持多个线程持有读锁的同时,还允许申请写锁,也就是锁的升级
    StampedLock的底层并不是基于AQS的

    示例

    class Point {
    	private double x, y;
    	private final StampedLock sl = new StampedLock();
    
    	void move(double deltaX, double deltaY) {
    		long stamp = sl.writeLock();
    		try {
    			x += deltaX;
    			y += deltaY;
    		} finally {
    			sl.unlockWrite(stamp);
    		}
    	}
    
    	double distanceFromOrigin() {
    		long stamp = sl.tryOptimisticRead();
    		double currentX = x, currentY = y;
    		if (!sl.validate(stamp)) {
    			stamp = sl.readLock();
    			try {
    				currentX = x;
    				currentY = y;
    			} finally {
    				sl.unlockRead(stamp);
    			}
    		}
    		return Math.sqrt(currentX * currentX + currentY  currentY);
    	}
    
    	void moveIfAtOrigin(double newX, double newY) {
    		long stamp = sl.readLock();
    		try {
    			while (x == 0.0 && y == 0.0) {
    				long ws = sl.tryConvertToWriteLock(stamp);
    				if (ws != 0L) {
    					stamp = ws;
    					x = newX;
    					y = newY;
    					break;
    				}
    				else {
    					sl.unlockRead(stamp);
    					stamp = sl.writeLock();
    				}
    			}
    		} finally {
    			sl.unlock(stamp);
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    StampedLock源码分析

    状态state

    • StampedLock也是通过一个int变量state、一个队列来实现的
    • state的默认值是256,也就是1 0000 0000
    • state是一个int变量,总共有32位
    • 前24位表示版本号、低8位表示锁。
    • 低8位的第1位表示是否为写锁,1表示写锁、0表示没有写锁
    • 剩下7位表示悲观读锁的个数

    节点属性

    队列中的节点对应的是WNode

    static final class WNode {
    	//前继节点
    	volatile WNode prev;
    	//后继节点
    	volatile WNode next;
    	//悲观读锁对应的栈
    	volatile WNode cowait;   
    	//对应的线程
    	volatile Thread thread;  
    	//节点的状态:取消CANCEL 1、等待WATING -1。
    	volatile int status;     
    	//节点类型:读锁、写锁
    	final int mode;   
    	WNode(int m, WNode p) { mode = m; prev = p; }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    队列demo

    队列首节点不存储数据

    在这里插入图片描述

    区别

    1. ReentrantReadWriteLock支持重入,StampedLock不支持重入
    2. ReentrantReadWriteLock基于AQS,StampedLock自己实现了一套同步机制
    3. ReentrantReadWriteLock性能不如StampedLock
    4. ReentrantReadWriteLock不支持锁升级,StampedLock支持锁升级
    5. ReentrantReadWriteLock所有数据都在一个链表存放,StampedLock读节点数据是通过另一个维度的节点来存放
  • 相关阅读:
    什么是RPA?一文了解RPA发展与进程!
    【Newman+Jenkins】实施接口自动化测试
    Flink系列之Flink集群搭建
    VMware中安装centos无网络,配置教程
    SQL必需掌握的100个重要知识点:使用视图
    智能优化之遗传算法
    SpringSecurity6从入门到上天系列第七篇:讲明白SpringBoot的自动装配完善上篇文章中的结论
    查询企业联系方式的途径有哪些?
    vscode使用远程服务器jupyter
    AI助力剧本创作:如何5分钟内构思出热门短剧大纲
  • 原文地址:https://blog.csdn.net/weixin_41381248/article/details/127763688