单词1: countdown
常见释义: 英[ˈkaʊntdaʊn] 美[ˈkaʊntdaʊn]
n. 倒数读秒,倒计时(如发射宇宙飞船时); 大事临近的时期;
[例句]The countdown to Christmas starts here.
现在开始圣诞节倒计时。
[其他] 复数:countdowns
单词2: latch
常见释义 英[lætʃ] 美[lætʃ]
n. 插销; 门闩; 弹簧锁; 碰锁;
vt. 用插销插上; 用碰锁锁上;
[例句]He lifted the latch and opened the door.
他拉起门闩开了门。
[其他] 第三人称单数:latches 复数:latches 现在分词:latching 过去式:latched 过去分词:latched
场景:完成某件事情时,前面的一些事情都要完成,然后自己才能继续。
远离:使用一个计数器进行实现,计数器初始值就是其它线程的数量。当每个被计数的线程完成任务后,计数器值减一,当计数器的值为0时,表示所有线程都已经完成了任务,然后在CountDownLatch上等待的线程就可以恢复执行。
public static void main(String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
// 业务处理
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + ":" + " finished");
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
};
List<Thread> threads = new ArrayList<>(4);
for(int i= 0;i<4;i++) {
Thread thread = new Thread(runnable);
threads.add(thread);
}
for(Thread thread: threads){
thread.start();
}
System.out.println("main end");
}
如下,main线程和其它线程运行时序不定,达不到其它线程都结束后main县城再结束的效果。
public static void main(String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
// 业务处理
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + ":" + " finished");
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
};
List<Thread> threads = new ArrayList<>(4);
for(int i= 0;i<4;i++) {
Thread thread = new Thread(runnable);
threads.add(thread);
}
for(Thread thread: threads){
thread.start();
}
for(Thread thread: threads){
thread.join();
}
System.out.println("main end");
}
join原理:
A线程中执行 B.join(),则A线程阻塞住,直到线程B完成,A线程才能再继续。
join方法的本质调用的是Object中的wait方法实现线程的阻塞,即有monitor锁的概念。所以B线程执行一半要通知A线程继续则无法实现。
public class Test {
final static int CNT = 3;
static CountDownLatch countDownLatch = new CountDownLatch(CNT);
public static void main(String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
// 业务处理
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + ":" + "business finished");
countDownLatch.countDown();
// 其它处理
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + ":" + " finished");
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
};
List<Thread> threads = new ArrayList<>(4);
for (int i = 0; i < CNT; i++) {
Thread thread = new Thread(runnable);
threads.add(thread);
}
for (Thread thread : threads) {
thread.start();
}
countDownLatch.await();
System.out.println("main end");
}
}
输出如下: main线程只等待业务线程的业务逻辑结束就行。(当然应该写两个try finally, 避免结束不了)
改进如下:
public class Test {
final static int CNT = 3;
static CountDownLatch countDownLatch = new CountDownLatch(CNT);
public static void main(String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
// 业务处理
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + ":" + "business finished");
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
// 其它处理
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + ":" + " finished");
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
};
List<Thread> threads = new ArrayList<>(4);
for (int i = 0; i < CNT; i++) {
Thread thread = new Thread(runnable);
threads.add(thread);
}
for (Thread thread : threads) {
thread.start();
}
countDownLatch.await();
System.out.println("main end");
}
}
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch内部自己实现了继承AbstractQueuedSynchronizer的同步器,如下
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch 并没有往aqs队列加入节点,而是使用aqs的共享模式
aqs的java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
只要countDown没有减到0,那么Node.SHARED
就是head, 则一直for(;😉 死循环的执行判断countDown为0,只要为0,就返回退出了
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
其中tryAcquireShared(arg);
则是CountDownLatch中d的判断逻辑 java.util.concurrent.CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}