• ReentrantLock源码分析


    ReentrantLock源码分析

    ReentrantLock介绍

    Java中提供的锁:synchronized,lock锁
    ReentrantLock就是一个互斥锁,可以让多线程执行期间,只有一个线程在执行指定的一段代码。
    使用方式:
    在这里插入图片描述
    我们要分析的是:为什么在多个线程进行竞争的时候lock()可以加锁,只让一个线程进入,为什么在线程执行完业务逻辑的时候释放锁时,别的线程才可以竞争锁资源。

    ReentrantLock的lock方法

    一、简单分析
    进入到lock方法后,内部调用了sync.lock()方法, 去找方法的实现时,发现了两个实现:

    • FairSync:公平锁
      每个线程都会执行lock方法时,会先查看是否有线程排队,如果有,直接去排队,如果没有才会尝试去竞争锁资源
    • NonFairStnc:非公平锁
      每个线程都会执行lock方法时,先尝试获取锁资源,获取不到载排队(也不是很公平的排队)
      在这里插入图片描述

    如果需要使用公平锁需要在构造函数中传入参数true,默认是使用的非公平锁

    更推荐使用非公平锁,非公平锁的效率比公平锁更高。
    在这里插入图片描述
    从源码的角度也发现了公平锁直接调用acquire方法尝试获取锁,
    而非公平锁会先基于CAS的方式尝试获取锁资源,如果获取不到,才会执行acquire方式尝试获取锁

    二、分析AQS
    在这里插入图片描述
    在这里插入图片描述
    AQS就是AbstractQueuedSynchronizer类,AQS内部维护这一个队列
    还有三个核心属性:state,head,tail

    在这里插入图片描述
    在这里插入图片描述
    三、lock方法源码

     final void lock() {
                acquire(1);//调用acquire方法尝试获取锁
            }
    
    
    • 1
    • 2
    • 3
    • 4
    final void lock() {
    			//以CAS的方式,尝试将state从0改为1
                if (compareAndSetState(0, 1))
                //证明修改state成功,也就代表获取锁资源成功。
                //将当前线程设置到AQS中的exclusiveOwnerThread(AOS中),代表当前线程拿着锁资源(和后面的可重入锁有关)
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);//如果获取不到,才会执行acquire方式尝试获取锁
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    ReentrantLock的acquire方法

    //公平锁还是非公平锁都会调用当前的acquire方法
     public final void acquire(int arg) {
     		//tryAcquire方法,分为两种实现,一种是公平锁,一种是非公平锁
     		//公平锁:如果state为0,再看看是否有线程排队,如果有就去排队,如果是锁重入的操作,直接获取锁。
     		//非公平锁:如果state为0,直接尝试cas修改,如果是锁重入操作,直接获取锁。
            if (!tryAcquire(arg) &&
            //addWaiter方法:在线程没有通过tryAcquire拿到锁资源时,需要将当前线程封装为node对象,去AQS内部排队
            //acquireQueued方法:查看当前线程是否是排在队列前面,如果是就尝试获取锁资源。如果长时间没拿到锁,也需要将当前线程挂起
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    ReentrantLock的tryAcquire方法

    tryAcquire方法是AQS提供的,内部并没有任何的实现,需要继承AQS的类自己去实现逻辑代码查看到tryAcquire在ReentrantLock中提供了两种实现:公平锁、非公平锁

    • 非公平锁的具体实现
    protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
    //非公平锁的实现
    final boolean nonfairTryAcquire(int acquires) {
    			//拿到了当前线程  
                final Thread current = Thread.currentThread();
                //拿到AQS的state值
                int c = getState();
                if (c == 0) {
                //直接基于CAS的方式,尝试修改state,从0——1,如果成功就代表拿到锁资源
                    if (compareAndSetState(0, acquires)) {
                    //将exclusiveOwnerThread属性设置为当前线程
                        setExclusiveOwnerThread(current);
                        //返回true
                        return true;
                    }
                }
                //说明state肯定不为0,不为0就代表当前lock被线程占用
                //判断占用的线程是不是当前线程
                else if (current == getExclusiveOwnerThread()) {
                   	//锁重入操作
                   	//对state+1
                    int nextc = c + acquires;
                    //判断锁重入是否已经达到最大值
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                        //将AQS的state设置好
                    setState(nextc);
                    返回true
                    return true;
                }
                //返回false
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 公平锁的具体实现
    protected final boolean tryAcquire(int acquires) {
    			//获取到当前线程
                final Thread current = Thread.currentThread();
                //获取state
                int c = getState();
                //没有线程占用资源
                if (c == 0) {
                	//首先查看有没有线程排队
                	//如果没有排队则尝试获取锁资源
                    if (!hasQueuedPredecessors() && 
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //查看是否是锁重入操作
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    公平锁和非公平锁的tryAcquire方法的唯一区别就是,当判断state为0之后。

    • 公平锁会先查看是否有线程正在排队,如果有,直接返回false,如果没有线程排队,执行CAS尝试获取锁资源。
    • 非公平锁不管有没有线程排队,直接以CAS的方式尝试获取锁资源。

    ReentrantLock的addWaiter方法

    在线程执行tryAcquire方法没有获取到锁资源之后,会返回false,再配置上if中的!操作,会执行&&后面的方法,而在acquireQueued(addWaiter(Node.EXCLUSIVE),arg)的参数中执行了addWaiter,要将当前获取锁失败的线程封装为Node,排队到队列中。

    //获取锁失败,封装Node,排队到AQS的队列中
     private Node addWaiter(Node mode) {  
     		//将线程封装为Node对象
            Node node = new Node(Thread.currentThread(), mode);
            //获取到tail节点,pred
            Node pred = tail;
           // 如果tail节点不为空
            if (pred != null) {
           // 将当前节点的prev指向tail
                node.prev = pred;
              //  为了避免并发问题,以CAS的方式将tail指向当前线程
                if (compareAndSetTail(pred, node)) {
                  // 将之前的tail的next指向当前节点
                    pred.next = node;
                   // 返回当前节点
                    return node;
                }
            }
            //如果队列为空,或者CAS操作失败后,会执行enq方法,将当前node排到队列的末尾
            enq(node);
            return node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    //enq方法,传入的node就是当前节点
      private Node enq(final Node node) {
      		//死循环
            for (;;) {
            //获取tail节点
                Node t = tail;
                if (t == null) { // Must initialize
                //如果队列为空,先初始化head节点,作为头
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                //此时,队列肯定不为空,采用之前的逻辑将当前节点插入到队列的末尾作为tail,循环到插入成功为止
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    整体逻辑为,先初始化Node节点,将当前线程传入,并且标识为互斥锁。尝试将当前Node插入到AQS队列的末尾

    • 队列为空:执行enq,先初始化空Node作为头,然后再将当前Node插入,作为tail
    • 队列不为空:直接将当前Node插入,作为tail

    ReentrantLock的acquireQueue方法

    首先查看当前node是否排在队列的第一个位置(不算head),直接再次执行tryAcquire方法竞争锁资源,尝试将当前线程挂起,最终排在有效节点后,才会将当前线程挂起。

    	//队伍前面,竞争锁资源,队伍非前面,挂起线程
       final boolean acquireQueued(final Node node, int arg) {
       		//竞争锁资源失败
            boolean failed = true;
            try { 
            	//线程中断标识
                boolean interrupted = false;
                //死循环
                for (;;) {
                	//predecessor就是获取当前节点的上一个节点
                    final Node p = node.predecessor();
                    //如果当前节点的上一个节点是head,如果是就执行tryAcquire方法竞争锁资源
                    if (p == head && tryAcquire(arg)) {
                    //竞争锁资源成功,进入当前业务代码...
                    //因为当前线程已经拿到锁资源就将当前线程设置为head,并且将node中的prev和thread置为空
                        setHead(node);
                        将之前的头结点置为空,让GC将之前的head回收掉
                        p.next = null; // help GC
                        //将获取锁失败的标识置为false
                        failed = false;
                        //返回中断标识,默认情况为false
                        return interrupted;
                    }
                    //如果当前node的上一个节点不是head则将当前线程尝试挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    //判断当前线程是否可以挂起  
     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     		//拿到上一个节点的状态有1,-1,-2,-3
            int ws = pred.waitStatus;
            //如果ws为-1,直接返回true,当前节点可以挂起线程
            if (ws == Node.SIGNAL)
                return true;
                //如果ws>0,说明肯定是cancelled(1)状态,绕过这个节点,找上一个节点的上一个
            if (ws > 0) {
               //循环,直到找到一个节点的状态是<=0的节点
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                //找到的节点的状态可能是-2,-3,需要将这种状态的节点以CAS的方式改变状态为signal
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    //找到上一个节点的状态是正常的,就可以调用当前方法将线程挂起
     private final boolean parkAndCheckInterrupt() {
     	//直接使用unsafe类的park方法挂起线程
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    ReentrantLock的unlock方法

    unlock释放锁操作不分为公平和非公平,都是执行sync的release方法
    释放锁的核心,就是将state从大于0的数值更改为0即为释放锁成功
    并且unlock方法应该会涉及到将AQS队列中阻塞的线程进行唤醒,阻塞用的是park方法,唤醒必然是unpark方法

    public void unlock() {
    		//每次只释放1
            sync.release(1);
        }
    
    • 1
    • 2
    • 3
    • 4

    ReentrantLock的release方法

    在释放锁时,只有state被减为0之后,才会去唤醒AQS队列中被挂起的线程
    在唤醒挂起线程时,如果head的next状态不正确,会从后往前找到离head最近的节点进行唤醒
    为什么从后往前找?(addWaiter是先将prev指针赋值,最后才会将上一个节点的next指针赋值,为了避免丢失节点或者跳过节点,必须从后往前找! )

    //释放锁操作
     public final boolean release(int arg) {
     		//先查看tryRelease方法
            if (tryRelease(arg)) {
            	//释放锁成功,进行后续处理
                Node h = head;
               // 如果head不为nul1,并且当前head的状态不为0
                if (h != null && h.waitStatus != 0)
                	 //说明AQS的队列中,有Node在排队,并且线程已经挂起了!
                	 //需要唤醒被挂起的Node
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
      protected final boolean tryRelease(int releases) {
      		//直接获取state,并且- releases。将state - 1
           int c = getState() - releases;
           //如果释放锁的线程,不是占用锁的线程,直接抛出异常。  
           if (Thread.currentThread() != getExclusiveOwnerThread())
               throw new IllegalMonitorStateException();
           //声明了一个标识。
           boolean free = false;
           //判断state - 1后是否为0
           if (c == 0) {
           		//如果为0,锁资源释放掉了。
               free = true;
               //将占用互斥锁的线程标识置位null
               setExclusiveOwnerThread(null);
           }
           //锁之前重入了,一次没释放掉,将c赋值给state,等待下次再次执行时,再次判断
           setState(c);
           return free;
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    // 唤醒AQS中被挂起的线程
    private void unparkSuccessor(Node node) {
           //获取head的状态
            int ws = node.waitStatus;
            if (ws < 0)
            	//将当前head的状态设置为0
                compareAndSetWaitStatus(node, ws, 0);
    		//拿到next节点
            Node s = node.next;
            //如果下一个节点为null,或者状态为CANCEL,需要找到离head节点最近的有效Node
            if (s == null || s.waitStatus > 0) {
                s = null;
                //从后往前找这个节点(为什么从后往前找,需要查看addwaiter的内容,)
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
           // 找到最近的node后,直接唤醒
            if (s != null)
            //唤醒线程
                LockSupport.unpark(s.thread);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
  • 相关阅读:
    从IPC到分布式软总线的随笔
    【python画画】蘑菇云爱心
    Java—Collection
    20220918
    【MM小贴士】SAP创建成本中心采购订单带出默认会计科目和成本中心
    太空射击第10课: Score (繪畫和文字)
    3D视觉 之 线激光3D相机
    【人工智能】MindSpore Hub
    1个G的视频能压缩到几M?视频过大这样压缩
    端口映射与容器互联
  • 原文地址:https://blog.csdn.net/xiaowanziddd/article/details/126215598