今天的话,就来以一个应用场景来进行一步一步的推导,在实现案例的过程中,将CompletableFuture相关的知识点逐步讲述明白。
应用场景如下:
我们在查询掘金文章页面数据为应用场景,如何使用异步编程进行优化。
以掘金展示页面为例,点进文章页面时,大致需要渲染的数据为以下几点:
- //1. 文章内容信息 1s
- //2. 作者相关信息 0.5s 依赖于1的查询结果
- //3. 文章评论信息 0.5s 依赖于1的查询结果
- //4. 文章分类信息 0.5s 依赖于1的查询结果
- //5. 文章专栏信息 0.5s 依赖于1的查询结果
- //6. 相关文章信息 0.5s 依赖于1的查询结果
- //...
- 复制代码
补充:这是我随意拆分的,里面具体的表结构和接口请求先后的关系,以及具体的请求时间都是比较随意的,具体想要陈述的就是同步和异步编程的关系。
那么我们就要根据这个组装一个视图数据来进行返回。
注意:实际上并非是如此,掘金文章页面内容的数据是多个接口返回的,我只是为了模拟内容,这么写罢了,切勿当真,真正应用还需要分业务场景,或者应用场景中可异步编排,到那个时候希望大家能应用上。
现在来说:按照以前我们以前串行化的执行方式,那么总花费的时间就是3.5s,也就是从一开始执行到六,无疑这样是非常慢的,并且 2,3,4,5,6都是依赖于 1的结果查询,但2,3,4,5,6并不互相依赖,此时我们可以将他们从串行化变成异步执行,自己准备一个线程池,然后在执行的时候,将它们放进线程池中异步运行。
如此总耗费时间就从原来的 3.5s 变成了 1.5s,编程思想的改变,对于性能还是有一定程度的提高的
接下来我们就开始接触CompletableFuture吧
再讲CompletableFuture之前,我还是秉承着一贯的理念,先讲述一些之前的东西,然后再将它引入进来,不至于让大家对于它的出现处于一种非常迷茫的状态。
在之前我们如果只是普通异步的执行一个任务,无需返回结果的话,只要将一个任务实现 Runnable接口,然后将放进线程池即可。
如果需要返回结果,就让任务实现Callable接口,但实际上Callable与 Thread 并没有任何关系,Callable 还需要使用Future与线程建立关系,然后再让线程池执行,最后通过futureTask.get()方法来获取执行的返回结果。
futureTask是Future接口的一个基本实现。
(Callable 类似于Runnable 接口,但 Runnable 接口中的 run()方法不会返回结果,并且也无法抛出经过检查的异常,但是 Callable中 call()方法能够返回计算结果,并且也能够抛出经过检查的异常。)
一个小案例:
- /**
- * @description:
- * @author: Yihui Wang
- * @date: 2022年08月21日 11:34
- */
- public class Demo {
- public static ExecutorService executorService = new ThreadPoolExecutor(
- 10,
- 100,
- 30L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(100),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.DiscardOldestPolicy());
-
- public static void runnableTest(){
- RunnableTest runnableTest = new RunnableTest();
- executorService.submit(runnableTest);
- }
-
- public static void callableTest() throws ExecutionException, InterruptedException, TimeoutException {
- CallableAndFutureTest callableAndFutureTest = new CallableAndFutureTest();
- FutureTask<String> task = new FutureTask<>(callableAndFutureTest);
- // 采用线程池执行完程序并不会结束, 如果是想测试一次性的那种 可以采用
- // new Thread(task).start();
- executorService.submit(task);
- //System.out.println("尝试取消任务,传true表示取消任务,false则不取消任务::"+task.cancel(true));
- System.out.println("判断任务是否已经完成::"+task.isDone());
- //结果已经计算出来,则立马取出来,如若摸没有计算出来,则一直等待,直到结果出来,或任务取消或发生异常。
- System.out.println("阻塞式获取结果::"+task.get());
- System.out.println("在获取结果时,给定一个等待时间,如果超过等待时间还未获取到结果,则会主动抛出超时异常::"+task.get(2, TimeUnit.SECONDS));
- }
- public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
- runnableTest();
- callableTest();
- }
-
- }
-
- class RunnableTest implements Runnable{
- @Override
- public void run() {
- System.out.println("我是Runnable执行的结果,我无法返回结果");
- }
- }
- class CallableAndFutureTest implements Callable<String> {
- @Override
- public String call() throws Exception {
- String str = "";
- for (int i = 0; i < 10; i++) {
- str += String.valueOf(i);
- Thread.sleep(100);
- }
- return str;
- }
- }
-
- 复制代码
看起来Callable搭配Future好像已经可以实现我们今天要实现的效果了,从结果的意义上来说,确实可以,但是并不优雅,也会存在一些问题。
如果多个线程之间存在依赖组合,该如何呢?
这个时候就轮到 CompletableFuture出现了~
我先直接将实现应用场景的效果代码写出来,然后再接着慢慢的去讲
- package com.nzc;
-
- import lombok.Data;
-
- import java.util.concurrent.*;
-
- /**
- * @description:
- * @author: Yihui Wang
- * @date: 2022年08月21日 11:48
- */
- public class CompletableFutureDemo {
-
- public static ExecutorService executorService = new ThreadPoolExecutor(
- 10,
- 100,
- 30L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(100),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.DiscardOldestPolicy());
-
- public static ArticleVO asyncReturn(){
- ArticleVO article=new ArticleVO();
- long startTime=System.currentTimeMillis();
- CompletableFuture<ArticleVO> articleContent = CompletableFuture.supplyAsync(() -> {
- try {
- article.setId(1L);
- article.setContent("我是宁在春写的文章内容");
- Thread.sleep(1000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return article;
- },executorService);
-
- // 这里的res 就是第一个个 CompletableFuture 执行完返回的结果
- // 如果要测试它们的异步性,其实应该在下面的所有执行中,都让它们沉睡一会,这样效果会更加明显
- // executorService 是放到我们自己创建的线程池中运行
- CompletableFuture<Void> author = articleContent.thenAcceptAsync((res) -> {
- res.setAuthor(res.getId()+"的作者是宁在春");
- },executorService);
-
- CompletableFuture<Void> articleComment = articleContent.thenAcceptAsync((res) -> {
- res.setComment(res.getId()+"的相关评论");
- },executorService);
-
- CompletableFuture<Void> articleCategory = articleContent.thenAcceptAsync((res) -> {
- res.setCategory(res.getId()+"的分类信息");
- },executorService);
-
- CompletableFuture<Void> articleColumn = articleContent.thenAcceptAsync((res) -> {
- res.setColumn(res.getId()+"的文章专栏信息");
- },executorService);
-
- CompletableFuture<Void> recommend = articleContent.thenAcceptAsync((res) -> {
- res.setRecommend(res.getId()+"的文章推荐信息");
- },executorService);
-
- CompletableFuture<Void> futureAll = CompletableFuture.allOf(articleContent, author, articleComment, articleCategory, articleColumn, recommend);
-
- try {
- // get() 是一个阻塞式方法 这里是阻塞式等待所有结果返回
- // 因为要等待所有结果返回,才允许方法的结束,否则一些还在执行中,但是方法已经返回,就会造成一些错误。
- futureAll.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- long endTime=System.currentTimeMillis();
- System.out.println("耗费的总时间===>"+(endTime-startTime));
-
- // 所有任务执行完成后,将构建出来的视图结果进行返回
- return article;
- }
-
- public static void main(String[] args) {
- ArticleVO articleVO = asyncReturn();
- System.out.println(articleVO);
- }
-
- }
-
- @Data
- class ArticleVO {
- private Long id;
- private String content;
- private String author;
- private String comment;
- private String category;
- private String column;
- private String recommend;
- }
- 复制代码
这里就是对应着应用场景里的那几步,引入以下lombok包就可以直接测试了。
为了更好的看出效果,也可以在执行某个任务的时候,让它睡上一会。
看完上面的例子,算是看到他是如何的啦,接下来还是需要详细说一说的,思维导图如下:

CompletableFuture.runAsync()创建无返回值的简单异步任务 Executor表示线程池~
- package com.nzc;
-
- import java.util.concurrent.*;
-
- /**
- * @description:
- * @author: Yihui Wang
- * @date: 2022年08月21日 15:58
- */
- public class AsyncDemo {
-
- public static ExecutorService executorService = new ThreadPoolExecutor(
- 10,
- 100,
- 30L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(100),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.DiscardOldestPolicy());
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- System.out.println("主线程开始");
- CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
- try {
- Thread.sleep(500L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("通过CompletableFuture.runAsync创建一个简单的异步任务~");
- // 另外此处还可以填写第二个参数,放进自定义线程池中执行
- },executorService);
- //runAsync.isDone() 可以判断任务是否已经 完成
- System.out.println("任务是否完成==>" + runAsync.isDone());
- //这里是阻塞式等待任务完成
- runAsync.get();
- System.out.println("主线程结束");
- System.out.println("任务是否完成==>" + runAsync.isDone());
- }
- }
-
- 复制代码
CompletableFuture.supplyAsync()创建有返回值的简单异步任务
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
- return "我是由宁在春创建的有返回结果的异步任务";
- }, executorService);
- // 如果只有一条返回语句,还可以写的更加简便
- //CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "我是有返回结果的异步任务", executorService);
-
- //这里同样也是阻塞式的
- String result = supplyAsync.get();
- System.out.println("异步任务执行的回调结果:==>"+result);
- }
- 复制代码

简单说就是,这两个方法就是将执行任务的线程排起来,执行完一个,接着再执行第二个。并且它不需要接收上一个任务的结果,也不会返回结果,一定程度上来说,它的应用场景并不是特别高。
- public static ExecutorService executorService = new ThreadPoolExecutor(
- 10,
- 100,
- 30L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(100),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.DiscardOldestPolicy());
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- thenRun();
- thenRunAsync();
- }
-
- public static void thenRun() throws ExecutionException, InterruptedException {
- long startTime = System.currentTimeMillis();
- System.out.println("主线程开始1");
- CompletableFuture<String> future =
- CompletableFuture.supplyAsync(() -> {
- System.out.println("我是一个无需传参也没有返回值的简单异步任务1");
- return "我是宁在春";
- });
- CompletableFuture<Void> thenRun = future.thenRun(() -> {
- try {
- Thread.sleep(500L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("等待任务1执行完后,我再执行任务2");
- });
- CompletableFuture<Void> thenRun1 = future.thenRun(() -> {
- try {
- Thread.sleep(500L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("等待任务1执行完后,我再执行任务3");
- });
- CompletableFuture<Void> thenRun2 = future.thenRun(() -> {
- try {
- Thread.sleep(500L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("等待任务1执行完后,我再执行任务4");
- });
- future.get();
- long endTime = System.currentTimeMillis();
- System.out.println("主线程结束1,耗费时间为:"+(endTime-startTime));
- }
-
- public static void thenRunAsync() throws ExecutionException, InterruptedException {
- long startTime = System.currentTimeMillis();
- System.out.println("主线程开始2");
- CompletableFuture<String> future =
- CompletableFuture.supplyAsync(() -> {
- System.out.println("我是一个无需传参也没有返回值的简单异步任务 一");
- return "我是宁在春";
- },executorService);
- CompletableFuture<Void> thenRunAsync1 = future.thenRunAsync(() -> {
- try {
- Thread.sleep(500L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("等待任务一执行完后,我再执行任务二");
- },executorService);
-
- CompletableFuture<Void> thenRunAsync2 = future.thenRunAsync(() -> {
- try {
- Thread.sleep(500L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("等待任务一执行完后,我再执行任务三");
- },executorService);
-
- CompletableFuture<Void> thenRunAsync3 = future.thenRunAsync(() -> {
- try {
- Thread.sleep(500L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("等待任务一执行完后,我再执行任务四");
- },executorService);
-
- // 这里是让所有的阻塞,等待所有任务完成,才结束整个任务
- CompletableFuture<Void> allOf = CompletableFuture.allOf(future,thenRunAsync1, thenRunAsync2, thenRunAsync3);
- allOf.get();
- long endTime = System.currentTimeMillis();
- System.out.println("主线程结束2,耗费时间为:"+(endTime-startTime));
- }
- }
- 复制代码
小结:
浅显的说它们两个的区别的话,其实就是thenRunAsync可异步执行,thenRun不可异步执行,不过都可以异步的阻塞式等待结果的返回。
在案例中我是自己手动创建了线程池,但其实就算我没有手动创建线程池,当调用thenRunAsync方法,它也是放在异步线程中执行的。
源码比较:
- public CompletableFuture<Void> thenRun(Runnable action) {
- return uniRunStage(null, action);
- }
-
- public CompletableFuture<Void> thenRunAsync(Runnable action) {
- return uniRunStage(asyncPool, action);
- }
-
- public CompletableFuture<Void> thenRunAsync(Runnable action,
- Executor executor) {
- return uniRunStage(screenExecutor(executor), action);
- }
-
- /**
- * Default executor -- ForkJoinPool.commonPool() unless it cannot
- * support parallelism.
- */
- private static final Executor asyncPool = useCommonPool ?
- ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
-
- 复制代码
thenRun它是同第一个任务是同一个线程,所以当第一个任务结束后,它才会开始执行任务。thenRunAsync它则是不一样的,如果我传入我自定义的线程池,它就是放入我们自定义的线程池进行运行,如果传线程池这个参数的话,就是默认使用ForkJoin线程池之后的类比区别也是同样的,总共是三组这样的方法。

thenApply 和 thenApplyAsync 让线程成为了一种串行化的关系,第一个任务执行完的返回值会作为第二个的任务的入参.
案例的话,比较简单.
- package com.nzc;
-
- import java.util.concurrent.*;
-
- /**
- * @description:
- * @author: Yihui Wang
- * @date: 2022年08月21日 16:32
- */
- public class ThenApplyAndAsyncDemo {
-
-
- public static ExecutorService executorService = new ThreadPoolExecutor(
- 10,
- 100,
- 30L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(100),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.DiscardOldestPolicy());
-
- /**
- * @param args
- */
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- thenApply();
- thenApplyAsync();
- }
-
-
- /**
- * 线程串行化
- * 1、我进入商场
- * 2、找到了我要买的商品
- * 3、准备付款结账
- * 4、拿着东西回家!!!
- * 你会发现这是一步扣一步的在走,其实业务场景中也有很多这样的场景,希望大家在遇到的时候能够想到
- *
- * @return
- * @throws ExecutionException
- * @throws InterruptedException
- */
- public static String thenApply() throws ExecutionException, InterruptedException {
- System.out.println("主线程开始1");
- // CompletableFuture<String> future =
- // CompletableFuture.supplyAsync(() -> {
- // return "我进入商场, ";
- // });
- // CompletableFuture<String> future1 = future.thenApply(res -> {
- // return res += "找到了我要买的商品,";
- // });
- // future.thenApply(res->{
- // return res+="准备付款结账,";
- // }).thenApply(res->{
- // return res+="拿着东西回家!!!";
- // });
-
- // 上面那种分开写和下面这种链式写法是一样的
- CompletableFuture<String> future =
- CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getId());
- return "我进入商场, ";
-
- }).thenApply(res -> {
- System.out.println(Thread.currentThread().getId());
- return res += "找到了我要买的商品,";
- }).thenApply(res -> {
- System.out.println(Thread.currentThread().getId());
- return res += "准备付款结账,";
- }).thenApply(res -> {
- return res += "拿着东西回家!!!";
- });
- String result = future.get();
- System.out.println("主线程1结束, 子线程的结果为:" + result);
- return result;
- }
-
- /**
- * 这里因为是异步的原因,它们之间倒是没有一个顺序上的规范
- *
- * @return
- * @throws ExecutionException
- * @throws InterruptedException
- */
- public static String thenApplyAsync() throws ExecutionException, InterruptedException {
- System.out.println("主线程2开始");
- CompletableFuture<String> future =
- CompletableFuture.supplyAsync(() -> {
- return "我进入商场, ";
- },executorService).thenApplyAsync(res -> {
- System.out.println(Thread.currentThread().getId());
- return res += "找到了我要买的商品,";
- },executorService).thenApplyAsync(res -> {
- try {
- System.out.println(Thread.currentThread().getId());
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- return res += "准备付款结账,";
- },executorService).thenApplyAsync(res -> {
- System.out.println(Thread.currentThread().getId());
-
- return res += "拿着东西回家!!!";
- });
- String result = future.get();
- System.out.println("主线程2结束, 子线程的结果为:" + result);
- return result;
- }
- }
- 复制代码
小结:
thenApply 和 thenApplyAsync 本质上就是将它们串起来了,必须要先完成第一个任务,才能接着做下面的任务
这里的本质区别和前面和之前说的还是一样
但是你如果仔细看了案例代码,你会发现我在里面打印了线程ID. 并且我在测试的时候,尝试将放入自定义线程池和不放入两种情况,实际上 thenApplyAsync 执行的任务线程确实不是一个.
但效果其实和 thenApply 是一样的,都需要等待上一个任务完成。
注意我说的是效果,并非是内部的执行机制。再说就又得进去看源码了...

如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept()和 thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。
thenAccept消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
thenAcceptAsync则是异步的消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
- package com.nzc;
-
- import java.util.concurrent.*;
- import java.util.function.Consumer;
-
- /**
- * @description:
- * @author: Yihui Wang
- * @date: 2022年08月21日 17:21
- */
- public class ThenAcceptDemo {
-
- public static ExecutorService executorService = new ThreadPoolExecutor(
- 10,
- 100,
- 30L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(100),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.DiscardOldestPolicy());
-
-
- public static void main(String[] args) throws Exception {
- thenAccept();
- thenAcceptAsync();
- }
-
- private static String action1 = "";
-
- public static void thenAccept() {
- System.out.println("主线程开始");
- CompletableFuture.supplyAsync(() -> {
- try {
- action1 = "逛jd,想买台电脑~ ";
- } catch (Exception e) {
- e.printStackTrace();
- }
- return action1;
- }).thenApply(string -> {
- return action1 + "选中了,付款,下单成功!!";
- }).thenApply(String -> {
- return action1 + "等待快递到来";
- }).thenAccept((res) -> {
- System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + res);
- });
- }
-
- private static String action2 = "";
-
- public static void thenAcceptAsync() {
- System.out.println("主线程开始");
- CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
- try {
- action2 = "逛jd,想买台电脑~ ";
- } catch (Exception e) {
- e.printStackTrace();
- }
- return action2;
- }).thenApply(string -> {
- return action2 + "选中了,付款,下单成功!!";
- }).thenApply(String -> {
- return action2 + "等待快递到来";
- });
-
- // 这里不采用链式写法,是因为thenAcceptAsync 无返回值,
- // 第二个thenAcceptAsync 连接在第一个thenAcceptAsync后,会没有入参值
- // 就都拿出来了。
- future.thenAcceptAsync((res) -> {
- System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务三,晚饭时间了,打算一边看电影");
- },executorService);
- future.thenAcceptAsync((res) -> {
- System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务四,一边干饭~");
- },executorService);
- }
- }
-
- 复制代码
thenAcceptAsync也是我们今天文章开头中用到的,异步编排式的组合视图结果集。
这一部分平时用的倒是不少,也比较方便~
上面说了这么多,但是万一我们在执行某个任务的时候出现异常该如何处理呢?
别慌,它也封装好了的。
exceptionally 异常处理,出现异常时触发,可以回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。
一般而言,exceptionally都是写到方法调用的末尾,以来出来中间过程中会出现的异常。
另外就是 handle 也可以用来处理异常。

- public class ExceptionallyDemo {
- public static void main(String[] args) throws Exception{
- System.out.println("主线程开始");
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
- int i= 1/0;
- System.out.println("子线程执行中");
- return i;
- }).exceptionally(ex -> {
- System.out.println(ex.getMessage());
- return -1;
- });
- System.out.println(future.get());
- }
- }
-
- //主线程开始
- //java.lang.ArithmeticException: / by zero
- //-1
- 复制代码
- public static void main(String[] args) throws Exception {
- CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
- System.out.println("任务开始");
- int i = 0 / 1;
- return i;
- }).handle((i, ex) -> {
- System.out.println("进入 handle 方法");
- if (ex != null) {
- System.out.println("发生了异常,内容为:" + ex.getMessage());
- return -1;
- } else {
- System.out.println("正常完成,内容为: " + i);
- return i;
- }
- });
- }
- 复制代码
handle是有入参和带返回值的,入参是之前任务执行的结果。
当然一切具体的使用还是要看业务场景啦
thenCompose 合并两个有依赖关系的 CompletableFutures的执行结果,有入参有返回值。
它的入参是第一个future和第一二两个的任何的返回结果。
thenAcceptBoth则是会将两个任务的执行结果作为方法入参,传递到指定方法中,但无返回值
runAfterBoth 则是不会把执行结果当做方法入参,也没有返回值。
- package com.nzc;
-
- import java.util.WeakHashMap;
- import java.util.concurrent.*;
-
- /**
- * @description:
- * @author: Yihui Wang
- * @date: 2022年08月21日 17:53
- */
- public class ThenCombineDemo {
-
- public static void main(String[] args) throws Exception {
- test();
- }
-
- private static Integer num = 10;
- public static void test() throws Exception {
- System.out.println("主线程开始");
- //第一步加 10
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
- System.out.println("第一个任务:让num+10;任务开始");
- num += 10;
- return num;
- });
- CompletableFuture<String > future1 = CompletableFuture.supplyAsync(() -> {
- System.out.println("第二个任务:让num+1;任务开始");
- return num + 1;
- //它的入参是第一个future和第一二两个的任何的返回结果。
- }).thenCombine(future,(w,s)->{
- System.out.println("任务一的结果==>"+w);
- System.out.println("任务二的结果==>"+s);
- return "我是两个任务的合并"+(w+s);
- });
- System.out.println(future.get());
- System.out.println(future1.get());
- }
- }
- /**
- * 主线程开始
- * 第一个任务:让num+10;任务开始
- * 第二个任务:让num+1;任务开始
- * 任务一的结果==>21
- * 任务二的结果==>20
- * 20
- * 我是两个任务的合并41
- */
- 复制代码
thenAcceptBoth
- package com.nzc;
-
- import java.util.WeakHashMap;
- import java.util.concurrent.*;
-
- /**
- * @description:
- * @author: Yihui Wang
- * @date: 2022年08月21日 17:53
- */
- public class ThenCombineDemo {
-
- public static void main(String[] args) throws Exception {
- test();
- }
-
- private static Integer num = 10;
- public static void test() throws Exception {
- System.out.println("主线程开始");
- //第一步加 10
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
- System.out.println("第一个任务:让num+10;任务开始");
- num += 10;
- return num;
- });
- CompletableFuture<Void > future1 = CompletableFuture.supplyAsync(() -> {
- System.out.println("第二个任务:让num+1;任务开始");
- return num + 1;
- }).thenAcceptBoth(future,(w,s)->{
- System.out.println("任务一的结果==>"+w);
- System.out.println("任务二的结果==>"+s);
- System.out.println( "我是两个任务的合并"+(w+s)+"但是我没有返回值");
- });
- System.out.println("任务一的结果==>"+future.get());
- // 不采用链式写法,任务二实际上是有返回值,大家看业务场景写即可
- System.out.println("任务二后接上thenAcceptBoth方法的结果==>"+future1.get());
- }
- }
- /**
- 主线程开始
- 第一个任务:让num+10;任务开始
- 第二个任务:让num+1;任务开始
- 任务一的结果==>21
- 任务二的结果==>20
- 我是两个任务的合并41但是我没有返回值
- 任务一的结果==>20
- 任务二后接上thenAcceptBoth方法的结果==>null
- */
- 复制代码
runAfterBoth
- public static void test2(){
- //第一步加 10
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
- System.out.println("第一个任务:让num+10;任务开始");
- num += 10;
- return num;
- });
- CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
- System.out.println("第一个任务:让num+10;任务开始");
- num += 10;
- return num;
- });
- future2.runAfterBoth(future,()->{
- System.out.println("不会把执行结果当做方法入参,也没有返回值");
- });
-
- }
- 复制代码
除了这些外,CompletableFuture还有我之前案例中就已经用到的allof和anyOf
allOf: 一系列独立的 future任务,等其所有的任务执行完后做一些事情.
- public class CompletableFutureDemo9 {
-
- private static Integer num = 10;
-
-
- public static void main(String[] args) throws Exception{
- System.out.println("主线程开始");
- List<CompletableFuture> list = new ArrayList<>();
-
- CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
- System.out.println("加 10 任务开始");
- num += 10;
- return num;
- });
- list.add(job1);
-
- CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
- System.out.println("乘以 10 任务开始");
- num = num * 10;
- return num;
- });
- list.add(job2);
-
- CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
- System.out.println("减以 10 任务开始");
- num = num - 10;
- return num;
- });
- list.add(job3);
-
- CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
- System.out.println("除以 10 任务开始");
- num = num / 10;
- return num;
- });
- list.add(job4);
-
- //多任务合并
- List<Integer> collect =
- list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
- System.out.println(collect);
- }
-
- }
- /**主线程开始
- 乘以 10 任务开始
- 加 10 任务开始
- 减以 10 任务开始
- 除以 10 任务开始
- [110, 100, 100, 10]
- */
- 复制代码
allof的除了在合并结果时经常用到以外,像我们今天案例它也用到了它的get()方法,在那里使用的作用时,阻塞式的等待所有的任务结束,才结束方法的调用。
anyOf: 只要在多个 future里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束
- public class CompletableFutureDemo10 {
-
- private static Integer num = 10;
- /**
- * 先对一个数加 10,然后取平方
- * @param args
- */
- public static void main(String[] args) throws Exception{
- System.out.println("主线程开始");
-
- CompletableFuture<Integer>[] futures = new CompletableFuture[4];
- CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
- try{
- Thread.sleep(5000);
- System.out.println("加 10 任务开始");
- num += 10;
- return num;
- }catch (Exception e){
- return 0;
- }
- });
- futures[0] = job1;
- CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
- try{
- Thread.sleep(2000);
- System.out.println("乘以 10 任务开始");
- num = num * 10;
- return num;
- }catch (Exception e){
- return 1;
- }
- });
- futures[1] = job2;
- CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
- try{
- Thread.sleep(3000);
- System.out.println("减以 10 任务开始");
- num = num - 10;
- return num;
- }catch (Exception e){
- return 2;
- }
- });
- futures[2] = job3;
- CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
- try{
- Thread.sleep(4000);
- System.out.println("除以 10 任务开始");
- num = num / 10;
- return num;
- }catch (Exception e){
- return 3;
- }
- });
- futures[3] = job4;
- CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
- System.out.println(future.get());
- }
- }
- //主线程开始
- //乘以 10 任务开始
- //100
- 复制代码
1、一般来讲,如果要使用线程的话,都应该是自定义线程,这点阿里Java开发规范中也有说到。
2、自定义线程池的话,一定要把参数设置合理,这点没啥可说的,都得测,空谈都是大话,
3、今天的案例,我在最后调用了 get()方法,一直阻塞到所有任务完成,所以你在编排的时候,一定要注意你需不需要任务的返回结果,不然很可能会产生一些数据方面问题。
4、关于异常我写到后面心有些浮躁,写的不是非常精细。获取异常信息,future需要获取返回值,才能获取异常信息。
今天最想说的就是 “温故而知新”
这方面的知识在去年,我其实已经学过一遍,但应用场景一少,你就会慢慢忘记它的存在。
另外想要说明的是基础我觉得是十分重要的。