多线程工具类 CompletableFuture
多线程同步工具类 countDownLatch
Future 和 Promise
。Future
相当于一个占位符
,代表一个操作将来的结果
。一般通过 get
可以直接阻塞得到结果
,或者让它异步执行然后通过 callback 回调结果
。
get 操作不要在 任务分发 循环体 内进行,否则整个操作就 不是 多线程异步 操作了
但如果回调中嵌入了回调
呢?如果层次很深,就是回调地狱。
Java 中的 CompletableFuture
其实就是 Promise
,用来解决回调地狱问题
。Promise
是为了让代码变得优美而存在的
有多优美?这么说吧,一旦你使用了 CompletableFuture,就会爱不释手,就像初恋女友一样,天天想着她
CountDownLatch
用于主线程等待其他子线程任务都执行完毕后再执行
,它允许一个或多个线程
一直等待
,直到其他线程的操作执行完后再执行
CountDownLatch 是通过一个 计数器 来实现的,计数器 的 初始值 是 线程的数量。
每当 一个线程执行完毕后,计数器的值就 -1 ,当计数器的值为 0 时,表示 所有线程都执行完毕
然后 在闭锁上 等待的线程就可以 继续执行 了
使用的场景: 如多模块数据加速加载
、治理大批量数据下游接口超时
等
CountDownLatch
主要的方法:
await() : 调用 await() 方法的 线程 会 被挂起,它会等待 直到 count 值为 0 才 继续执行
await(long timeout,TimeUnit unit) : 只不过 等待一定的时间后 count 值还没变为 0 的话就会 继续执行(适用允许丢失部分数据的场景)
countDown() : 将 count 值减 1
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Test020 {
/**
* 线程池
*/
private static final ExecutorService QUERY_POOL = new ThreadPoolExecutor(
10, 10,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10000),
new ThreadPoolExecutor.DiscardPolicy());
public static void main(String[] args) {
long start = System.currentTimeMillis();
try {
List<Long> resultList = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
QUERY_POOL.execute(() -> {
try {
resultList.addAll(dohandler());
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
System.out.println("结果:" + resultList + "耗时:" + (System.currentTimeMillis() - start));
} catch (Exception e) {
System.out.println("发生异常");
}
}
public static List<Long> dohandler() {
try {
Thread.sleep(2500);
return Lists.newArrayList(123L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
//创建初始化3个线程的线程池
private ExecutorService threadPool = Executors.newFixedThreadPool(3);
//保存每个学生的平均成绩
private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(); //注意这里要用并发map
private CountDownLatch countDownLatch = new CountDownLatch(3);
private void count() {
for (int i = 0; i < 3; i++) {
threadPool.execute(() -> {
//计算每个学生的平均成绩,代码略()假设为60~100的随机数
int score = (int) (Math.random() * 40 + 60);
try {
Thread.sleep(Math.round(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName() + "同学的平均成绩为" + score);
countDownLatch.countDown();
});
}
this.run();
threadPool.shutdown();
}
@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = 0;
Set<String> set = map.keySet();
for (String s : set) {
result += map.get(s);
}
System.out.println("三人平均成绩为:" + (result / 3) + "分");
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
CyclicBarrier1 cb = new CyclicBarrier1();
cb.count();
Thread.sleep(100);
long end = System.currentTimeMillis();
System.out.println(end - now);
}
CompletableFuture
是对 Future 模式的应用
即实现
,支持流调用
、异步执行
,它支持完成后得到通知
CompletableFuture
默认的时候会使用 ForkJoinPool
池来提供线程来执行任务
从它的源代码中,我们可以看到,CompletableFuture
直接提供了几个便捷的静态方法入口
。其中有run 和 supply
两组
ForkJoinPool
线程池在 JDK 8 加入,主要用法和之前的线程池是相同的,也是把任务交给线程池去执行
,线程池
中也有任务队列来存放任务
,和之前的五种线程池不同的是,它非常适合执行可以分解子任务的任务
,比如树的遍历,归并排序,或者其他一些递归场景
run 的参数是 Runnable 没有返回值
supply 的参数是 Supplier 有返回值
runAsync(Runnable runnable) :同步执行,使用默认线程池
runAsync(Runnable runnable, Executor executor) :同步执行,手动线程池
supplyAsync(Supplier supplier) :异步执行,使用默认线程池
supplyAsync(Supplier supplier, Executor executor):异步执行,手动线程池
这两组静态函数
,都提供了传入自定义线程池的功能
。如果你用的不是外置的线程池
,那么它就会使用默认的 ForkJoin 线程池
。默认的线程池,大小和用途你是控制不了的,所以还是建议自己传递一个
。
典型的代码,写起来是这个样子:
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "test";
});
String result = future.join();
CompletableFuture
的主要作用,就是让代码写起来好看
。配合 Java8 之后的 Stream 流
,可以把整个计算过程抽象成一个流。
前面任务的计算结果,可以直接作为后面任务的输入,就像是管道一样
thenApply
thenApplyAsync
thenAccept
thenAcceptAsync
thenRun
thenRunAsync
thenCombine
thenCombineAsync
thenCompose
thenComposeAsync
下面代码的执行结果是 99,并不因为是异步就打乱代码执行的顺序了
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
.thenApplyAsync((e) -> {
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return e * 10;
}).thenApplyAsync(e -> e - 1);
cf.join();
System.out.println(cf.get());
函数的作用还要看 then 后面的动词
:
apply 有入参和返回值,入参为前置任务的输出
accept 有入参无返回值,会返回 CompletableFuture
run 没有入参也没有返回值,同样会返回 CompletableFuture
combine 形成一个复合的结构,连接两个 CompletableFuture,并将它们的2个输出结果,作为 combine 的输入
compose 将嵌套的 CompletableFuture 平铺开,用来串联两个 CompletableFuture
上面的函数列表,其实还有很多
when
handle
when 的意思,就是任务完成时候的回调。
比如我们上面的例子,打算在完成任务后,输出一个 done。它也是属于只有入参没有出参的范畴,适合放在最后一步进行观测。
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
.thenApplyAsync((e) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return e * 10;
}).thenApplyAsync(e -> e - 1)
.whenComplete((r, e)->{
System.out.println("done");
})
;
cf.join();
System.out.println(cf.get());
handle 和 exceptionally 的作用,和 whenComplete 是非常像的
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture 的任务是串联的,如果它的其中某一步骤发生了异常,会影响后续代码的运行的。
exceptionally 从名字就可以看出,是专门处理这种情况的。比如,我们强制某个步骤除以 0,发生异常,捕获后返回 -1,它将能够继续运行。
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
.thenApplyAsync(e->e/0)
.thenApplyAsync(e -> e - 1)
.exceptionally(ex->{
System.out.println(ex);
return -1;
});
cf.join();
System.out.println(cf.get());
handle 更加高级一些,因为它除了一个异常参数,还有一个正常的入参。处理方法也都类似,不再赘述。
某一个业务接口,需要处理几百个请求
,请求之后再把这些结果给汇总起来
。
如果顺序执行的话,假设每个接口耗时 100ms,那么 100 个接口,耗时就需要 10 秒
。假如我们并行去获取的话,那么效率就会提高
。
使用 CountDownLatch
可以解决:
ExecutorService executor = Executors.newFixedThreadPool(5);
CountDownLatch countDown = new CountDownLatch(requests.size());
for(Request request:requests){
executor.execute(()->{
try{
//some opts
}finally{
countDown.countDown();
}
});
}
countDown.await(200,TimeUnit.MILLISECONDS);
使用 CompletableFuture
来替换CountDownLatch
:
ExecutorService executor = Executors.newFixedThreadPool(5);
List<CompletableFuture<Result>> futureList = requests.stream()
.map(request->
CompletableFuture.supplyAsync(e->{
//some opts
},executor))
.collect(Collectors.toList());
CompletableFuture<Void> allCF = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allCF.join();
allOf,用来把所有的 CompletableFuture 组合在一起;还有 anyOf,表示只运行其中一个。
常用的,还有三个函数:
thenAcceptBoth:处理两个任务的情况,有两个任务结果入参,无返回值
thenCombine:处理两个任务的情况,有入参有返回值,最喜欢
runAfterBoth:处理两个任务的情况,无入参,无返回值
CyclicBarrier
和CountDownLatch
这两个工具都是在java.util.concurrent包
下
CyclicBarrier
的字面意思是可循环使用(Cyclic)的屏障(Barrier)
。它要做的是,让一组线程
到达一个屏障(也可以叫同步点)
时被阻塞
,直到最后一个线程到达屏障时
,屏障才会开门
,所有被屏障拦截的线程才会继续干活
。
这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后
这个屏障是可以重新使用的(reset()方法重置屏障点),这一点与CountDownLatch不同
CyclicBarrier
是一种同步机制
允许一组线程相互等待
,等到所有线程都
到达一个屏障点
才退出 await 方法
,它没有直接实现 AQS
而是借助 ReentrantLock 来实现的同步机制
CyclicBarrier 是可循环使用的
CountDownLatch 是一次性的
另外它体现的语义也跟CountDownLatch不同
CountDownLatch 减少 计数 到达条件采用的是 release 方式
CyclicBarrier 走向屏障点(await)采用的是 Acquire 方式
Acquire 是会阻塞的,这也实现了 CyclicBarrier 的另外一个特点,只要 有一个线程 中断
那么 屏障点 就 被打破,所有线程 都将 被唤醒(CyclicBarrier自己负责这部分实现,不是由AQS调度的)
这样也避免了因为一个线程中断引起永远不能到达屏障点而导致其他线程一直等待。
屏障点 被打破 的 CyclicBarrier 将不可再使用(会抛出BrokenBarrierException)除非执行 reset 操作
CyclicBarrier
有两个构造函数
CyclicBarrier(int parties) int类型的参数 表示有 几个线程 来参与这个 屏障拦截,(拿上面的例子,即有几个人跟团旅游);
CyclicBarrier(int parties,Runnable barrierAction) 当所有线程 到达一个屏障点 时,优先执行 barrierAction 这个线程
await():每个线程调用 await(),表示已经到达 屏障点, 然后 当前线程 被阻塞
private static ExecutorService executor = Executors.newFixedThreadPool(10);
private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
private void count() {
for (int i = 0; i < 3; i++) {
executor.execute(() -> {
//计算每个学生的平均成绩,代码略()假设为60~100的随机数
int score = (int) (Math.random() * 40 + 60);
try {
Thread.sleep(Math.round(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName() + "同学的平均成绩为" + score);
try {
//执行完运行await(),等待所有学生平均成绩都计算完毕
cyclicBarrier.await();
this.run();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
public void run() {
int result = 0;
Set<String> set = map.keySet();
for (String s : set) {
result += map.get(s);
}
System.out.println("三人平均成绩为:" + (result / 3) + "分");
}
CountDownLatch 的 计数器 只能使用一次
CyclicBarrier 的计数器可以使用 reset() 方法重置
所以 CyclicBarrier 能处理 更为复杂 的业务场景,比如如果计算 发生错误,可以 重置计数器,并让线程们重新执行一次
CyclicBarrier 还提供 其他有用的方法
getNumberWaiting 方法可以获得 CyclicBarrier 阻塞的 线程数量
isBroken 方法用来知道 阻塞的线程是否被中断
CountDownLatch 会阻塞主线程,CyclicBarrier 不会阻塞 主线程,只会阻塞 子线程
某线程 中断 CyclicBarrier 会抛出异常,避免了 所有线程 无限等待
CountDownLatch:一个或者多个线程,等待 其他多个线程 完成某件事情之后才能执行
CyclicBarrier:多个线程 互相等待,直到 到达 同一个同步点,再 继续一起执行
对于 CountDownLatch来说,重点是“ 一个线程(多个线程)等待 ”,而其他的 N个线程 在 完成“某件事情” 之后,可以终止,也可以等待
而对于 CyclicBarrier,重点是 多个线程,在 任意一个线程 没有完成,所有的线程都必须等待
CountDownLatch 是 计数器,线程 完成一个记录一个,只不过 计数 不是递增而是 递减
CyclicBarrier 更像是一个 阀门 ,需要所有线程 都到达 ,阀门 才能打开,然后继续执行
对于各种回调的嵌套
,CompletableFuture
为我们提供了更直观、更优美的 API
。在“多个任务等待完成状态
”这个应用场景,CompletableFuture
很好用