AbstractQueuedSynchronizer是Java的并发基础,想学好Java的并发,还需要先从AQS开始学起,本节参考网络知识,学习AQS的实现原理;
有关CLH知识可以参考:《Building FIFO and Priority-Queuing Spin Locks from Atomic Swap》
AQS是CLH锁的一个变种,了解CLH原理对学习AQS有一定理解上的帮助,CLH队列属于自旋锁的一种,先来看一下CLH队列锁的特点:
参考网络中对CLH队列锁的一种实现:
public class ClhSpinLock implements Lock{
private final ThreadLocal<Node> prev;
private final ThreadLocal<Node> node;
private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());
public ClhSpinLock() {
this.node = new ThreadLocal<Node>() {
protected Node initialValue() {
return new Node();
}
};
this.prev = new ThreadLocal<Node>() {
protected Node initialValue() {
return null;
}
};
}
public void lock() {
final Node node = this.node.get();
node.locked = true;
Node pred = this.tail.getAndSet(node);
this.prev.set(pred);
// 自旋
while (pred.locked);
}
public void unlock() {
final Node node = this.node.get();
node.locked = false;
this.node.set(this.prev.get());
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
private static class Node {
private volatile boolean locked;
}
}
上述代码中使用到了ThreadLocal,可以方便的在加锁时将当前Node存放ThreadLocal,在释放锁时,只需要从ThreadLocal获取当前线程绑定的节点;
通过图解的方式,分析加锁的过程:

通过CAS设置tail节点,如果自旋的前驱节点的locked如果是false,则认为加锁成功,上面的实现方式是不支持锁重入功能的,但是在AQS实现中支持公平/非公平和可重入;

当前线程释放锁时,首先将locked = false表示下个节点可以获取锁并且让当前节点指向前驱节点;
在开始AQS源码之前,先来通过AQS实现一个互斥公平锁:
public class CustomLock implements Lock {
private final Sync sync;
public static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 这里的c为什么先获取出来,为什么不是每次使用的时候都是getState
// 个人理解:它是在获取锁之前的状态,如果在做CAS的时候,拿c去做,结果失败说明被其他线程做了修改
// 如果是compareAndSetState(getState(), acquires),那state即使其他线程做了更新,这里也会CAS成功;
int c = getState();
if (c == 0) {
// 如果cas失败,会返回false,加锁失败;
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 这里setState为什么不是CAS去修改;
// 如果是锁重入的话,说明当前线程已经加到了锁,不会有其他的线程对state做修改,所以这里不会有并发问题;
setState(nextc);
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
// 释放锁时,当前线程已经持有锁,所以不需要防止并发;
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果c!=0说明还有锁重入
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
public Condition newCondition(){
return new ConditionObject();
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
public CustomLock() {
sync = new Sync();
}
// 加锁直到获取锁或者当前锁有权利获取锁时被中断;
@Override
public void lock() {
sync.acquire(1);
}
// 可被中断,如果在获取锁的过程中,没有获取需要等待,则可以通过thread.interrupt中断,响应InterruptedException异常
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 尝试加锁,如果获取不到,则返回false
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(timeout));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
可见通过AQS实现一个锁是非常的简单方便,AQS还可以实现共享锁,下面的源码分析中会介绍到;
实现AQS抽象类时,如果是独占模式,只需要实现:
tryAcquire
tryRelease
isHeldExclusively
如果是共享模式,只需要实现:
tryAcquireShared
tryReleaseShared
isHeldExclusively
Node节点主要包含节点的状态,前驱和后继节点,如果有使用Condition还会使用到nextWaiter,在下面会对这些操作进行详细的分析:
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
static final int CANCELLED = 1;
// 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
static final int SIGNAL = -1;
// 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
static final int CONDITION = -2;
// 当前节点获取到锁的信息需要传递给后继节点,共享锁模式使用该值
static final int PROPAGATE = -3;
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点绑定的线程
volatile Thread thread;
// Condition条件队列,当使用Condition时它表示后继节点
Node nextWaiter;
// 判断是否是共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取同步队列的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS中的CLH队列采用双向链表,AQS中维护着head和tail;在AQS中还需要配合着state成员字段来使用;
当只有一个线程加锁时,其实是不需要构建CLH队列;

独占锁主要涉及的方法有:
加锁:
释放锁:
tryAcquire方法在AQS内部是空实现,需要实现方自定义实现逻辑,以上面的自定义实现来说,我们认为getState()==0时,没有线程竞争锁,可以直接加锁成功;
tryAcquire只做一次尝试,如果获取锁失败,则直接返回false;
以ReentrantLock为例,内部实现tryAcquire时有公平锁和非公平锁之后;
独占模式获取,忽略中断。 通过调用至少一次tryAcquire(int)实现,成功返回。 否则线程排队;这个过程中不会被线程中断而结果,只有获取到资源之后才会判断Thread.interrupted(),如果需要中断则再调用Thread.currentThread().interrupt()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// addWaiter 向CLH队列添加节点;
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 当前节点拿到锁,但是被中断了,则调用线程的 Thread.currentThread().interrupt()
selfInterrupt();
}
首先会进行一次tryAcquire,如果失败,则将当前线程封装成Node节点添加到CLH队列;
private Node addWaiter(Node mode) {
// 独占模式节点
Node node = new Node(Thread.currentThread(), mode);
// 如果tail为空null,尝试向队尾添加node,这一步CAS尝试不需要挂起线程;
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果添加失败,进入到enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果tail为空,则初始化一个节点,head和tail分别指向它;然后再循环一轮进入else
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果CLH队列不为空,则向队尾添加节点
// 添加节点需要维护前驱和后继,这是两步操作,无法保证原子性,这里先保证前驱节点的设置正确;
node.prev = t;
if (compareAndSetTail(t, node)) {
// 如果尾节点设置成功,再配置后继节点,这里在多线程操作的时候,这里如果被挂机,则CLH队列里面只能通过pre从后往前找才靠谱;
t.next = node;
return t;
}
}
}
}
再次判断是否满足条件获取锁,如果不满足,则进入park阻塞,等待被unpark;
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前线程node的前驱节点
final Node p = node.predecessor();
// 如果前驱是head,则再次尝试获取锁,如果获取到,则不需要再进行park;
if (p == head && tryAcquire(arg)) {
// 如果当前节点成功获取锁,则设置当前节点为head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断当前节点获取锁失败之后是否应该阻塞自己,只有当前驱节点waitStatus=-1时,才会返回true;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 这个状态用来标识线程是否被中断,如果被中断,则在acquireQueued返回true,调用处会执行Thread.currentThread().interrupt()
interrupted = true;
}
} finally {
// 如果出现一些异常,没有获取锁就结束了for循环,则将当前节点删除掉;
// 比如 线程被stop,会执行finally内的方法;
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱状态如果是-1,则返回true,添加进来的节点需要通过前驱的状态进行控制,前驱节点状态-1,表示后继节点在等待获取锁;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 如果前驱节点都已经被取消了,则向前找到一个没有被取消的节点,返回false,会重新走调用处的for循环;
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果不是-1 和1 则将其改为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果前驱节点状态是-1,则将当前节点阻塞;
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
前驱节点状态=-1表示后继节点在等待获取锁资源;
这个方法单独拿出来说,是因为它包含了几种场景;
由于不知道在acquireQueued内是什么导致的需要cancelAcquire,也可能是当前节点刚获取到锁,结果被stop,也可能是正在排队获取锁;
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 找到一个waitStatus不为1的前驱
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点的情况下:
// 将找到的前驱设置为尾节点;
// 第一种情况:当前要取消的节点是尾节点,则尾节点前移;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果当前节点不是尾节点
int ws;
// 前驱节点不是头节点,并且(前驱节点的状态为SIGNAL 或者 前驱状态为-1,0,将期改为-1) 并且 前驱线程不为空;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 第二种场景:当前取消节点的前驱不是头节点也不是尾节点;则将前驱节点的next指针指向node.next;
// 这里也有个疑问点,为什么只改了node前驱的next指针,而没有改node.next的pre指针?
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 除此之外,就是需要唤醒后继节点;
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 当node.next不满足被唤醒条件时,则从后往前找满足唤醒条件的节点
Node s = node.next;
// s==null为什么也会进行从后往前找?
// s==null,不能表示后继节点为空,因为在enq添加节点的时候,是先维护前驱节点,再维护后继节点,如果前驱维护成功,后继节点还未进行维护时,线程挂起,这里后继节点就是空,所以需要从后往前找,CLH队列里面前驱节点是靠谱的;
if (s == null || s.waitStatus > 0) {
s = null;
// 找到waitStatus 为 0(默认) 或者 -1的节点;将其唤醒
// 这里为什么是倒着找的呢?
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
上面提到了一个疑问为什么删除节点的时候,只维护node.pre指向node.next,而没有将node.next.pre=node.pre
shouldParkAfterFailedAcquire中有这样一段代码:
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;

这是当Node2被删除后的结构,当Node3拿到资源执行的时候,执行上面这段代码,之后:

这个时候,Node1和Node2的关系已经被得到维护;
尝试以独占方式获取,如果线程中断或时间超时会立即中止;
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取一次,获取不到则进入doAcquireNanos
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算出未来时间,超过这个时间未获取到,则响应失败
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 这里的park传入了时间,阻塞nanosTimeout,超时则自动唤醒;
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
对acquire方法有了深入理解之后,tryAcquireNanos思想类似,就不细写了;
以独占方式获得,如果线程被中断,则立即中止抛出:InterruptedException异常;
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果线程被中断,则直接抛出异常;
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁
if (!tryAcquire(arg))
// 获取不到,则阻塞加入队列
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 线程如果被调用interrupt会被唤醒,如果true,则表示线程被中断,这里直接抛出中断异常;
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryRelease方法是交由用户自定义的,release会调用tryRelase做循环判断;
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 当node.next不满足被唤醒条件时,则从后往前找满足唤醒条件的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 找到waitStatus 为 0(默认) 或者 -1的节点;将其唤醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
为什么是通过前驱节点的状态来控制后继节点阻塞情况?唤醒线程时为什么是从后往前找?
https://www.cnblogs.com/dennyzhangdd/p/7218510.html



加锁:
释放锁:
tryAcquireShared如果返回值>=0 表示获取锁成功,如果返回值是0,表示当前节点获取锁成功,但后面的线程再获取会失败;
在独占锁中,tryAcquire只需要返回true和false来表示占用成功或失败,而对于tryAcquireShared返回 >=0 则表示成功,否则失败;
public final void acquireShared(int arg) {
// 尝试获取共享资源,如果>=0表示获取成功;
if (tryAcquireShared(arg) < 0)
// 否则,加入阻塞队列;
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 共享模式的节点类型为:SHARED
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前驱节点是head,则尝试再获取;
if (p == head) {
int r = tryAcquireShared(arg);
// 如果获取成功,则设置头节点 并唤醒后续节点
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// 因为是共享锁,获取到锁之后,可以理解为会传播到后继SHARED类型节点也获到锁
if (propagate > 0 || // 表示有剩余资源,可以唤醒后续节点(如果有SHARED类型的)
h == null || h.waitStatus < 0 || // 旧head为空(共享锁可能后继节点释放锁) 或waitStatus<0 (表示后继节点在等待获取锁)
(h = head) == null || h.waitStatus < 0) // 新head为空 或waitStatus<0 (表示后继节点在等待获取锁)
{
Node s = node.next;
// 如果下个节点的类型是Shared,则进行唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
setHeadAndPropagate 方法内部判断h和head是否为空null;为什么要做这些判断?因为共享锁可能会有很多的线程在同时释放锁doReleaseShared如果队列中有等待线程,则可能被很接近同时被唤醒,各自setHead,很有可能第一被唤醒的线程的h和head已经发生了变化;
在共享锁中,可以被唤醒,如果竞争不到资源还是会被park的;
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果状态是-1,表示后继节点等待,改为0表示当前节点已获取资源释放;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 从后往前找第一个满足唤醒的节点
unparkSuccessor(h);
}
// 多个线程同时在doreleaseshared时,t1将head.waitStatus=0,则t2进来判断ws==0,则将head改为-2,则t1在获取锁执行setHeadAndPropagate会再次做唤醒下个节点;
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点没有发生变化,则退出循环,否则,继续做唤醒的动作;
if (h == head) // loop if head changed
break;
}
}
本文内容只是对API的理解,对同步器框架的设计精髓还有得挖掘,,日后对AQS有较的理解之后,再来补原理;