ReentrantLock,意思是“可重入锁”,关于可重入锁的概念在下一节讲述。ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。
基本语法上,ReentrantLock与synchronized很相似,它们都具备一样的线程重入特性,只是代码写法上有点区别而已。一个表现为API层面的互斥锁(Lock),一个表现为原生语法层面的互斥锁(synchronized)。
ReentrantLock相对synchronized而言还是增加了一些高级功能,主要有以下三项:
ReentrantLock
对象可以同时绑定多个Condition
对象(条件变量或条件队列),而在synchronized中,锁对象的 wait()和notify()或notifyAll()方法可以实现一个隐含条件,但如果要和多于一个的条件关联的时候,就不得不额外地添加一个锁,而ReentrantLock则无需这么做,只需要多次调用newCondition()方法即可。而且我们还可以通过绑定Condition对象来判断当前线程通知的是哪些线程(即与Condition对象绑定在一起的其它线程)。reentrantlock 主要有两个构造器 :
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
分别由非公平锁NofairSync
(默认) 和 公平锁 FairSync
来实现的
非公平锁 :
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
非公平锁的代码如上面所示, NonfairSync 是 ReentrantLock的内部静态类, 并且添加了 final 关键字, 防止其他类去继承 NofairSync
**如果一个类要被声明为static的,只有一种情况,就是静态内部类。**如果在外部类声明为static,程序会编译都不会过 :
1.静态内部类跟静态方法一样,只能访问静态的成员变量和方法,不能访问非静态的方法和属性,但是普通内部类可以访问任意外部类的成员变量和方法
2.静态内部类可以声明普通成员变量和方法,而普通内部类不能声明static成员变量和方法。
3.静态内部类可以单独初始化
NofairSync的继承关系如上图所示, 可以看到 NofairSync 继承了 Sync , Sync 继承了AQS, 所以可以使用AQS的提供的加锁/解锁方法
如图所示 :
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
exclusiveOwnerThread
是 ReentrantLock
的一个字段, 对象引用指向获取锁的线程
Thread-1 :
addwaiter的逻辑是创建一个Node类对象, 然后插入到队列中, 队列使用双向链表实现
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
...
}
源码如下 :
// ReentrantLock -> lock()
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AbstractQueuedSynchronizer -> acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// AbstractQueuedSynchronizer -> acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
如果加锁成功(没有竞争),会设置 :
非公平锁的体现
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁实现
final void lock() {
// 尝试获取锁
if (compareAndSetState(0, 1))
// 获取锁成功. 设置当前线程为锁拥有的线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果尝试失败,进入 acquire [1]
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// [1]AQS 继承过来的方法
public final void acquire(int arg) {
// [2]tryAcquire 再次尝试获取锁
if (!tryAcquire(arg) &&
// 如果 tryAcquire失败, 就执行 addWaiter[4] , 再执行 acquireQueued[5]
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
选中方法, ctrl + alt + b
即可查看 这个方法的所有实现的类 NonfairSync -> tryAcquire
// NonfairSync -> tryAcquire [2] -> [3]
protected final boolean tryAcquire(int acquires) {
// Sync -> nonfairTryAcquire
return nonfairTryAcquire(acquires);
}
AbstractQueuedSynchronizer ==> Sync -> nonfairTryAcquire
// [3] Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
// 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败, 回到调用处
return false;
}
// [4]AQS 继承过来的方法, 方便阅读, 放在此处
private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
Node node = new Node(Thread.currentThread(), mode);
// 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 双向链表
pred.next = node;
return node;
}
}
// 尝试将 Node 加入 AQS, 进入 [6]
enq(node);
return node;
}
// AQS 继承过来的方法, 方便阅读, 放在此处
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 判断尾部是否为 null ,如果是 就创建一个虚拟节点, head -> node(null)
if (t == null) { // Must initialize
// 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// cas 尝试将 Node 对象加入 AQS 队列尾部
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// AQS 继承过来的方法, 方便阅读, 放在此处
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取锁
if (p == head && tryAcquire(arg)) {
// 初始状态 head -> node(null) -> node(thred = thread-1) -> node(thred = thread-2)
// head -> node(thred = thread-1) -> node(thred = thread-2) , node(null) 被 GC 回收
// head -> node(thred = null) -> node(thred = thread-2) , thread-1 已经获取锁了, 对应的node 被设置为 null
// 获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
p.next = null; // help GC
failed = false;
// 返回中断标记 false
return interrupted;
}
// 判断是否应当 park, 进入 [7]
if (shouldParkAfterFailedAcquire(p, node) &&
// park 等待, 此时 Node 的状态被置为 Node.SIGNAL [8]
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
[7]shouldParkAfterFailedAcquire
// AQS 继承过来的方法, 方便阅读, 放在此处
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 上一个节点都在阻塞, 那么自己也阻塞好了
return true;
// > 0 表示取消状态 , 取消状态就是当前线程不再等待锁, 取消等待锁的状态了
if (ws > 0) {
// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这次还没有阻塞
// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL
来决定,而不是本节点的waitStatus 决定
Sync 继承自 AQS
public class ReentrantLock implements Lock, java.io.Serializable {
// 解锁实现
public void unlock() {
sync.release(1);
}
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean release(int arg) {
// 尝试释放锁, 进入 [1]
if (tryRelease(arg)) {
// 从队列头节点 开始 unpark
Node h = head;
// 队列不为 null 并且 waitStatus == Node.SIGNAL(-1) 才需要 unpark
if (h != null && h.waitStatus != 0)
// unpark AQS 中等待的线程, 进入[2]
unparkSuccessor(h);
return true;
}
return false;
}
waitStatus == Node.SIGNAL(-1)
)的线程 , 最后返回 true
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
// 判断当前线程是否是获取锁的那个线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c == 0 是为了
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
// 如果状态为 Node.SIGNAL 尝试重置状态为 0
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) // ws = Node.SIGNAL(-1)
// 重置状态state
compareAndSetWaitStatus(node, ws, 0);
// 找到需要 unpark 的节点(state==-1), 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next; // head-> node(null) -> node(thread-1)
// waitStatus > 0 表示取消状态
if (s == null || s.waitStatus > 0) {
s = null;
// 不考虑已取消的节点(), 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
for (Node t = tail; t != null && t != node; t = t.prev)
// waitStatus <= 0 阻塞
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
LockSupport.unpark(s.thread);
static final class NonfairSync extends Sync {
// Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 没有其他人获取锁的情况
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
int nextc = c + acquires;
, 表示重入锁的次数 setState(nextc)
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
// 判断当前线程是否是获取锁的那个线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c == 0 是为了
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// 从AQS继承来的方法
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// interrupted 会清除打断标记
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 1. 先获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 2. 返回打断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 返回的 interrupted = true
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果打断的状态为 true
selfInterrupt();
}
static void selfInterrupt() {
// 重新产生一次中断
Thread.currentThread().interrupt();
}
}
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果没有获得到锁, 进入 [1]
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 在 park 过程中如果被 interrupt 会进入此
parkAndCheckInterrupt())
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
注意 , 与不可打断模式, 这里是设置 打断标记为true, 会继续进行for循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
可打断模式 : 会直接抛出异常
if (shouldParkAfterFailedAcquire(p, node) &&
// 在 park 过程中如果被 interrupt 会进入此
parkAndCheckInterrupt())
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t 说明 队列中有 等待的线程
return h != t &&
// h.next == null 说明没有 等待的线程
// h = head -> node(null) -> node(thread1)
// 判断当前线程是否是第二个节点(第一个为head)对应的线程
((s = h.next) == null || s.thread != Thread.currentThread());
}
}
hasQueuedPredecessors() = false
, !hasQueuedPredecessors() = true
acquire(1) -> tryAcquire(acquires = 1)
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
hasQueuedPredecessors :
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t 说明 队列中有 等待的线程
return h != t &&
// h.next == null 说明没有 等待的线程
// h = head -> node(null) -> node(thread1)
// 判断当前线程是否是第二个节点(第一个为head)对应的线程
((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors() = false
, !hasQueuedPredecessors() = true
acquire(1) -> tryAcquire(acquires = 1)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
让当前持有锁的线程 thread-1 wait , 进入Condition等待队列
让NonfairSync同步器释放锁 :
唤醒 AQS等待队列中优先级最高的节点 , 让其竞争锁
阻塞 thread-1 (unpark)
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
// 进入 fullyRelease 流程
// fullRelease
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 是当前线程处于等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
假设 Thread-1 要来唤醒 Thread-0