• ReentarantLock源码浅析


    ReentrantLock

    ReentrantLock: 就是一个互斥锁,可以让多线程执行期间,只有一个线程在执行指定的一段代码
    使用方式: 直接new

            ReentrantLock reentrantLock = new ReentrantLock();
            //加锁
            reentrantLock.lock();
            try{
               //业务代码......
            } finally {
                //释放锁
                reentrantLock.unlock();
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    lock方法

    public void lock() {
    	sync.lock();
    }
    
    • 1
    • 2
    • 3

    sync.lock();有两个实现

    1 FairSync: 公平锁
    每哥线程会在执行lock方法时,会先查看是否有线程排队,如果有,直接去排队,没有会去尝试竞争一下锁资源
    2 NonfairSync: 非公平锁
    每个线程会执行lock方法时,先尝试获取,获取不到再排队

    如果要使用公平锁需要在构建时 new ReentarntLock(true);
    要使用非公平锁直接使用无参构造.

    从源码角度,公平锁直接调用 acquire 方法尝试获取锁,
    而非公平锁会先基于 CAS的方式尝试获取锁资源,如果获取不到,再调用 acquire方法 尝试获取锁

    分析 AQS

    AQS 就是 AbstractQueuedSynchronizer 内部维护了一个双向链表
    Node head:
    Node tail;
    int state;
    Thread thread;

    线程在竞争锁资源时,通过CAS将state从0设置为1,即代表竞争锁资源成功

    非公平锁 的lock方法

    final void lock() {
        // 以CAS的方式,尝试将 state从0变成1
    	if (compareAndSetState(0, 1))
    	    //证明修改state状态成功,也就代表获取锁资源成功
    		//将当前线程设置到AQS中的 exclusiveOwnerThread,代表当前线程拿着锁资源(和后面的可重入锁有关)
    		setExclusiveOwnerThread(Thread.currentThread());
    	else
    		acquire(1);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    公平锁 的lock方法

    final void lock() {
    	acquire(1);
    }
    
    • 1
    • 2
    • 3

    无论公平锁还是非公平锁都会调用 acquire方法

    acquire

    public final void acquire(int arg) {
        // tryAcquire 分为公平锁实现和非公平锁实现
    	//公平锁操作: 如果state为0 再看有线程排队,我就去排队,如果是锁重入的操作(如果是自己拿着锁资源),直接获取锁
    	//非公平锁:   如果state为0 如果为0,直接尝试CAS修改,如果是锁重入的操作(如果是自己拿着锁资源),直接获取锁
    	if (!tryAcquire(arg) &&
    	    // addWaiter方法,在线程没有通过 tryAcquire 拿到锁资源时,需要将当前线程封装为Node对象,去AQS内部排队
    		// acquireQueued方法: 查看当前线程是否在队列前面的,是就尝试获取锁资源,长时间没拿到锁,就将当前线程挂起
    		acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    		selfInterrupt();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    tryAcquire

    tryAcquire 方法是AQS提供的,内部并没有任何实现,需要继承

    非公平锁实现
    final boolean nonfairTryAcquire(int acquires) {
        // 拿到当前线程
    	final Thread current = Thread.currentThread();
    	// 拿到 AQS的state值
    	int c = getState();
    	// 如果为0 就代表当前没有线程占用锁资源
    	if (c == 0) {
    	    //直接基于 CAS 的方式,尝试修改 state, 从0~1,如果成功就代表拿到锁资源
    		if (compareAndSetState(0, acquires)) {
    		    // 将 exclusiveOwnerThread 设置为当前线程
    			setExclusiveOwnerThread(current);
    			return true;
    		}
    	}
    	// 说明state不为0,表示当前lock被线程占用
    	//判断占用锁资源的线程是不是当前线程
    	else if (current == getExclusiveOwnerThread()) {
    	    // 锁重入
    		// 对state +1
    		int nextc = c + acquires;
    		// 判断锁重入是否到最大值
    		if (nextc < 0) // overflow
    			throw new Error("Maximum lock count exceeded");
    		// 将 AQS的state 设置为当前值
    		setState(nextc);
    		return true;
    	}
    	//表示没有拿到锁资源
    	return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    公平锁的实现
    protected final boolean tryAcquire(int acquires) {
        //获取当前线程
    	final Thread current = Thread.currentThread();
    	//获取 state
    	int c = getState();
    	// 没有线程占用资源
    	if (c == 0) {
    	    // 首先看有没有线程排队
    		if (!hasQueuedPredecessors() &&
    		    //如果没有线程排队, CAS 尝试获取锁资源
    			compareAndSetState(0, acquires)) {
    			setExclusiveOwnerThread(current);
    			return true;
    		}
    	}
    	//有线程占用,判断是否是锁重入
    	else if (current == getExclusiveOwnerThread()) {
    		int nextc = c + acquires;
    		if (nextc < 0)
    			throw new Error("Maximum lock count exceeded");
    		setState(nextc);
    		return true;
    	}
    	//返回false,去排队
    	return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    公平锁和非公平锁的 tryAcquire方法的唯一区别就是,当判断state为0之后
    公平锁会先查看是否有线程正在排队,如果有,直接返回false,如果没有尝试CAS获取锁资源
    非公平锁不管有没有线程排队,直接以CAS尝试获取锁资源
    state不为0二者都会判断是不是锁重入操作

    在线程执行 tryAcquire方法没有获取到锁资源后,会返回false,在配置上if中的操作,
    会执行 && 后面的方法,而在 acquireQueued(addWaiter(Node.EXCLUSIVE),arg)的参数中执行了 addWaiter,
    要将当前获取锁失败的线程封装成Node, 排到AQS队列中

    addWater
    private Node addWaiter(Node mode) {
        // 将当前线程封装成Node对象
    	Node node = new Node(Thread.currentThread(), mode);
    	// 获取到tail节点,pred
    	Node pred = tail;
    	//如果 tail不为空
    	if (pred != null) {
    	    // 将当前节点的 prev 只想 tail
    		node.prev = pred;
    		// 为了避免并发问题,以 CAS 方式,将tail 指向当前线程
    		if (compareAndSetTail(pred, node)) {
    		    // 将之前的tail 的next 指向当前节点
    			pred.next = node;
    			// 返回当前节点
    			return node;
    		}
    	}
    	//如果在队列为空,或者CAS失败,会执行 enq ,将当前node 排到队列末尾 
    	enq(node);
    	return node;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    private Node enq(final Node node) {
    	for (;;) {
    	    // 获取tail 节点
    		Node t = tail;
    		if (t == null) { 
    		    // 如果对列为空,先初始化head节点,作为头
    			if (compareAndSetHead(new Node()))
    				tail = head;
    		} else {
    		    // 队列肯定不为空,采用之前的逻辑,知道将node插入到末尾
    			node.prev = t;
    			if (compareAndSetTail(t, node)) {
    				t.next = node;
    				return t;
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    整体逻辑为,先初始化node节点,将当前线程传入,表示为互斥锁
    尝试将当前Node插入到AQS队列末尾
    队列为空,执行enq,先初始化一个Node作为 head,然后再插入当前Node
    队列不为空,直接将Node插入到末尾,作为tail

    acquireQueued

    首先查看当前Node是否排在队列的第一个位置 (不算Head),
    直接再次执行tryAcquire方法竞争锁资源,尝试将当前线程挂起,
    最终排在有效节点后,将当前线程挂起
    //队伍前面竞争锁资源,队伍非前面,挂起线程

    final boolean acquireQueued(final Node node, int arg) {
        //锁资源竞争失败标识
    	boolean failed = true;
    	try {
    	    // 中断标识
    		boolean interrupted = false;
    		for (;;) {
    		    // predecessor就是获取当前节点的上一个节点
    			final Node p = node.predecessor();
    			// 如果当前节点的上一个节点是 head ,就执行 tryAcquire ,尝试获取锁资源
    			if (p == head && tryAcquire(arg)) {
    			    // 竞争锁资源成功,进入业务代码
    				// 因为当前线程已经拿到锁资源,将当前线程的Node置为head,并将Node中的prev和thread置为null
    				setHead(node);
    				//将之前的头节点的next位置置为null,让GC将之前的head回收掉
    				p.next = null; 
    				// 将获取锁资源失败的标志位置为 false
    				failed = false;
    				// 返回中断标识,默认为false
    				return interrupted;
    			}
    			//尝试将当前线程挂起
    			//先判断能不能挂起,不能挂起,尝试让他能挂起
    			if (shouldParkAfterFailedAcquire(p, node) &&
    				parkAndCheckInterrupt())
    				interrupted = true;
    		}
    	} finally {
    	    //在lock时不会触发
    		if (failed)
    			cancelAcquire(node);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    //判断当前线程是否可以挂起
    static final int CANCELLED = 1; //绕过这个节点
    static final int SIGNAL = -1; //标识节点可以被挂起
    static final int CONDITION = -2; //将状态转为SIGNAL,然后挂起
    static final int PROPAGATE = -3; //共享锁

    shouldParkAfterFailedAcquire
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //拿到了上一个节点的状态
    	int ws = pred.waitStatus;
    	//如果 ws 为 -发直接返回true,当前节点可以挂起线程
    	if (ws == Node.SIGNAL)
    		return true;
    	//如果 ws > 0 说明是 CANCELLED状态,绕过这个节点,找上一个节点的上一个
    	if (ws > 0) {
    	    // 循环,直到找到上一个节点为小于等于0的节点
    		do {
    			node.prev = pred = pred.prev;
    		} while (pred.waitStatus > 0);
    		pred.next = node;
    	} else {
    	    // 可能为 0,-2,-3 直接以 CAS 的方式将节点状态改为 -1
    		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    	}
    	return false;
    }
    
    //尝试将当前线程挂起,用 LockSupport
    private final boolean parkAndCheckInterrupt() {
    	LockSupport.park(this);
    	return Thread.interrupted();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    unlock方法

    释放锁不分公平和非公平,都是执行sync的release方法
    释放锁的核心,就是将state从大于0的数值更改为0,即为释放锁成功
    并且unlock 方法应该涉及到将 AQS队列中阻塞的线程唤醒,

    public void unlock() {
    	sync.release(1);//只释放1
    }
    
    • 1
    • 2
    • 3

    release

    在释放锁时,只有state被减为0之后,才回去唤醒AQS队列中被挂起的线程,
    在唤醒挂起线程时,如果head的next状态不正确,会从后往前找到离head最近的节点进行唤醒,
    为什么从后往前找?
    addWaiter方法是先将prev 指针赋值,最后才会将上一个节点的next指针赋值,
    为了避免丢失节点或者跳过节点,必须从后往前找

    public final boolean release(int arg) {
        //直接尝试去释放锁
    	if (tryRelease(arg)) {
    	    // 释放锁成功
    		Node h = head;
    		// 如果head不是null,并且当前head的状态不为0
    		if (h != null && h.waitStatus != 0)
    		    //说明 AQS的队列中有Node在排队,并且线程已经挂起了,需要唤醒它
    			unparkSuccessor(h);
    		return true;
    	}
    	return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    tryRelease

    //尝试释放锁

    protected final boolean tryRelease(int releases) {
        //直接获取 state , 将state -1
    	int c = getState() - releases;
    	// 如果释放锁的线程,不是占用锁的线程,直接抛出异常
    	if (Thread.currentThread() != getExclusiveOwnerThread())
    		throw new IllegalMonitorStateException();
    	boolean free = false;
    	//判断 state 是否为0
    	if (c == 0) {
    	    //锁释放成功
    		free = true;
    		// 将占用互斥锁的线程标志位置为 null
    		setExclusiveOwnerThread(null);
    	}
    	// 锁重入了,一次没释放掉,将c赋值给state,等待下次释放
    	setState(c);
    	return free;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    unparkSuccessor

    //唤醒被挂起的线程

    private void unparkSuccessor(Node node) {
    	//获取head的状态
    	int ws = node.waitStatus;
    	if (ws < 0)
    	    将当前head的状态置为0
    		compareAndSetWaitStatus(node, ws, 0);
    
    	//获取下一个节点
    	Node s = node.next;
    	//如果下一个节点为null,或者状态为 CANCEL ,需要找到离head最近的有效节点
    	if (s == null || s.waitStatus > 0) {
    		s = null;
    		//从后往前找这个节点
    		for (Node t = tail; t != null && t != node; t = t.prev)
    			if (t.waitStatus <= 0)
    				s = t;
    	}
    	//找到最近的Node之后,唤醒它
    	if (s != null)
    		LockSupport.unpark(s.thread);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
  • 相关阅读:
    两数之和-leetcode
    windows 不能ping通虚拟机问题
    「Python入门」Python多线程
    动态域名解析,快解析有哪些优势?
    Effective Objective-C 学习(三)
    浮点数(小数)在计算机中如何用二进制存储?
    HTML 布局
    [PAT练级笔记] 28 Basic Level 1028 人口普查
    一种基于宏和serde_json实现的rust web中统一返回类
    如何从 Ubuntu 卸载 Redis
  • 原文地址:https://blog.csdn.net/Logan_addoil/article/details/128054091