前言:在并发编程过程中我们有时CountDownLatch 来达到异步线程任务执行完毕后,发送通知或进行其它业务的处理,关于 CountDownLatch使用参考:https://blog.csdn.net/l123lgx/article/details/122229867;
CountDownLatch 是怎么工作的:
1 需要new CountDownLatch 来设置state 的值:
CountDownLatch countDownLatch = new CountDownLatch(1);
CountDownLatch:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
new Sync(count):
Sync(int count) {
setState(count);
}
AbstractQueuedSynchronizer.setState(count):
/**
* The synchronization state.
*/
private volatile int state;
protected final void setState(int newState) {
// 将new CountDownLatch(1)的参数1 传入赋值给state
state = newState;
}
2 在开启线程处理任务时,在业务处理完毕后:
new Thread(()->{
try {
// 你的业务处理
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
// state 的值-1
countDownLatch.countDown();
System.out.println("countDown");
}).start();
CountDownLatch.countDown()
public void countDown() {
sync.releaseShared(1);
}
sync.releaseShared(1):
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 如果当前线程在对state -1 后state = 0 则唤醒AQS中阻塞的线程节点
doReleaseShared();
return true;
}
return false;
}
CountDownLatch.tryReleaseShared(int releases)
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 获取AbstractQueuedSynchronizer state 的值
int c = getState();
if (c == 0)// c为0 直接返回
return false;
int nextc = c-1;// 否则将 state 的值-1
if (compareAndSetState(c, nextc))// cas 替换state 的值
return nextc == 0;// state 的值-1 为0 则返回true
}
}
doReleaseShared():
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;// AQS 中的头节点
if (h != null && h != tail) {// 如果AQS 中有节点
int ws = h.waitStatus;// 获取节点的重入次数
if (ws == Node.SIGNAL) {// 如果节点是就绪状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 将waitStatus替换为0
continue; // loop to recheck cases 失败则下一次循环
unparkSuccessor(h);// 成功则唤醒AQS 的中最早进入的一个节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
3 在主线程中通过调用 .await()等待你线程任务的执行完毕:
countDownLatch.await():
countDownLatch.await();
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
AbstractQueuedSynchronizer.acquireSharedInterruptibly(1):
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();// 线程被中断则直接抛出异常
if (tryAcquireShared(arg) < 0)// 如果state 的值不为0 则返回-1 为0返回1
doAcquireSharedInterruptibly(arg);
}
countDownLatch.tryAcquireShared(int acquires):
protected int tryAcquireShared(int acquires) {
// 如果state 的值不为0 则返回-1 为0返回1
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly(arg):
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将当前节点加入到AQS 双向链表中,此时初始的 waitStatus 为0
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取当前节点的前置节点
final Node p = node.predecessor();
if (p == head) {// 如果前置节点为head 节点,也即改node 为AQS 的第一个有效节点
int r = tryAcquireShared(arg);// 得到CountDownLatch中的state 的值并判断返回如果state 为0则返回1 否则返回-1
if (r >= 0) {
// 将当前node 节点设置为AQS的头部节点并唤醒AQS中最早加入的一个节点线程
setHeadAndPropagate(node, r);
p.next = null; // help GC 移除原头部节点
failed = false;
return;
}
}
// 设置node 的前置节点的waitStatus 为signal 并挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);// 清除AQS 中的失效节点
}
}
setHeadAndPropagate(node, r):
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);// 设置当前节点为头部节点
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();//
}
}
AbstractQueuedSynchronizer.setHead(Node node)
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
doReleaseShared():
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;// 获取AQS的头部节点
if (h != null && h != tail) {// AQS 中有节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 将头部节点的waitStatus 设置为0
continue; // loop to recheck cases
unparkSuccessor(h);// 唤醒AQS 中最早加入的一个有效节点线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
unparkSuccessor(h):
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
4 CountDownLatch 过程:
(1)初始化CountDownLatch时传入要等待定线程个数(int);
(2) 在每个线程中执行完任务后调用countDown()方法对CountDownLatch 的state -1 ;如果减1后state 的值为0,则尝试去唤醒AQS 中处于阻塞的节点;
(3)在主线中调用await() 方法来处理所有线程都执行完任务,如果当前CountDownLatch 的state 不为0 则将当前线程封装为node 加入到AQS双向链表中,随后进行锁的获取,如果获取不到则进行当前线程的挂起;
5 总结:
CountDownLatch 使用state 的变量值来判断线程执行任务的情况,每个线程在执行完成任务后都对改state 减去1 以代表一个任务的执行完毕;适应await() 方法来阻塞等待还没有执行完任务的线程;所以CountDownLatch 的执行 state 值的设置,以及使用countDown()减1 来代表任务的执行完毕,使用await() 方法来阻塞等待还没有执行完任务的线程,3个元素缺一不可;如果使用不当则使得线程阻塞,从而影响业务的执行。