• 10 读写锁ReentrantReadWriteLock


    1 介绍

    为什么要使用读写锁

            需要高并发读取和较低并发写入的应用程序,降低锁的粒度,提高系统性能

    使用场景

            读多写少的共享资源

            缓存管理:读 >> 写,控制多个线程同时读缓存,需要刷新or修改操作时才使用写锁

            数据库连接池:多个线程从池中获取连接(读操作),只有一个线程可以设置连接到池中(写操作)

            文件读写

            数据结构的并发访问

    2 使用

    1. import java.util.concurrent.locks.ReentrantReadWriteLock;
    2. public class SharedResource {
    3. private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    4. private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    5. private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    6. public void readFromResource() {
    7. readLock.lock(); // 获取读锁
    8. try {
    9. // 执行读取共享资源的操作
    10. } finally {
    11. readLock.unlock(); // 释放读锁
    12. }
    13. }
    14. public void writeToResource() {
    15. writeLock.lock(); // 获取写锁
    16. try {
    17. // 执行写入共享资源的操作
    18. } finally {
    19. writeLock.unlock(); // 释放写锁
    20. }
    21. }
    22. }

    3 原理分析

    读写锁两个状态,读状态、写状态

    但AQS中只有一个state,如何记录两种状态?

            高低位;int4个字节,共32位,采用高16位控制读,低16位控制写

    00000000 00000000 00000000 00000000

    加锁时如何判断读锁、写锁?

            高16位>0,表示有读锁(sharedCount())

            低16位>0,表示有写锁(exclusiveCount())

    如何实现可重入?

            写锁只有一个线程独占,重入则低16位+1即可

            写锁有多个线程持有,如何记录?ThreadLocal线程私有

    4 读锁源码

    读锁:tryAcquireShared()、tryReleaseShared();读读共享

    1. protected final int tryAcquireShared(int unused) {
    2. Thread current = Thread.currentThread();
    3. int c = getState();
    4. //是否其它线程占用排它锁,如果是则不允许获取共享锁
    5. if (exclusiveCount(c) != 0 &&
    6. getExclusiveOwnerThread() != current)
    7. return -1;
    8. //共享锁被获取的数量
    9. int r = sharedCount(c);
    10. //获取共享锁
    11. //1 未阻塞
    12. //2 不超最大读计数
    13. //3 设置共享锁成功
    14. if (!readerShouldBlock() &&
    15. r < MAX_COUNT &&
    16. compareAndSetState(c, c + SHARED_UNIT)) {
    17. //第一次读
    18. if (r == 0) {
    19. firstReader = current;
    20. firstReaderHoldCount = 1;
    21. //重入
    22. } else if (firstReader == current) {
    23. firstReaderHoldCount++;
    24. } else {
    25. //其它线程读
    26. HoldCounter rh = cachedHoldCounter;
    27. if (rh == null || rh.tid != getThreadId(current))
    28. //记录每个线程的重入次数
    29. cachedHoldCounter = rh = readHolds.get();
    30. else if (rh.count == 0)
    31. readHolds.set(rh);
    32. rh.count++;
    33. }
    34. return 1;
    35. }
    36. //若不满足上述条件,则执行方法内的获取共享锁逻辑
    37. return fullTryAcquireShared(current);
    38. }
    1. final int fullTryAcquireShared(Thread current) {
    2. HoldCounter rh = null;
    3. //1 自旋 获取共享锁or失败
    4. for (;;) {
    5. int c = getState();
    6. //2 若存在写锁,且不是当前线程持有的,不允许获取共享锁
    7. if (exclusiveCount(c) != 0) {
    8. if (getExclusiveOwnerThread() != current)
    9. return -1;
    10. //3 读线程阻塞
    11. } else if (readerShouldBlock()) {
    12. if (firstReader == current) {
    13. } else {
    14. if (rh == null) {
    15. rh = cachedHoldCounter;
    16. if (rh == null || rh.tid != getThreadId(current)) {
    17. rh = readHolds.get();
    18. if (rh.count == 0)
    19. readHolds.remove();
    20. }
    21. }
    22. if (rh.count == 0)
    23. return -1;
    24. }
    25. }
    26. // 4 不允许再申请共享锁
    27. if (sharedCount(c) == MAX_COUNT)
    28. throw new Error("Maximum lock count exceeded");
    29. // 5 尝试获取锁
    30. if (compareAndSetState(c, c + SHARED_UNIT)) {
    31. if (sharedCount(c) == 0) {
    32. firstReader = current;
    33. firstReaderHoldCount = 1;
    34. } else if (firstReader == current) {
    35. firstReaderHoldCount++;
    36. } else {
    37. if (rh == null)
    38. rh = cachedHoldCounter;
    39. if (rh == null || rh.tid != getThreadId(current))
    40. rh = readHolds.get();
    41. else if (rh.count == 0)
    42. readHolds.set(rh);
    43. rh.count++;
    44. cachedHoldCounter = rh; // cache for release
    45. }
    46. return 1;
    47. }
    48. }
    49. }

    5 写锁源码

    写锁:tryAcquire()、tryRelease();写写互斥,读写互斥

    1. public final void acquire(int arg) {
    2. if (!tryAcquire(arg) &&
    3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }
    6. protected final boolean tryAcquire(int acquires) {
    7. Thread current = Thread.currentThread();
    8. int c = getState();
    9. //获取低16位(写锁)的值,sharedCount()方法获取高16位值
    10. int w = exclusiveCount(c);
    11. //代表有加锁
    12. if (c != 0) {
    13. //1 是读锁,读写互斥,不能加写锁
    14. //2 是写锁,但不是设置重入的写锁,线程之间写写互斥
    15. if (w == 0 || current != getExclusiveOwnerThread())
    16. return false;
    17. if (w + exclusiveCount(acquires) > MAX_COUNT)
    18. throw new Error("Maximum lock count exceeded");
    19. //重入写锁
    20. setState(c + acquires);
    21. return true;
    22. }
    23. //公平锁需要排队,非公平直接让当前线程尝试获取锁
    24. if (writerShouldBlock() ||
    25. !compareAndSetState(c, c + acquires))
    26. return false;
    27. setExclusiveOwnerThread(current);
    28. return true;
    29. }

    6 存在问题

            读的过程不允许写,悲观锁,可能会造成锁饥饿

            为了进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock。StampedLock和ReentrantReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!在原先读写锁的基础上新增了一种叫乐观读(Optimistic Reading)的模式。该模式并不会加锁,所以不会阻塞线程,会有更高的吞吐量和更高的性能

  • 相关阅读:
    SwitchyOmega_Chromium插件的下载安装以及使用
    Redis 哨兵集群方案
    es笔记一之es安装与介绍
    ubuntu20.4部署java web环境
    工地安全帽佩戴识别系统
    让 GPT-4 来修复 Golang “数据竞争”问题(续) - 每天5分钟玩转 GPT 编程系列(7)
    【Leetcode】189. 轮转数组
    学生HTML个人网页作业作品----(画家企业8页)
    java Map及Map.Entry详解
    MySQL实战优化高手08 生产经验:在数据库的压测过程中,如何360度无死角观察机器性能?
  • 原文地址:https://blog.csdn.net/m0_61253315/article/details/133984482