Java中提供的锁:synchronized,lock锁
ReentrantLock就是一个互斥锁,可以让多线程执行期间,只有一个线程在执行指定的一段代码。
使用方式:
我们要分析的是:为什么在多个线程进行竞争的时候lock()可以加锁,只让一个线程进入,为什么在线程执行完业务逻辑的时候释放锁时,别的线程才可以竞争锁资源。
一、简单分析
进入到lock方法后,内部调用了sync.lock()方法, 去找方法的实现时,发现了两个实现:
如果需要使用公平锁
需要在构造函数中传入参数true
,默认是使用的非公平锁
更推荐使用非公平锁,非公平锁的效率比公平锁更高。
从源码的角度也发现了公平锁直接调用acquire方法尝试获取锁,
而非公平锁会先基于CAS的方式尝试获取锁资源,如果获取不到,才会执行acquire方式尝试获取锁
二、分析AQS
AQS就是AbstractQueuedSynchronizer类,AQS内部维护这一个队列
还有三个核心属性:state,head,tail
三、lock方法源码
final void lock() {
acquire(1);//调用acquire方法尝试获取锁
}
final void lock() {
//以CAS的方式,尝试将state从0改为1
if (compareAndSetState(0, 1))
//证明修改state成功,也就代表获取锁资源成功。
//将当前线程设置到AQS中的exclusiveOwnerThread(AOS中),代表当前线程拿着锁资源(和后面的可重入锁有关)
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);//如果获取不到,才会执行acquire方式尝试获取锁
}
//公平锁还是非公平锁都会调用当前的acquire方法
public final void acquire(int arg) {
//tryAcquire方法,分为两种实现,一种是公平锁,一种是非公平锁
//公平锁:如果state为0,再看看是否有线程排队,如果有就去排队,如果是锁重入的操作,直接获取锁。
//非公平锁:如果state为0,直接尝试cas修改,如果是锁重入操作,直接获取锁。
if (!tryAcquire(arg) &&
//addWaiter方法:在线程没有通过tryAcquire拿到锁资源时,需要将当前线程封装为node对象,去AQS内部排队
//acquireQueued方法:查看当前线程是否是排在队列前面,如果是就尝试获取锁资源。如果长时间没拿到锁,也需要将当前线程挂起
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法是AQS提供的,内部并没有任何的实现,需要继承AQS的类自己去实现逻辑代码查看到tryAcquire在ReentrantLock中提供了两种实现:公平锁、非公平锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//非公平锁的实现
final boolean nonfairTryAcquire(int acquires) {
//拿到了当前线程
final Thread current = Thread.currentThread();
//拿到AQS的state值
int c = getState();
if (c == 0) {
//直接基于CAS的方式,尝试修改state,从0——1,如果成功就代表拿到锁资源
if (compareAndSetState(0, acquires)) {
//将exclusiveOwnerThread属性设置为当前线程
setExclusiveOwnerThread(current);
//返回true
return true;
}
}
//说明state肯定不为0,不为0就代表当前lock被线程占用
//判断占用的线程是不是当前线程
else if (current == getExclusiveOwnerThread()) {
//锁重入操作
//对state+1
int nextc = c + acquires;
//判断锁重入是否已经达到最大值
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//将AQS的state设置好
setState(nextc);
返回true
return true;
}
//返回false
return false;
}
protected final boolean tryAcquire(int acquires) {
//获取到当前线程
final Thread current = Thread.currentThread();
//获取state
int c = getState();
//没有线程占用资源
if (c == 0) {
//首先查看有没有线程排队
//如果没有排队则尝试获取锁资源
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//查看是否是锁重入操作
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁和非公平锁的tryAcquire方法的唯一区别就是,当判断state为0之后。
在线程执行tryAcquire方法没有获取到锁资源之后,会返回false,再配置上if中的!操作,会执行&&后面的方法,而在acquireQueued(addWaiter(Node.EXCLUSIVE),arg)的参数中执行了addWaiter,要将当前获取锁失败的线程封装为Node,排队到队列中。
//获取锁失败,封装Node,排队到AQS的队列中
private Node addWaiter(Node mode) {
//将线程封装为Node对象
Node node = new Node(Thread.currentThread(), mode);
//获取到tail节点,pred
Node pred = tail;
// 如果tail节点不为空
if (pred != null) {
// 将当前节点的prev指向tail
node.prev = pred;
// 为了避免并发问题,以CAS的方式将tail指向当前线程
if (compareAndSetTail(pred, node)) {
// 将之前的tail的next指向当前节点
pred.next = node;
// 返回当前节点
return node;
}
}
//如果队列为空,或者CAS操作失败后,会执行enq方法,将当前node排到队列的末尾
enq(node);
return node;
}
//enq方法,传入的node就是当前节点
private Node enq(final Node node) {
//死循环
for (;;) {
//获取tail节点
Node t = tail;
if (t == null) { // Must initialize
//如果队列为空,先初始化head节点,作为头
if (compareAndSetHead(new Node()))
tail = head;
} else {
//此时,队列肯定不为空,采用之前的逻辑将当前节点插入到队列的末尾作为tail,循环到插入成功为止
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
整体逻辑为,先初始化Node节点,将当前线程传入,并且标识为互斥锁。尝试将当前Node插入到AQS队列的末尾
首先查看当前node是否排在队列的第一个位置(不算head),直接再次执行tryAcquire方法竞争锁资源,尝试将当前线程挂起,最终排在有效节点后,才会将当前线程挂起。
//队伍前面,竞争锁资源,队伍非前面,挂起线程
final boolean acquireQueued(final Node node, int arg) {
//竞争锁资源失败
boolean failed = true;
try {
//线程中断标识
boolean interrupted = false;
//死循环
for (;;) {
//predecessor就是获取当前节点的上一个节点
final Node p = node.predecessor();
//如果当前节点的上一个节点是head,如果是就执行tryAcquire方法竞争锁资源
if (p == head && tryAcquire(arg)) {
//竞争锁资源成功,进入当前业务代码...
//因为当前线程已经拿到锁资源就将当前线程设置为head,并且将node中的prev和thread置为空
setHead(node);
将之前的头结点置为空,让GC将之前的head回收掉
p.next = null; // help GC
//将获取锁失败的标识置为false
failed = false;
//返回中断标识,默认情况为false
return interrupted;
}
//如果当前node的上一个节点不是head则将当前线程尝试挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//判断当前线程是否可以挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//拿到上一个节点的状态有1,-1,-2,-3
int ws = pred.waitStatus;
//如果ws为-1,直接返回true,当前节点可以挂起线程
if (ws == Node.SIGNAL)
return true;
//如果ws>0,说明肯定是cancelled(1)状态,绕过这个节点,找上一个节点的上一个
if (ws > 0) {
//循环,直到找到一个节点的状态是<=0的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//找到的节点的状态可能是-2,-3,需要将这种状态的节点以CAS的方式改变状态为signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//找到上一个节点的状态是正常的,就可以调用当前方法将线程挂起
private final boolean parkAndCheckInterrupt() {
//直接使用unsafe类的park方法挂起线程
LockSupport.park(this);
return Thread.interrupted();
}
unlock释放锁操作不分为公平和非公平,都是执行sync的release方法
释放锁的核心,就是将state从大于0的数值更改为0即为释放锁成功
并且unlock方法应该会涉及到将AQS队列中阻塞的线程进行唤醒,阻塞用的是park方法,唤醒必然是unpark方法
public void unlock() {
//每次只释放1
sync.release(1);
}
在释放锁时,只有state被减为0之后,才会去唤醒AQS队列中被挂起的线程
在唤醒挂起线程时,如果head的next状态不正确,会从后往前找到离head最近的节点进行唤醒
为什么从后往前找?(addWaiter是先将prev指针赋值,最后才会将上一个节点的next指针赋值,为了避免丢失节点或者跳过节点,必须从后往前找! )
//释放锁操作
public final boolean release(int arg) {
//先查看tryRelease方法
if (tryRelease(arg)) {
//释放锁成功,进行后续处理
Node h = head;
// 如果head不为nul1,并且当前head的状态不为0
if (h != null && h.waitStatus != 0)
//说明AQS的队列中,有Node在排队,并且线程已经挂起了!
//需要唤醒被挂起的Node
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//直接获取state,并且- releases。将state - 1
int c = getState() - releases;
//如果释放锁的线程,不是占用锁的线程,直接抛出异常。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//声明了一个标识。
boolean free = false;
//判断state - 1后是否为0
if (c == 0) {
//如果为0,锁资源释放掉了。
free = true;
//将占用互斥锁的线程标识置位null
setExclusiveOwnerThread(null);
}
//锁之前重入了,一次没释放掉,将c赋值给state,等待下次再次执行时,再次判断
setState(c);
return free;
}
// 唤醒AQS中被挂起的线程
private void unparkSuccessor(Node node) {
//获取head的状态
int ws = node.waitStatus;
if (ws < 0)
//将当前head的状态设置为0
compareAndSetWaitStatus(node, ws, 0);
//拿到next节点
Node s = node.next;
//如果下一个节点为null,或者状态为CANCEL,需要找到离head节点最近的有效Node
if (s == null || s.waitStatus > 0) {
s = null;
//从后往前找这个节点(为什么从后往前找,需要查看addwaiter的内容,)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到最近的node后,直接唤醒
if (s != null)
//唤醒线程
LockSupport.unpark(s.thread);
}