本章讲解一下CAS,本质就是机器指令:cmpxchg+lock(根据处理器核数进行判断)
原子操作;
而在谈到并发操作里面,我们不得不谈到AQS,JDK的源码里面好多并发的类都是通过Sync(同步器)的内部类继承AQS而实现出五花八门的功能;
AQS是一个抽象类,类名为AbstractQueuedSynchronizer,抽象的都是一些公用的方法属性,其自身是没有实现任何同步接口的;
AQS定义了同步器中获取锁和释放锁,目的来让自定义同步器组件来使用或重写;
纵观AQS的子类,绝大多数都是一个叫Sync的静态内部类来继承AQS类,通过重写AQS中的一些方法来实现自定义同步器;
AQS定义了两种资源共享方式:EXCLUSIVE( 独占式:每次仅有一个Thread能执行 )、SHARED( 共享式:多个线程可同时执行 );
AQS维护了一个FIFO的CLH链表队列,且该队列不支持基于优先级的同步策略;
维护了一个volatile的int类型的state字段:private volatile int state
,该字段是实现AQS的核心关键词;
通过getState、setState、compareAndSetState方法类获取、设置更新state值;
该字段在不同的并发类中起着不同的纽带作用,下面会接着讲到state字段的一些应用场景;
正常默认的状态值为0;
对于释放操作的时候,前一个结点有唤醒后一个结点的任务;(本质是后节点轮训前一个节点的waitStatus状态值)
当前结点的前置结点waitStatus > 0,则结点处于CANCELLED状态,应该需要踢出队列;
当前结点的前置结点waitStatus = 0,则需要将前置结点改为SIGNAL状态;
当前结点的前置结点waitStatus = -1 ,则需要将前置结点改为wait状态;
当前结点的前置结点waitStatus = -2 ,则需要将前置结点改为condition状态;
+------+ prev +------+ prev +------+
| | <---- | | <---- | |
head | Node | next | Node | next | Node | tail
| | ----> | | ----> | |
+------+ +------+ +------+
在头尾结点中,需要特别指出的是头结点是一个空对象结点:这里可以这样去理解,头节点是不参与排队的,因为它已经获得了同步状态了,那么就说明该头节点的相关线程已经在执行相应的业务逻辑了,而在执行完业务逻辑,释放同步状态后,该头节点是肯定要被垃圾回收的,防止内存空间的浪费,这里就涉及到了gc root,如果对象还有引用的话,垃圾回收器是不会回收它的,所以需要把头节点持有的各种引用都置为null,方便之后的垃圾回收,而Thread也是头节点持有的引用之一,因此Thread对象也需要被置为null。
每一个Node结点都维护了一个指向前驱的指针和指向后驱的指针,结点与结点之间相互关联构成双向链表;
入队在尾,出队在头,出队后需要激活该出队结点的后继结点,若后继结点为空或后继结点waitStatus>0,则从队尾向
前遍历取 waitStatus<0 的触发阻塞唤醒;
CountDownLatch(倒计时计数器),简单大致意思为:A组线程等待另外B组线程,B组线程执行完了,A组线程才可以执行;
state初始化假设为N,后续每countDown()一次,state会CAS减1。
等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。
CyclicBarrier(回环栅栏),简单大致意思为:A组线程等待另外B组线程,B组线程执行完了,A组线程才可以执行;
state初始化假设为N,后续每await()一次,state会CAS减1。
等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。
ReentrantLock(互斥锁),简单大致意思为:独占式锁的类;
state初始化为0,表示未锁定状态,然后每lock()时调用tryAcquire()使state加1。
其他线程再 tryAcquire() 时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁;
Semaphore,简单大致意思为:A、B、C、D线程同时争抢资源,目前卡槽大小为2,若A、B正在执行且未执行完,那么C、D线程在门外等着,一旦A、B有1个执行完了,那么C、D就会竞争看谁先执行;
state初始值假设为N,后续每tryAcquire()一次,state会CAS减1,当state为0时其它线程处于等待状态。
直到state>0且
protected boolean isHeldExclusively()
需要被子类实现的方法,调用该方法的线程是否持有独占锁,一般用到了condition的时候才需要实现此方法
protected boolean tryAcquire(int arg)
需要被子类实现的方法,独占方式尝试获取锁,获取锁成功后返回true,获取锁失败后返回false
protected boolean tryRelease(int arg)
需要被子类实现的方法,独占方式尝试释放锁,释放锁成功后返回true,释放锁失败后返回false
protected int tryAcquireShared(int arg)
需要被子类实现的方法,共享方式尝试获取锁,获取锁成功后返回正数1,获取锁失败后返回负数-1
protected boolean tryReleaseShared(int arg)
需要被子类实现的方法,共享方式尝试释放锁,释放锁成功后返回true,释放锁失败后返回false
final boolean acquireQueued(final Node node, int arg)
对于进入队尾的结点,检测自己可以休息了,如果可以修改则进入SIGNAL状态且进入park()阻塞状态
private Node addWaiter(Node mode)
添加结点到链表队尾
private Node enq(final Node node)
如果addWaiter尝试添加队尾失败,则再次调用enq此方法自旋将结点加入队尾
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
检测结点状态,如果可以休息的话则设置waitStatus=SIGNAL并调用LockSupport.park休息;
private void unparkSuccessor(Node node)
释放锁时,该方法需要负责唤醒后继节点
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire{
如果尝试获取独占锁失败的话( 尝试获取独占锁的各种方式由AQS的子类实现 ),
那么就新增独占锁结点通过自旋操作加入到队列中,并且根据结点中的waitStatus来决定是否调用
LockSupport.park进行休息
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release{
如果尝试释放独占锁成功的话( 尝试释放独占锁的各种方式由AQS的子类实现 ),
那么取出头结点并根据结点waitStatus来决定是否有义务唤醒其后继结点
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
acquireShared{
如果尝试获取共享锁失败的话( 尝试获取共享锁的各种方式由AQS的子类实现 ),
那么新增共享锁结点通过自旋操作加入到队尾中,并且根据结点中的waitStatus来决定是否调用
LockSupport.park进行休息
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared{
如果尝试释放共享锁失败的话( 尝试释放共享锁的各种方式由AQS的子类实现 ),
那么通过自旋操作唤完成阻塞线程的唤起操作
}
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
带参构造方法还可通过传入变量还决定调用方是使用公平锁还是非公平锁;
AQS --> Sync ---> FairSync // 公平锁
|
|> NonfairSync // 非公平锁
ReentrantLock内的同步器都是通过Sync抽象接口来操作调用关系的,细看会发现基本上都是通过sync.xxx之类的这种调用方式的;
public void lock() {
sync.lock();
}
// FairSync 公平锁调用方式
final void lock() {
acquire(1); // 尝试获取独占锁
}
// NonfairSync 非公平锁调用方式
final void lock() {
// 首先判断state资源是否为0,如果恰巧为0则表明目前没有线程占用锁,则利用CAS占有锁
if (compareAndSetState(0, 1))
// 当独占锁之后则将设置exclusiveOwnerThread为当前线程
setExclusiveOwnerThread(Thread.currentThread()); else
// 若CAS占用锁失败的话,则再尝试获取独占锁
acquire(1);
}
这里的区别就是非公平锁在调用lock时首先检测了是否通过CAS获取锁,发现锁一旦空着的话,则抢先一步占为己有,不管有没有阻塞队列,只要当前线程来的时候发现state资源没被占用那么当前线程就抢先一步试一下CAS,CAS失败了它才去排队;
该方法是独占模式下线程获取state共享资源的入口,如果获取到资源的话就返回,否则创建独占模式结点加入阻塞队列,直到获取到共享资源;
而且这里需要加上自我中断判断,主要是因为线程在等待过程中被中断的话,它是不响应的,那么就只有等到线程获取到资源后通过自我判断将这个判断后续补上;
独占模式的该方法,正常情况下只要没有获取到锁,该方法一直处于阻塞状态,获取到了则跳出该方法区;
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 尝试获取锁资源,若获取到资源的话则线程直接返回,此方法由AQS的具体子类实现
// 否则获取资源失败的话,那么就进入等待队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// FairSync 公平锁的 tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取锁资源的最新内存值
int c = getState();
// 当state=0,说明锁资源目前还没有被任何线程被占用
if (c == 0) {
// 检查线程是否有阻塞队列
if (!hasQueuedPredecessors() &&
// 如果没有阻塞队列,则通过CAS操作获取锁资源
compareAndSetState(0, acquires)) {
// 没有阻塞队列,且CAS又成功获取锁资源,则设置独占线程对象为当前线程
setExclusiveOwnerThread(current);
// 返回标志,告诉上层该线程已经获取到了锁资源 return true;
}
}
// 执行到此,锁资源值不为0,说明已经有线程正在占用这锁资源
else if (current == getExclusiveOwnerThread()) {
// 既然锁已经被占用,则看看占用锁的线程是不是当前线程
int nextc = c + acquires;
// 如果占用的锁的线程是当前线程的话,则为重入锁概念,状态值做加1操作
// int类型值小于0,是因为该int类型的state状态值溢出了,
// 溢出了的话那得说明这个锁有多难获取啊,可能出问题了
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 返回成功标志,告诉上层该线程已经获取到了锁资源
return true;
}
// 返回失败标志,告诉上层该线程没有获取到锁资源
return false;
}
// NonfairSync 非公平锁的 tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
// 调用父类的非公平获取锁资源方法
return nonfairTryAcquire(acquires);
}
// NonfairSync 非公平锁父类 Sync 类的 nonfairTryAcquire 方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取锁资源的最新内存值
int c = getState();
// 当state=0,说明锁资源目前还没有被任何线程被占用
if (c == 0) {
// 先不管三七二十一,先尝试通过CAS操作获取锁资源
if (compareAndSetState(0, acquires)) {
// CAS一旦成功获取锁资源,则设置独占线程对象为当前线程
setExclusiveOwnerThread(current);
// 返回成功标志,告诉上层该线程已经获取到了锁资源
return true;
}
}
// 执行到此,锁资源值不为0,说明已经有线程正在占用这锁资源
// 既然锁已经被占用,则看看占用锁的线程是不是当前线程
else if (current == getExclusiveOwnerThread()) {
// 如果占用的锁的线程是当前线程的话,则为重入锁概念,状态值做加1操作
int nextc = c + acquires;
// int类型值小于0,是因为该int类型的state状态值溢出了,溢出了的话那得说明这个锁有多难获取啊,可能出问题了
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); //
return true; // 返回成功标志,告诉上层该线程已经获取到了锁资源
}
return false; // 返回失败标志,告诉上层该线程没有获取到锁资源
}
tryAcquire方法是AQS的子类实现的,也就是ReentrantLock的两个静态内部类实现的,目的就是通过CAS尝试获取锁资源,获取锁资源成功则返回true,获取锁资源失败则返回false;
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 按照给定的mode模式创建新的结点,模式有两种:Node.EXCLUSIVE独占模式、Node.SHARED共享模式;
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 将先队尾结点赋值给临时变量
if (pred != null) { // 如果pred不为空,说明该队列已经有结点了
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 通过CAS尝试将node结点设置为队尾结点
pred.next = node;
return node;
}
}
// 执行到此,说明队尾没有元素,则进入自旋首先设置头结点,然后将此新建结点添加到队尾
enq(node); // 进入自旋添加node结点
return node;
}
addWaiter通过传入不同的模式来创建新的结点尝试加入到队列尾部,如果由于并发导致添加结点到队尾失败的话那么就进入自旋将结点加入队尾;
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
// 自旋的死循环操作方式
Node t = tail;
// 因为是自旋方式,首次链表队列tail肯定为空,但是后续链表有数据后就不会为空了
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
// 队列为空时,则创建一个空对象结点作为头结点,无意思,可认为傀儡结点
// 空队列的话,头尾都指向同一个对象
tail = head;
} else {
// 进入 else 方法里面,说明链表队列已经有结点了
node.prev = t;
// 因为存在并发操作,通过CAS尝试将新加入的node结点设置为队尾结点
if (compareAndSetTail(t, node)) {
// 如果node设置队尾结点成功,
// 则将之前的旧的对象尾结点t的后继结点指向node,node的前驱结点也设置为t
t.next = node;
return t;
}
}
// 如果执行到这里,说明上述两个CAS操作任何一个失败的话,该方法是不会放弃的,因为是自旋操作,再次循环继续入队
}
}
enq通过自旋这种死循环的操作方式,来确保结点正确的添加到队列尾部,通过CAS操作如果头部为空则添加傀儡空结点,然后在循环添加队尾结点;
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
CAS操作,设置头结点、尾结点;
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 自旋的死循环操作方式
// 如果新建结点的前驱结点是头结点
final Node p = node.predecessor();
// 如果前驱结点为头结点,那么该结点则是老二,仅次于老大,也希望尝试去获取一下锁,
// 万一头结点恰巧刚刚释放呢?希望还是要有的,万一实现了呢。。。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC,原来头结点本来就是空
// 拿到锁资源后,则该node结点升级做头结点,且设置后继结点指针为空,便于GC回收
failed = false;
return interrupted;
}
// 根据前驱结点看看是否需要休息一会儿
// 阻塞操作,正常情况下,获取不到锁,代码就在该方法停止了,直到被唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
// 如果执行到这里,说明尝试休息失败了,因为是自旋操作,所以还会再次循环继续操作判断
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued也是采用一个自旋的死循环操作方式,只有头结点才能尝试获取锁资源,其余的结点挨个挨个在那里等待修改,等待被唤醒,等待机会成为头结点;而新添加的node结点也自然逃不过如此命运,先看看是否头结点,然后再看看是否能休息;
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点的状态值
int ws = pred.waitStatus;
// 若前驱结点的状态为SIGNAL状态的话,那么该结点就不要想事了,直接返回true准备休息
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 若前驱结点的状态为CANCELLED状态的话,那么就一直向前遍历,直到找到一个不为CANCELLED状态的结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 剩下的结点状态,则设置其为SIGNAL状态,然后返回false标志等外层循环再次判断
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire主要是检测前驱结点状态,前驱结点为SIGNAL的话,则新结点可以安安心心休息了;
如果前驱结点大于零,说明前驱结点处于CANCELLED状态,那么则以入参pred前驱为起点,一直往前找,直到找到最近一个正常等待状态的结点;
如果前驱结点小于零,那么就将前驱结点设置为SIGNAL状态,然后返回false依赖acquireQueued的自旋再次判断是否需要进行休息;
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 阻塞等待
return Thread.interrupted(); // 被唤醒后查看是否有被中断过否?
}
parkAndCheckInterrupt首先调用park让线程进入等待状态,然后当park阻塞被唤醒后,再次检测是否曾经被中断过;
而被唤醒有两种情况,一个是利用unpark唤醒,一个是利用interrupt唤醒;
public void unlock() {
sync.release(1);
}
unlock释放锁资源,一般都是在finally中被调用,防止当临界区因为任何异常时怕锁不被释放;而释放锁不像获取锁lock的实现多色多样,没有所谓公平或不公平,就是规规矩矩的释放资源而已;
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁资源,此方法由AQS的具体子类实现
Node h = head;
if (h != null && h.waitStatus != 0) // 从头结点开始,唤醒后继结点
unparkSuccessor(h); // 踢出CANCELLED状态结点,然后唤醒后继结点
return true;
}
return false;
}
release尝试释放锁,并且有义务移除CANCELLED状态的结点,还有义务唤醒后继结点继续运行获取锁资源;
// NonfairSync 和 FairSync 的父类 Sync 类的 tryRelease 方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 获取锁资源值并做减1操作
if (Thread.currentThread() != getExclusiveOwnerThread())
// 查看当前线程是否和持有锁的线程是不是同一个线程
// 正常情况下,需要释放的线程肯定是持有锁的线程,否则不就乱套了,肯定哪里出问题了,所以抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 若此时锁资源值做减法操作后正好是0,则所有锁资源已经释放干净,因此持有锁的变量也置为空
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
// 若此时做减法操作还没有归零,那么这种情况就是那种重入锁,需要重重释放后才行
return free;
}
tryRelease主要通过CAS操作对state锁资源进行减1操作;
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 该node一般都是传入head进来,也就是说,需要释放头结点,
// 也就是当前结点需要释放锁操作,顺便唤醒后继结点
int ws = node.waitStatus;
if (ws < 0)
// 若结点状态值小于0,则归零处理,通过CAS归零,允许失败,但是不管怎么着,
// 仍然要往下走去唤醒后继结点
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 取出后继结点,这个时候一般都是Head后面的一个结点,所以一般都是老二
if (s == null || s.waitStatus > 0) {
// 若后继结点为空或者后继结点已经处于CANCELLED状态的话
s = null;
// 那么从队尾向前遍历,直到找到一个小于等于0的结点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒线程
}
unparkSuccessor主要是踢出CANCELLED状态结点,然后唤醒后继结点;但是这个唤醒的后继结点为空的话,那么则从队尾一直向前循环查找小于等于零状态的结点并调用unpark唤醒;
这里为什么要从队尾向前寻找?
分析了这么多,感觉是不是有一种豁然开朗的感觉,原来大家传的神乎其神的AQS是不是没有想象中那么难以理解;
在这里我简要总结一下AQS的流程的一些特性:
其实当了解了AQS后,这里以ReentrantLock为载体分析了一下,那么再去分析CountDownLatch、Semaphore、ReentrantReadWriteLock等那些集成AQS而实现不同功能的模块就会顺利很多;
后面还有干货哦(期待后传),未完待续…