• “深入探讨Java JUC中的ReentrantLock锁:实现多线程同步与并发控制“


    简介

    1、从Java5开始,Java提供了一种功能更强大的线程同步机制——通过显式定义同步锁对象来实现同步,在这种机制下,同步锁由Lock对象充当。

    2、Lock 提供了比synchronized方法和synchronized代码块更广泛的锁定操作,Lock允许实现更灵活的结构,可以具有差别很大的属性,并且支持多个相关的Condition对象。

    3、Lock是控制多个线程对共享资源进行访问的工具。通常,锁提供了对共享资源的独占访问,每次只能有一个线程对Lock对象加锁,线程开始访问共享资源之前应先获得Lock对象。

    4、某些锁可能允许对共享资源并发访问,如ReadWriteLock(读写锁),Lock、ReadWriteLock是Java5提供的两个根接口,并为Lock 提供了ReentrantLock(可重入锁)实现类,为ReadWriteLock提供了ReentrantReadWriteLock 实现类。

    5、Java8新增了新型的StampedLock类,在大多数场景中它可以替代传统的ReentrantReadWriteLock。ReentrantReadWriteLock 为读写操作提供了三种锁模式:Writing、ReadingOptimistic、Reading。
    在这里插入图片描述

    ReentrantLock

    什么是ReentrantLock

    ReentrantLock 是 Java 中的一种锁实现,它提供了与传统的 synchronized 关键字相似的功能,但具有更多的灵活性和控制能力。

    ReentrantLock特性

    可重入性: 与 synchronized 一样,ReentrantLock 具有可重入性,这意味着线程可以多次获取同一个锁而不会出现死锁。

    锁的公平性: ReentrantLock 支持公平锁和非公平锁。在公平锁模式下,锁将按照线程请求的顺序分配。在非公平锁模式下,锁将在可用时立即分配给等待线程。

    Condition 对象: ReentrantLock 提供了 Condition 对象,它允许线程在特定条件下等待和通知其他线程。这对于线程间的协作非常有用。

    中断响应: ReentrantLock 支持中断响应,这意味着线程可以在等待锁的过程中响应中断信号。

    超时锁定: ReentrantLock 允许您尝试获取锁,并设置一个超时时间。如果在超时时间内无法获取锁,线程可以执行其他操作。

    lock 锁和synchronized 对比

    可重入性:
    ReentrantLock 具有可重入性,允许同一线程多次获取同一个锁而不会引发死锁。
    synchronized 也是可重入的,同一线程可以多次获得同一个锁。
    灵活性:
    ReentrantLock 提供了更多的灵活性和控制,允许你选择公平性和非公平性、设置超时、使用读写锁等高级功能。
    synchronized 相对较简单,提供的功能较少,不支持超时、读写锁等高级功能。
    条件等待:
    ReentrantLock 提供了 Condition 对象,允许线程在特定条件下等待,然后在条件满足时重新获取锁。
    synchronized 缺少这种直接的条件等待机制,但可以使用 wait() 和 notify() 方法实现类似的功能。
    公平性:
    ReentrantLock 允许你选择锁的公平性,以公平或非公平方式分配锁。在公平模式下,锁将按照等待顺序分配给等待的线程。
    synchronized 使用的是非公平锁,不保证按等待顺序分配。
    性能:
    synchronized 在某些情况下可能比 ReentrantLock 更高效,因为它是 JVM 内置的一种机制。
    ReentrantLock 在高竞争情况下可以提供更好的性能,但它的创建和维护成本通常更高。
    异常处理:
    ReentrantLock 具有灵活的异常处理机制,可以捕获并处理锁操作中的异常。
    synchronized 的异常处理相对较简单,一旦发生异常,锁将自动释放。
    可中断性:
    ReentrantLock 允许线程响应中断,可以在等待锁时中断线程。
    synchronized 不支持线程中断。
    锁的可绑定性:
    ReentrantLock 允许将锁绑定到多个条件。
    synchronized 不提供类似的绑定条件的机制。

    使用案例

    在这个示例中,我们创建了一个ReentrantLock实例,并使用它来保护SharedResource对象中的doWork方法。两个线程(“Thread 1"和"Thread 2”)共享SharedResource对象,并分别调用doWork方法。lock.lock()获取锁,lock.unlock()释放锁,确保在同一时刻只有一个线程可以进入doWork方法的同步块。这确保了线程之间的安全性和同步执行。

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class LockExample {
        public static void main(String[] args) {
            // 创建一个ReentrantLock实例
            Lock lock = new ReentrantLock();
    
            // 创建一个共享资源
            SharedResource resource = new SharedResource(lock);
    
            // 创建多个线程并启动
            Thread thread1 = new Thread(new Worker(resource), "Thread 1");
            Thread thread2 = new Thread(new Worker(resource), "Thread 2");
    
            thread1.start();
            thread2.start();
        }
    }
    
    class SharedResource {
        private Lock lock;
    
        public SharedResource(Lock lock) {
            this.lock = lock;
        }
    
        public void doWork() {
            lock.lock(); // 获取锁
            try {
                // 同步的代码块
                for (int i = 1; i <= 5; i++) {
                    System.out.println(Thread.currentThread().getName() + " is working: " + i);
                }
            } finally {
                lock.unlock(); // 释放锁
            }
        }
    }
    
    class Worker implements Runnable {
        private SharedResource resource;
    
        public Worker(SharedResource resource) {
            this.resource = resource;
        }
    
        @Override
        public void run() {
            resource.doWork();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    AQS回顾

    AQS即AbstractQueuedSynchronizer的缩写,这个是个内部实现了两个队列的抽象类,分别是同步队列和条件队列。其中同步队列是一个双向链表,里面储存的是处于等待状态的线程,正在排队等待唤醒去获取锁,而条件队列是一个单向链表,里面储存的也是处于等待状态的线程,只不过这些线程唤醒的结果是加入到了同步队列的队尾,AQS所做的就是管理这两个队列里面线程之间的等待状态-唤醒的工作。
    在同步队列中,还存在2中模式,分别是独占模式和共享模式,这两种模式的区别就在于AQS在唤醒线程节点的时候是不是传递唤醒,这两种模式分别对应独占锁和共享锁。
    AQS是一个抽象类,所以不能直接实例化,当我们需要实现一个自定义锁的时候可以去继承AQS然后重写获取锁的方式和释放锁的方式还有管理state,而ReentrantLock就是通过重写了AQS的tryAcquire和tryRelease方法实现的lock和unlock。
    详情可以参考 https://juejin.cn/post/7006895386103119908

    ReentrantLock实现原理

    请添加图片描述

    ReentrantLock结构

    在这里插入图片描述
    ReentrantLock实现Lock接口 有三个内部类 分别是 Sync、NonfairSync、FairSync,其中Sync内部类继承自AQS承接了AQS的功能,NonfairSync代表非公平锁、FairSync 代表公平锁 他们都是继承了Sync类,通过Sync重写的方法tryAcquire、tryRelease可以知道,ReentrantLock实现的是AQS的独占模式,也就是独占锁,这个锁是悲观锁。

    非公平锁实现原理

    获取锁

    请添加图片描述
    ReentrantLock有两个构造方法,无参构造方法默认是创建非公平锁,fair传false 也是非公平锁
    默认非公平锁 所以子类NonfairSync 实现父类的抽象方法执行 lock
    1.先用case 尝试去更新state的值
    如果能更新成功就表示可以抢占到锁 把state更新成1 并设置线程信息执行结束
    如果更新失败即此时state不等于0代表此时锁被其他线程占据着则执行acquire方法
    2.nonfairTryAcquire 首先会获取state的值 判断state是否等于0 如果此时等于0 则代表有线程释放锁了,并且把state改回了0 ,
    如果此时state 等于0 就再次尝试用cas 去将state的值由0变更成1 如果变更成功就代表抢占到了锁 然后设置一下线程信息(这里就体现了非公平锁的特性 不会在意阻塞队列中是否有等待的线程)然后结束
    如果此时state不等于0 或者 cas 更新 state值失败则代表有线程占据着锁 此时会去判断当前线程是否是获得锁的线程 如果是获得锁的线程则代表是重入的则将state进行+1 然后执行结束
    3.如果这个当前线程不是获得锁的线程,则会构建一个Node节点 然后由尾部放到阻塞队列中 park住

        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        /**
         * Creates an instance of {@code ReentrantLock} with the
         * given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
      final void lock() {
               //cas 原子操作修改state 的值 如果能修改成功则把0变成1 然后记录当前线程id
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    //抢占锁逻辑
                    acquire(1);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
      public final void acquire(int arg) {
             //尝试获取独占锁 
            if (!tryAcquire(arg) &&
                //如果失败则假如aqs 队列中
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
           final boolean nonfairTryAcquire(int acquires) {
                //获取当前线程
                final Thread current = Thread.currentThread();//拿到 State 值    
                int c = getState();
                //如果是0 表示可以去获得锁
                if (c == 0) {
                   //cas 原子操作修改state 的值 如果能修改成功则把0变成1 然后记录当前线程id
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果当前线程就是获得锁的线程
                else if (current == getExclusiveOwnerThread()) {
                    //增加重入次数
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
     private Node addWaiter(Node mode) {
            //构建一个node
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            // tail = 尾节点 默认是null
            Node pred = tail;
            if (pred != null) {
                //如果尾节点不等于空 把当前节点当成尾节点 然后把prev指针指向上一个节点 把新进来的节点改成尾节点
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                     //把上一个节点的next 指针指向刚进来的节点
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
     private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                     //如果尾节点 = = null 用cas 构建一个节点 
                    if (compareAndSetHead(new Node()))
                        //把头节点赋值给尾节点
                        tail = head;
                } else {
                    //如果尾节点不等于空 把当前节点当成尾节点 然后把prev指针指向上一个节点 把新进来的节点改成尾节点
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        //把上一个节点的next 指针指向刚进来的节点
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
     final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //获取当前节点的前一个节点
                    final Node p = node.predecessor();
                    //如果当前节点的前一个结点是头节点 则说明有资格去争夺锁
                    if (p == head && tryAcquire(arg)) {
                        //把当前节点设置成头节点
                        setHead(node);
                        
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    // 当获取(资源)失败后,检查并且更新结点状态
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱结点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1
            // 可以进行park操作
            return true; 
        if (ws > 0) { // 表示状态为CANCELLED,为1
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点
            // 赋值pred结点的next域
            pred.next = node; 
        } else { // 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中) 
            // 比较并设置前驱结点的状态为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
        }
        // 不能进行park操作
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
     private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    • 1
    • 2
    • 3
    • 4

    释放锁

    请添加图片描述
    1.判断当前线程是不是锁的所有者,如果是则进行步骤2,如果不是则抛出异常。
    2.判断此次释放锁后state的值是否为0,如果是则代表锁有没有重入,然后将锁的所有者设置成null且返回true,然后执行步骤3,如果不是则代表锁发生了重入执行步骤4。
    3.现在锁已经释放完,即state=0,唤醒同步队列中的后继节点进行锁的获取。
    4.锁还没有释放完,即state!=0,不唤醒同步队列。

     public void unlock() {
            sync.release(1);
        }
    
    • 1
    • 2
    • 3
       public final boolean release(int arg) {
            //释放锁成功
            if (tryRelease(arg)) {
                Node h = head;
                //如果头节点不为空 并且状态不为0 
                if (h != null && h.waitStatus != 0)
                    //唤醒
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
     protected final boolean tryRelease(int releases) {
                //state -1
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                     //如果c =0 表示当前是无锁状态 把线程iq清空
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                //重新设置 state
                setState(c);
                return free;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                //设置head节点的状态为0 
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            //拿到head节点的下一个节点
            Node s = node.next;
            //如果下一个节点为null 或者 status>0则表示是 CANCELLED 状态
            //听过尾部节点开始扫描  找到距离 head最近的一个 waitStatus<=0的节点
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //如果next 节点不等于空直接唤醒这个线程
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    公平锁实现原理

    请添加图片描述
    1.获取状态的state的值,如果state=0即代表锁没有被其它线程占用(但是并不代表同步队列没有线程在等待),执行步骤2。如果state!=0则代表锁正在被其它线程占用,执行步骤3。
    2.判断同步队列是否存在线程(节点),如果不存在则直接将锁的所有者设置成当前线程,且更新状态state,然后返回true。
    3.判断锁的所有者是不是当前线程,如果是则更新状态state的值,然后返回true,如果不是,那么返回false,即线程会被加入到同步队列中

    final void lock() {
        acquire(1);
    }
    
    public final void acquire(int arg) {
        //同步队列中有线程 且 锁的所有者不是当前线程那么将线程加入到同步队列的尾部,
        //保证了公平性,也就是先来的线程先获得锁,后来的不能抢先获取。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //判断状态state是否等于0,等于0代表锁没有被占用,不等于0则代表锁被占用着。
        if (c == 0) {
            //调用hasQueuedPredecessors方法判断同步队列中是否有线程在等待,如果同步队列中没有
            //线程在等待 则当前线程成为锁的所有者,如果同步队列中有线程在等待,则继续往下执行
            //这个机制就是公平锁的机制,也就是先让先来的线程获取锁,后来的不能抢先获取。
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //判断当前线程是否为锁的所有者,如果是,那么直接更新状态state,然后返回true。
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        //如果同步队列中有线程存在 且 锁的所有者不是当前线程,则返回false。
        return false;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
  • 相关阅读:
    机器学习入门到大神专栏总览
    C和指针 第13章 高级指针话题 13.6 总结
    备战蓝桥杯---图论基础理论
    企业电脑监控软件
    算法与数据结构【30天】集训营——线性表的定义及特点-顺序表的表示与实现及操作(03)
    基于体素场景的摄像机穿模处理
    优先级反转那些事儿
    python从入门到实践 第17章:使用API自己感悟和部分代码
    FPGA+ARM异核架构,基于米尔MYC-JX8MMA7核心板的全自动血细胞分析仪
    HIVE优化和数据倾斜、合并小文件
  • 原文地址:https://blog.csdn.net/qq_41956309/article/details/133965075