全称是 AbstractQueuedSynchronizer,是同步器的相关框架,juc中很多锁的实现类依赖同步器(AQS的子类)完成核心操作
子类主要实现的方法
acquire(AQS已实现)
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程 park unpark
}
**release(AQS已实现)
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}
自定义同步器
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0, 1)) {
// 加上了锁,并设置 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//cas设置state成功,则设置当前线程为OwnerThread
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else //否则执行acquire进入阻塞队列
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
非公平性:虽然等待队列是先来先服务的,但是当锁被释放时,等待队列中的线程仍可能与队列外的线程去竞争锁,可能会竞争失败
注意:等待队列的head指向的要么是正在运行的线程所在节点,要么是dummy结点(不含线程)
非公平性具体体现在acquireQueued方法中
加锁
public final void acquire(int arg) {
//1.尝试获取锁,获取成功则结束
//2.获取失败则构建Node,关联当前线程,将其添加到等待队列中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的前驱节点
final Node p = node.predecessor();
//前驱节点若为头节点则重试获取锁,获取成功则将此节点设为头节点,并将前驱节点移除
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//首次修改前驱节点的waitStatus=-1,返回false
//再次执行时,waitStatus已经为-1,返回true,此时执行parkAndCheckInterrupt()中断当前线程
//当线程恢复时从这开始执行,继续循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//将node添加至队尾
enq(node);
return node;
}
解锁
public final boolean release(int arg) {
//1.尝试释放锁,释放成功则将state设置为0并返回true
if (tryRelease(arg)) {
//2.找到头结点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点的后继节点
Node s = node.next;
//后继节点若为空或者后继节点的waitStatus>0则遍历队列直到遇到第一个waitStatus<=0的节点或者遍历到队尾
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//恢复s节点关联线程的运行
if (s != null)
LockSupport.unpark(s.thread);
}
进行加锁时,首先判断state是否为0,若不为0则说明该锁已经被占用,判断占用锁的线程是否为当前线程,若是则将state+1,返回true,不是则返回false。同理释放锁的时候会对判断占用锁的线程是否为当前线程,若是则将state-1,state=0表示释放成功返回true,否则返回false
static final class NonfairSync extends Sync {
// ...
// Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
当我们的线程被打断后会重新进入循环,如果当前线程位于等待队列的第一个则会尝试获取锁,获取失败则再次进入阻塞,若不是队列第一个则直接进入阻塞状态,代码见acquireQueued()
简单说就是不可打断模式下,若阻塞的线程被打断会再次进入阻塞队列
NoneFairSyn
private final boolean parkAndCheckInterrupt() {
//1.将当前线程阻塞在此,若被打断才会往下执行
LockSupport.park(this);
//2.若被打断则返回true,同时清楚interrupted标记
//若是被正常恢复则返回false
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的前驱节点
final Node p = node.predecessor();
//前驱节点若为头节点则重试获取锁,获取成功则将此节点设为头节点,并将前驱节点移除
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//首次修改前驱节点的waitStatus=-1,返回false
//再次执行时,waitStatus已经为-1,返回true,此时执行parkAndCheckInterrupt()中断当前线程
//当线程恢复时从这开始执行,继续循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//若被打断则将interrupted置为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
//1.尝试获取锁,获取失败则加入阻塞队列
//2.从阻塞队列出来后,返回interupted(该interrupted是方法内部的变量,不是线程的标记)
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//若interrupted=true,则执行selfInterrupt()重新将线程的interrupted设为true
selfInterrupt();
}
若阻塞的线程被打断则直接抛出异常,不继续在等待队列进行等待
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//1.尝试获取锁
if (!tryAcquire(arg))
//2.获取失败则进入阻塞队列,但是是不可打断模式
//该方法与acquireQueued()基本一致,只是对于打断处理不同,如果被打断则抛出异常退出等待队列
doAcquireInterruptibly(arg);
}
公平锁采用的是FairSync,与NoneFairSync的区别主要在于tryAcquire的不同,在竞争锁时先要判断队列中是否存在线程等待,若不存在或者等待队列中第一个线程就是当前线程时获取成功,否则进入等待队列
FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//1. 判断队列是否为空 || 队列中第一个待恢复的线程是否为当前线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
acquire与NoneFairSync一致,都使用的是AQS的方法
每个条件变量都维护了一条等待队列,如图所示
它将state置为0后park该线程并设置ws=-2,将该线程添加到此条件变量的等待队列中
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//创建node关联当前线程,并将node添加到等待队列队尾
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//park当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
从头遍历等待队列,直到遇见第一个ws=-2的节点,将该节点移出等待队列并添加到阻塞队列尾部
public final void signal() {
//1.校验当前线程是否为持有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2.得到头结点
Node first = firstWaiter;
if (first != null)
//3.signal头节点
doSignal(first);
}
//将头节点移出等待队列
//如果头结点的waitStatus>0则一直移出头结点
//直到遇到ws<0的节点,将其放入阻塞队列
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}