• CompletableFuture 异步编排、案例及应用小案例


    前言

    今天的话,就来以一个应用场景来进行一步一步的推导,在实现案例的过程中,将CompletableFuture相关的知识点逐步讲述明白。

    应用场景如下

    我们在查询掘金文章页面数据为应用场景,如何使用异步编程进行优化。

    以掘金展示页面为例,点进文章页面时,大致需要渲染的数据为以下几点:

    1.  //1. 文章内容信息 1s
    2.  //2. 作者相关信息 0.5s 依赖于1的查询结果
    3.  //3. 文章评论信息 0.5s 依赖于1的查询结果
    4.  //4. 文章分类信息 0.5s 依赖于1的查询结果
    5.  //5. 文章专栏信息 0.5s 依赖于1的查询结果
    6.  //6. 相关文章信息 0.5s 依赖于1的查询结果
    7.  //...
    8. 复制代码

    补充:这是我随意拆分的,里面具体的表结构和接口请求先后的关系,以及具体的请求时间都是比较随意的,具体想要陈述的就是同步和异步编程的关系。

    那么我们就要根据这个组装一个视图数据来进行返回。

    注意:实际上并非是如此,掘金文章页面内容的数据是多个接口返回的,我只是为了模拟内容,这么写罢了,切勿当真,真正应用还需要分业务场景,或者应用场景中可异步编排,到那个时候希望大家能应用上。


    现在来说:按照以前我们以前串行化的执行方式,那么总花费的时间就是3.5s,也就是从一开始执行到六,无疑这样是非常慢的,并且 2,3,4,5,6都是依赖于 1的结果查询,但2,3,4,5,6并不互相依赖,此时我们可以将他们从串行化变成异步执行,自己准备一个线程池,然后在执行的时候,将它们放进线程池中异步运行。

    如此总耗费时间就从原来的 3.5s 变成了 1.5s,编程思想的改变,对于性能还是有一定程度的提高的

    接下来我们就开始接触CompletableFuture

    一、CompletableFuture 引入之前

    再讲CompletableFuture之前,我还是秉承着一贯的理念,先讲述一些之前的东西,然后再将它引入进来,不至于让大家对于它的出现处于一种非常迷茫的状态。


    在之前我们如果只是普通异步的执行一个任务,无需返回结果的话,只要将一个任务实现 Runnable接口,然后将放进线程池即可。

    如果需要返回结果,就让任务实现Callable接口,但实际上Callable与 Thread 并没有任何关系,Callable 还需要使用Future与线程建立关系,然后再让线程池执行,最后通过futureTask.get()方法来获取执行的返回结果。

    futureTaskFuture接口的一个基本实现。

    (Callable 类似于Runnable 接口,但 Runnable 接口中的 run()方法不会返回结果,并且也无法抛出经过检查的异常,但是 Callable中 call()方法能够返回计算结果,并且也能够抛出经过检查的异常。)

    一个小案例:

    1.  /**
    2.   * @description:
    3.   * @author: Yihui Wang
    4.   * @date: 2022082111:34
    5.   */
    6.  public class Demo {
    7.      public static ExecutorService executorService = new ThreadPoolExecutor(
    8.              10,
    9.              100,
    10.              30L,
    11.              TimeUnit.SECONDS,
    12.              new LinkedBlockingQueue<Runnable>(100),
    13.              Executors.defaultThreadFactory(),
    14.              new ThreadPoolExecutor.DiscardOldestPolicy());
    15.  ​
    16.      public static void runnableTest(){
    17.          RunnableTest runnableTest = new RunnableTest();
    18.          executorService.submit(runnableTest);
    19.     }
    20.  ​
    21.      public static void callableTest() throws ExecutionException, InterruptedException, TimeoutException {
    22.          CallableAndFutureTest callableAndFutureTest = new CallableAndFutureTest();
    23.          FutureTask<String> task = new FutureTask<>(callableAndFutureTest);
    24.          // 采用线程池执行完程序并不会结束, 如果是想测试一次性的那种 可以采用
    25.          // new Thread(task).start();
    26.          executorService.submit(task);
    27.          //System.out.println("尝试取消任务,传true表示取消任务,false则不取消任务::"+task.cancel(true));
    28.          System.out.println("判断任务是否已经完成::"+task.isDone());
    29.          //结果已经计算出来,则立马取出来,如若摸没有计算出来,则一直等待,直到结果出来,或任务取消或发生异常。
    30.          System.out.println("阻塞式获取结果::"+task.get());
    31.          System.out.println("在获取结果时,给定一个等待时间,如果超过等待时间还未获取到结果,则会主动抛出超时异常::"+task.get(2, TimeUnit.SECONDS));
    32.     }
    33.      public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
    34.          runnableTest();
    35.          callableTest();
    36.     }
    37.  ​
    38.  }
    39.  ​
    40.  class RunnableTest implements Runnable{
    41.      @Override
    42.      public void run() {
    43.          System.out.println("我是Runnable执行的结果,我无法返回结果");
    44.     }
    45.  }
    46.  class CallableAndFutureTest implements Callable<String> {
    47.      @Override
    48.      public String call() throws Exception {
    49.          String str = "";
    50.          for (int i = 0; i < 10; i++) {
    51.              str += String.valueOf(i);
    52.              Thread.sleep(100);
    53.         }
    54.          return str;
    55.     }
    56.  }
    57.  ​
    58. 复制代码

    看起来Callable搭配Future好像已经可以实现我们今天要实现的效果了,从结果的意义上来说,确实可以,但是并不优雅,也会存在一些问题。

    如果多个线程之间存在依赖组合,该如何呢?

    这个时候就轮到 CompletableFuture出现了~

    二、CompletableFuture 案例

    我先直接将实现应用场景的效果代码写出来,然后再接着慢慢的去讲

    1.  package com.nzc;
    2.  ​
    3.  import lombok.Data;
    4.  ​
    5.  import java.util.concurrent.*;
    6.  ​
    7.  /**
    8.   * @description:
    9.   * @author: Yihui Wang
    10.   * @date: 2022082111:48
    11.   */
    12.  public class CompletableFutureDemo {
    13.  ​
    14.      public static ExecutorService executorService = new ThreadPoolExecutor(
    15.              10,
    16.              100,
    17.              30L,
    18.              TimeUnit.SECONDS,
    19.              new LinkedBlockingQueue<Runnable>(100),
    20.              Executors.defaultThreadFactory(),
    21.              new ThreadPoolExecutor.DiscardOldestPolicy());
    22.  ​
    23.      public static ArticleVO asyncReturn(){
    24.          ArticleVO article=new ArticleVO();
    25.          long startTime=System.currentTimeMillis();
    26.          CompletableFuture<ArticleVO> articleContent = CompletableFuture.supplyAsync(() -> {
    27.              try {
    28.                  article.setId(1L);
    29.                  article.setContent("我是宁在春写的文章内容");
    30.                  Thread.sleep(1000);
    31.             } catch (Exception e) {
    32.                  e.printStackTrace();
    33.             }
    34.              return article;
    35.         },executorService);
    36.  ​
    37.          // 这里的res 就是第一个个 CompletableFuture 执行完返回的结果
    38.          // 如果要测试它们的异步性,其实应该在下面的所有执行中,都让它们沉睡一会,这样效果会更加明显
    39.          // executorService 是放到我们自己创建的线程池中运行
    40.          CompletableFuture<Void> author = articleContent.thenAcceptAsync((res) -> {
    41.              res.setAuthor(res.getId()+"的作者是宁在春");
    42.         },executorService);
    43.  ​
    44.          CompletableFuture<Void> articleComment = articleContent.thenAcceptAsync((res) -> {
    45.              res.setComment(res.getId()+"的相关评论");
    46.         },executorService);
    47.  ​
    48.          CompletableFuture<Void> articleCategory = articleContent.thenAcceptAsync((res) -> {
    49.              res.setCategory(res.getId()+"的分类信息");
    50.         },executorService);
    51.  ​
    52.          CompletableFuture<Void> articleColumn = articleContent.thenAcceptAsync((res) -> {
    53.              res.setColumn(res.getId()+"的文章专栏信息");
    54.         },executorService);
    55.  ​
    56.          CompletableFuture<Void> recommend = articleContent.thenAcceptAsync((res) -> {
    57.              res.setRecommend(res.getId()+"的文章推荐信息");
    58.         },executorService);
    59.  ​
    60.          CompletableFuture<Void> futureAll = CompletableFuture.allOf(articleContent, author, articleComment, articleCategory, articleColumn, recommend);
    61.  ​
    62.          try {
    63.              // get() 是一个阻塞式方法 这里是阻塞式等待所有结果返回
    64.              // 因为要等待所有结果返回,才允许方法的结束,否则一些还在执行中,但是方法已经返回,就会造成一些错误。
    65.              futureAll.get();
    66.         } catch (InterruptedException e) {
    67.              e.printStackTrace();
    68.         } catch (ExecutionException e) {
    69.              e.printStackTrace();
    70.         }
    71.          long endTime=System.currentTimeMillis();
    72.          System.out.println("耗费的总时间===>"+(endTime-startTime));
    73.  ​
    74.          // 所有任务执行完成后,将构建出来的视图结果进行返回
    75.          return article;
    76.     }
    77.  ​
    78.      public static void main(String[] args) {
    79.          ArticleVO articleVO = asyncReturn();
    80.          System.out.println(articleVO);
    81.     }
    82.  ​
    83.  }
    84.  ​
    85.  @Data
    86.  class ArticleVO {
    87.      private Long id;
    88.      private String content;
    89.      private String author;
    90.      private String comment;
    91.      private String category;
    92.      private String column;
    93.      private String recommend;
    94.  }
    95. 复制代码

    这里就是对应着应用场景里的那几步,引入以下lombok包就可以直接测试了。

    为了更好的看出效果,也可以在执行某个任务的时候,让它睡上一会。

    三、CompletableFuture 详解

    看完上面的例子,算是看到他是如何的啦,接下来还是需要详细说一说的,思维导图如下:

    3.1、通过 CompletableFuture 创建普通异步任务

    CompletableFuture.runAsync()创建无返回值的简单异步任务 Executor表示线程池~

    1.  package com.nzc;
    2.  ​
    3.  import java.util.concurrent.*;
    4.  ​
    5.  /**
    6.   * @description:
    7.   * @author: Yihui Wang
    8.   * @date: 2022082115:58
    9.   */
    10.  public class AsyncDemo {
    11.  ​
    12.      public static ExecutorService executorService = new ThreadPoolExecutor(
    13.              10,
    14.              100,
    15.              30L,
    16.              TimeUnit.SECONDS,
    17.              new LinkedBlockingQueue<Runnable>(100),
    18.              Executors.defaultThreadFactory(),
    19.              new ThreadPoolExecutor.DiscardOldestPolicy());
    20.  ​
    21.      public static void main(String[] args) throws ExecutionException, InterruptedException {
    22.          System.out.println("主线程开始");
    23.          CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
    24.              try {
    25.                  Thread.sleep(500L);
    26.             } catch (InterruptedException e) {
    27.                  e.printStackTrace();
    28.             }
    29.              System.out.println("通过CompletableFuture.runAsync创建一个简单的异步任务~");
    30.              // 另外此处还可以填写第二个参数,放进自定义线程池中执行
    31.         },executorService);
    32.          //runAsync.isDone() 可以判断任务是否已经 完成
    33.          System.out.println("任务是否完成==>" + runAsync.isDone());
    34.          //这里是阻塞式等待任务完成
    35.          runAsync.get();
    36.          System.out.println("主线程结束");
    37.          System.out.println("任务是否完成==>" + runAsync.isDone());
    38.     }
    39.  }
    40.  ​
    41. 复制代码

    CompletableFuture.supplyAsync()创建有返回值的简单异步任务

    1.  public static void main(String[] args) throws ExecutionException, InterruptedException {
    2.      CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
    3.          return "我是由宁在春创建的有返回结果的异步任务";
    4.     }, executorService);
    5.      // 如果只有一条返回语句,还可以写的更加简便
    6.      //CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "我是有返回结果的异步任务", executorService);
    7.  ​
    8.      //这里同样也是阻塞式的
    9.      String result = supplyAsync.get();
    10.      System.out.println("异步任务执行的回调结果:==>"+result);
    11.  }
    12. 复制代码

    3.2、thenRun/thenRunAsync

    简单说就是,这两个方法就是将执行任务的线程排起来,执行完一个,接着再执行第二个。并且它不需要接收上一个任务的结果,也不会返回结果,一定程度上来说,它的应用场景并不是特别高。

    1.  public static ExecutorService executorService = new ThreadPoolExecutor(
    2.      10,
    3.      100,
    4.      30L,
    5.      TimeUnit.SECONDS,
    6.      new LinkedBlockingQueue<Runnable>(100),
    7.      Executors.defaultThreadFactory(),
    8.      new ThreadPoolExecutor.DiscardOldestPolicy());
    9.  /**
    10.       * @param args
    11.       */
    12.  public static void main(String[] args) throws Exception {
    13.      thenRun();
    14.      thenRunAsync();
    15.  }
    16.  ​
    17.  public static void thenRun() throws ExecutionException, InterruptedException {
    18.      long startTime = System.currentTimeMillis();
    19.      System.out.println("主线程开始1");
    20.      CompletableFuture<String> future =
    21.          CompletableFuture.supplyAsync(() -> {
    22.              System.out.println("我是一个无需传参也没有返回值的简单异步任务1");
    23.              return "我是宁在春";
    24.         });
    25.      CompletableFuture<Void> thenRun = future.thenRun(() -> {
    26.          try {
    27.              Thread.sleep(500L);
    28.         } catch (InterruptedException e) {
    29.              e.printStackTrace();
    30.         }
    31.          System.out.println("等待任务1执行完后,我再执行任务2");
    32.     });
    33.      CompletableFuture<Void> thenRun1 = future.thenRun(() -> {
    34.          try {
    35.              Thread.sleep(500L);
    36.         } catch (InterruptedException e) {
    37.              e.printStackTrace();
    38.         }
    39.          System.out.println("等待任务1执行完后,我再执行任务3");
    40.     });
    41.      CompletableFuture<Void> thenRun2 = future.thenRun(() -> {
    42.          try {
    43.              Thread.sleep(500L);
    44.         } catch (InterruptedException e) {
    45.              e.printStackTrace();
    46.         }
    47.          System.out.println("等待任务1执行完后,我再执行任务4");
    48.     });
    49.      future.get();
    50.      long endTime = System.currentTimeMillis();
    51.      System.out.println("主线程结束1,耗费时间为:"+(endTime-startTime));
    52.  }
    53.  ​
    54.  public static void thenRunAsync() throws ExecutionException, InterruptedException {
    55.      long startTime = System.currentTimeMillis();
    56.      System.out.println("主线程开始2");
    57.      CompletableFuture<String> future =
    58.          CompletableFuture.supplyAsync(() -> {
    59.              System.out.println("我是一个无需传参也没有返回值的简单异步任务 一");
    60.              return "我是宁在春";
    61.         },executorService);
    62.      CompletableFuture<Void> thenRunAsync1 = future.thenRunAsync(() -> {
    63.          try {
    64.              Thread.sleep(500L);
    65.         } catch (InterruptedException e) {
    66.              e.printStackTrace();
    67.         }
    68.          System.out.println("等待任务一执行完后,我再执行任务二");
    69.     },executorService);
    70.  ​
    71.      CompletableFuture<Void> thenRunAsync2 = future.thenRunAsync(() -> {
    72.          try {
    73.              Thread.sleep(500L);
    74.         } catch (InterruptedException e) {
    75.              e.printStackTrace();
    76.         }
    77.          System.out.println("等待任务一执行完后,我再执行任务三");
    78.     },executorService);
    79.  ​
    80.      CompletableFuture<Void> thenRunAsync3 = future.thenRunAsync(() -> {
    81.          try {
    82.              Thread.sleep(500L);
    83.         } catch (InterruptedException e) {
    84.              e.printStackTrace();
    85.         }
    86.          System.out.println("等待任务一执行完后,我再执行任务四");
    87.     },executorService);
    88.  ​
    89.      // 这里是让所有的阻塞,等待所有任务完成,才结束整个任务
    90.      CompletableFuture<Void> allOf = CompletableFuture.allOf(future,thenRunAsync1, thenRunAsync2, thenRunAsync3);
    91.      allOf.get();
    92.      long endTime = System.currentTimeMillis();
    93.      System.out.println("主线程结束2,耗费时间为:"+(endTime-startTime));
    94.  }
    95.  }
    96. 复制代码

    小结

    浅显的说它们两个的区别的话,其实就是thenRunAsync可异步执行,thenRun不可异步执行,不过都可以异步的阻塞式等待结果的返回。

    在案例中我是自己手动创建了线程池,但其实就算我没有手动创建线程池,当调用thenRunAsync方法,它也是放在异步线程中执行的。

    源码比较:

    1.  public CompletableFuture<Void> thenRun(Runnable action) {
    2.      return uniRunStage(null, action);
    3.  }
    4.  ​
    5.  public CompletableFuture<Void> thenRunAsync(Runnable action) {
    6.      return uniRunStage(asyncPool, action);
    7.  }
    8.  ​
    9.  public CompletableFuture<Void> thenRunAsync(Runnable action,
    10.                                              Executor executor) {
    11.      return uniRunStage(screenExecutor(executor), action);
    12.  }
    13.  ​
    14.  /**
    15.       * Default executor -- ForkJoinPool.commonPool() unless it cannot
    16.       * support parallelism.
    17.       */
    18.  private static final Executor asyncPool = useCommonPool ?
    19.      ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    20.  ​
    21. 复制代码
    • thenRun它是同第一个任务是同一个线程,所以当第一个任务结束后,它才会开始执行任务。
    • thenRunAsync它则是不一样的,如果我传入我自定义的线程池,它就是放入我们自定义的线程池进行运行,如果传线程池这个参数的话,就是默认使用ForkJoin线程池

    之后的类比区别也是同样的,总共是三组这样的方法。

    3.3、thenApply和thenApplyAsync

    thenApply 和 thenApplyAsync 让线程成为了一种串行化的关系,第一个任务执行完的返回值会作为第二个的任务的入参.

    案例的话,比较简单.

    1.  package com.nzc;
    2.  ​
    3.  import java.util.concurrent.*;
    4.  ​
    5.  /**
    6.   * @description:
    7.   * @author: Yihui Wang
    8.   * @date: 2022082116:32
    9.   */
    10.  public class ThenApplyAndAsyncDemo {
    11.  ​
    12.  ​
    13.      public static ExecutorService executorService = new ThreadPoolExecutor(
    14.              10,
    15.              100,
    16.              30L,
    17.              TimeUnit.SECONDS,
    18.              new LinkedBlockingQueue<Runnable>(100),
    19.              Executors.defaultThreadFactory(),
    20.              new ThreadPoolExecutor.DiscardOldestPolicy());
    21.  ​
    22.      /**
    23.       * @param args
    24.       */
    25.      public static void main(String[] args) throws ExecutionException, InterruptedException {
    26.          thenApply();
    27.          thenApplyAsync();
    28.     }
    29.  ​
    30.  ​
    31.      /**
    32.       * 线程串行化
    33.       * 1、我进入商场
    34.       * 2、找到了我要买的商品
    35.       * 3、准备付款结账
    36.       * 4、拿着东西回家!!!
    37.       * 你会发现这是一步扣一步的在走,其实业务场景中也有很多这样的场景,希望大家在遇到的时候能够想到
    38.       *
    39.       * @return
    40.       * @throws ExecutionException
    41.       * @throws InterruptedException
    42.       */
    43.      public static String thenApply() throws ExecutionException, InterruptedException {
    44.          System.out.println("主线程开始1");
    45.  //       CompletableFuture<String> future =
    46.  //               CompletableFuture.supplyAsync(() -> {
    47.  //                   return "我进入商场, ";
    48.  //               });
    49.  //       CompletableFuture<String> future1 = future.thenApply(res -> {
    50.  //           return res += "找到了我要买的商品,";
    51.  //       });
    52.  //       future.thenApply(res->{
    53.  //           return res+="准备付款结账,";
    54.  //       }).thenApply(res->{
    55.  //           return res+="拿着东西回家!!!";
    56.  //       });
    57.  ​
    58.          // 上面那种分开写和下面这种链式写法是一样的
    59.          CompletableFuture<String> future =
    60.                  CompletableFuture.supplyAsync(() -> {
    61.                      System.out.println(Thread.currentThread().getId());
    62.                      return "我进入商场, ";
    63.  ​
    64.                 }).thenApply(res -> {
    65.                      System.out.println(Thread.currentThread().getId());
    66.                      return res += "找到了我要买的商品,";
    67.                 }).thenApply(res -> {
    68.                      System.out.println(Thread.currentThread().getId());
    69.                      return res += "准备付款结账,";
    70.                 }).thenApply(res -> {
    71.                      return res += "拿着东西回家!!!";
    72.                 });
    73.          String result = future.get();
    74.          System.out.println("主线程1结束, 子线程的结果为:" + result);
    75.          return result;
    76.     }
    77.  ​
    78.      /**
    79.       * 这里因为是异步的原因,它们之间倒是没有一个顺序上的规范
    80.       *
    81.       * @return
    82.       * @throws ExecutionException
    83.       * @throws InterruptedException
    84.       */
    85.      public static String thenApplyAsync() throws ExecutionException, InterruptedException {
    86.          System.out.println("主线程2开始");
    87.          CompletableFuture<String> future =
    88.                  CompletableFuture.supplyAsync(() -> {
    89.                      return "我进入商场, ";
    90.                 },executorService).thenApplyAsync(res -> {
    91.                      System.out.println(Thread.currentThread().getId());
    92.                      return res += "找到了我要买的商品,";
    93.                 },executorService).thenApplyAsync(res -> {
    94.                      try {
    95.                          System.out.println(Thread.currentThread().getId());
    96.                          Thread.sleep(1000L);
    97.                     } catch (InterruptedException e) {
    98.                          e.printStackTrace();
    99.                     }
    100.  ​
    101.                      return res += "准备付款结账,";
    102.                 },executorService).thenApplyAsync(res -> {
    103.                      System.out.println(Thread.currentThread().getId());
    104.  ​
    105.                      return res += "拿着东西回家!!!";
    106.                 });
    107.          String result = future.get();
    108.          System.out.println("主线程2结束, 子线程的结果为:" + result);
    109.          return result;
    110.     }
    111.  }
    112. 复制代码

    小结:

    thenApply 和 thenApplyAsync 本质上就是将它们串起来了,必须要先完成第一个任务,才能接着做下面的任务

    这里的本质区别和前面和之前说的还是一样

    但是你如果仔细看了案例代码,你会发现我在里面打印了线程ID. 并且我在测试的时候,尝试将放入自定义线程池和不放入两种情况,实际上 thenApplyAsync 执行的任务线程确实不是一个.

    但效果其实和 thenApply 是一样的,都需要等待上一个任务完成。

    注意我说的是效果,并非是内部的执行机制。再说就又得进去看源码了...

    3.4、thenAccept 和 thenAcceptAsync

    如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept()thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。

    thenAccept消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

    thenAcceptAsync则是异步的消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

    1.  package com.nzc;
    2.  ​
    3.  import java.util.concurrent.*;
    4.  import java.util.function.Consumer;
    5.  ​
    6.  /**
    7.   * @description:
    8.   * @author: Yihui Wang
    9.   * @date: 2022082117:21
    10.   */
    11.  public class ThenAcceptDemo {
    12.  ​
    13.      public static ExecutorService executorService = new ThreadPoolExecutor(
    14.              10,
    15.              100,
    16.              30L,
    17.              TimeUnit.SECONDS,
    18.              new LinkedBlockingQueue<Runnable>(100),
    19.              Executors.defaultThreadFactory(),
    20.              new ThreadPoolExecutor.DiscardOldestPolicy());
    21.  ​
    22.  ​
    23.      public static void main(String[] args) throws Exception {
    24.          thenAccept();
    25.          thenAcceptAsync();
    26.     }
    27.  ​
    28.      private static String action1 = "";
    29.  ​
    30.      public static void thenAccept() {
    31.          System.out.println("主线程开始");
    32.          CompletableFuture.supplyAsync(() -> {
    33.              try {
    34.                  action1 = "逛jd,想买台电脑~ ";
    35.             } catch (Exception e) {
    36.                  e.printStackTrace();
    37.             }
    38.              return action1;
    39.         }).thenApply(string -> {
    40.              return action1 + "选中了,付款,下单成功!!";
    41.         }).thenApply(String -> {
    42.              return action1 + "等待快递到来";
    43.         }).thenAccept((res) -> {
    44.              System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + res);
    45.         });
    46.     }
    47.  ​
    48.      private static String action2 = "";
    49.  ​
    50.      public static void thenAcceptAsync() {
    51.          System.out.println("主线程开始");
    52.          CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    53.              try {
    54.                  action2 = "逛jd,想买台电脑~ ";
    55.             } catch (Exception e) {
    56.                  e.printStackTrace();
    57.             }
    58.              return action2;
    59.         }).thenApply(string -> {
    60.              return action2 + "选中了,付款,下单成功!!";
    61.         }).thenApply(String -> {
    62.              return action2 + "等待快递到来";
    63.         });
    64.          
    65.          // 这里不采用链式写法,是因为thenAcceptAsync 无返回值,
    66.          // 第二个thenAcceptAsync 连接在第一个thenAcceptAsync后,会没有入参值
    67.          // 就都拿出来了。
    68.          future.thenAcceptAsync((res) -> {
    69.              System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务三,晚饭时间了,打算一边看电影");
    70.         },executorService);
    71.          future.thenAcceptAsync((res) -> {
    72.              System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务四,一边干饭~");
    73.         },executorService);
    74.     }
    75.  }
    76.  ​
    77. 复制代码

    thenAcceptAsync也是我们今天文章开头中用到的,异步编排式的组合视图结果集。

    这一部分平时用的倒是不少,也比较方便~

    上面说了这么多,但是万一我们在执行某个任务的时候出现异常该如何处理呢?

    别慌,它也封装好了的。

    3.5、exceptionally 和 handle

    exceptionally 异常处理,出现异常时触发,可以回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。

    一般而言,exceptionally都是写到方法调用的末尾,以来出来中间过程中会出现的异常。

    另外就是 handle 也可以用来处理异常。

    1.  public class ExceptionallyDemo {
    2.      public static void main(String[] args) throws Exception{
    3.          System.out.println("主线程开始");
    4.          CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    5.              int i= 1/0;
    6.              System.out.println("子线程执行中");
    7.              return i;
    8.         }).exceptionally(ex -> {
    9.              System.out.println(ex.getMessage());
    10.              return -1;
    11.         });
    12.          System.out.println(future.get());
    13.     }
    14.  }
    15.  ​
    16.  //主线程开始
    17.  //java.lang.ArithmeticException: / by zero
    18.  //-1
    19. 复制代码
    1.  public static void main(String[] args) throws Exception {
    2.      CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    3.          System.out.println("任务开始");
    4.          int i = 0 / 1;
    5.          return i;
    6.     }).handle((i, ex) -> {
    7.          System.out.println("进入 handle 方法");
    8.          if (ex != null) {
    9.              System.out.println("发生了异常,内容为:" + ex.getMessage());
    10.              return -1;
    11.         } else {
    12.              System.out.println("正常完成,内容为: " + i);
    13.              return i;
    14.         }
    15.     });
    16.  }
    17. 复制代码

    handle是有入参和带返回值的,入参是之前任务执行的结果。

    当然一切具体的使用还是要看业务场景啦

    3.6、结果合并

    thenCompose 合并两个有依赖关系的 CompletableFutures的执行结果,有入参有返回值。

    它的入参是第一个future和第一二两个的任何的返回结果。

    thenAcceptBoth则是会将两个任务的执行结果作为方法入参,传递到指定方法中,但无返回值

    runAfterBoth 则是不会把执行结果当做方法入参,也没有返回值。

    1.  package com.nzc;
    2.  ​
    3.  import java.util.WeakHashMap;
    4.  import java.util.concurrent.*;
    5.  ​
    6.  /**
    7.   * @description:
    8.   * @author: Yihui Wang
    9.   * @date: 2022082117:53
    10.   */
    11.  public class ThenCombineDemo {
    12.  ​
    13.      public static void main(String[] args) throws Exception {
    14.          test();
    15.     }
    16.  ​
    17.      private static Integer num = 10;
    18.      public static void test() throws Exception {
    19.          System.out.println("主线程开始");
    20.          //第一步加 10
    21.          CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    22.              System.out.println("第一个任务:让num+10;任务开始");
    23.              num += 10;
    24.              return num;
    25.         });
    26.          CompletableFuture<String > future1 = CompletableFuture.supplyAsync(() -> {
    27.              System.out.println("第二个任务:让num+1;任务开始");
    28.              return num + 1;
    29.              //它的入参是第一个future和第一二两个的任何的返回结果。
    30.         }).thenCombine(future,(w,s)->{
    31.              System.out.println("任务一的结果==>"+w);
    32.              System.out.println("任务二的结果==>"+s);
    33.              return "我是两个任务的合并"+(w+s);
    34.         });
    35.          System.out.println(future.get());
    36.          System.out.println(future1.get());
    37.     }
    38.  }
    39.  /**
    40.   * 主线程开始
    41.   * 第一个任务:让num+10;任务开始
    42.   * 第二个任务:让num+1;任务开始
    43.   * 任务一的结果==>21
    44.   * 任务二的结果==>20
    45.   * 20
    46.   * 我是两个任务的合并41
    47.   */
    48. 复制代码

    thenAcceptBoth

    1.  package com.nzc;
    2.  ​
    3.  import java.util.WeakHashMap;
    4.  import java.util.concurrent.*;
    5.  ​
    6.  /**
    7.   * @description:
    8.   * @author: Yihui Wang
    9.   * @date: 2022082117:53
    10.   */
    11.  public class ThenCombineDemo {
    12.  ​
    13.      public static void main(String[] args) throws Exception {
    14.          test();
    15.     }
    16.  ​
    17.      private static Integer num = 10;
    18.      public static void test() throws Exception {
    19.          System.out.println("主线程开始");
    20.          //第一步加 10
    21.          CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    22.              System.out.println("第一个任务:让num+10;任务开始");
    23.              num += 10;
    24.              return num;
    25.         });
    26.          CompletableFuture<Void > future1 = CompletableFuture.supplyAsync(() -> {
    27.              System.out.println("第二个任务:让num+1;任务开始");
    28.              return num + 1;
    29.         }).thenAcceptBoth(future,(w,s)->{
    30.              System.out.println("任务一的结果==>"+w);
    31.              System.out.println("任务二的结果==>"+s);
    32.              System.out.println( "我是两个任务的合并"+(w+s)+"但是我没有返回值");
    33.         });
    34.          System.out.println("任务一的结果==>"+future.get());
    35.          // 不采用链式写法,任务二实际上是有返回值,大家看业务场景写即可
    36.          System.out.println("任务二后接上thenAcceptBoth方法的结果==>"+future1.get());
    37.     }
    38.  }
    39.  /**
    40.   主线程开始
    41.   第一个任务:让num+10;任务开始
    42.   第二个任务:让num+1;任务开始
    43.   任务一的结果==>21
    44.   任务二的结果==>20
    45.   我是两个任务的合并41但是我没有返回值
    46.   任务一的结果==>20
    47.   任务二后接上thenAcceptBoth方法的结果==>null
    48.   */
    49. 复制代码

    runAfterBoth

    1.  public static  void test2(){
    2.      //第一步加 10
    3.      CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    4.          System.out.println("第一个任务:让num+10;任务开始");
    5.          num += 10;
    6.          return num;
    7.     });
    8.      CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    9.          System.out.println("第一个任务:让num+10;任务开始");
    10.          num += 10;
    11.          return num;
    12.     });
    13.      future2.runAfterBoth(future,()->{
    14.          System.out.println("不会把执行结果当做方法入参,也没有返回值");
    15.     });
    16.  ​
    17.  }
    18. 复制代码

    除了这些外,CompletableFuture还有我之前案例中就已经用到的allofanyOf

    3.7、allof 合并多个任务结果

    allOf: 一系列独立的 future任务,等其所有的任务执行完后做一些事情.

    1.  public class CompletableFutureDemo9 {
    2.  ​
    3.      private static Integer num = 10;
    4.  ​
    5.  ​
    6.      public static void main(String[] args) throws Exception{
    7.          System.out.println("主线程开始");
    8.          List<CompletableFuture> list = new ArrayList<>();
    9.  ​
    10.          CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
    11.              System.out.println("加 10 任务开始");
    12.              num += 10;
    13.              return num;
    14.         });
    15.          list.add(job1);
    16.  ​
    17.          CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
    18.              System.out.println("乘以 10 任务开始");
    19.              num = num * 10;
    20.              return num;
    21.         });
    22.          list.add(job2);
    23.  ​
    24.          CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
    25.              System.out.println("减以 10 任务开始");
    26.              num = num - 10;
    27.              return num;
    28.         });
    29.          list.add(job3);
    30.  ​
    31.          CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
    32.              System.out.println("除以 10 任务开始");
    33.              num = num / 10;
    34.              return num;
    35.         });
    36.          list.add(job4);
    37.  ​
    38.          //多任务合并
    39.          List<Integer> collect =
    40.              list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
    41.          System.out.println(collect);
    42.     }
    43.  ​
    44.  }
    45.  /**主线程开始
    46.   乘以 10 任务开始
    47.   加 10 任务开始
    48.   减以 10 任务开始
    49.   除以 10 任务开始
    50.   [110, 100, 100, 10]
    51.  */
    52. 复制代码

    allof的除了在合并结果时经常用到以外,像我们今天案例它也用到了它的get()方法,在那里使用的作用时,阻塞式的等待所有的任务结束,才结束方法的调用。

    3.8、anyof

    anyOf: 只要在多个 future里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束

    1.  public class CompletableFutureDemo10 {
    2.  ​
    3.      private static Integer num = 10;
    4.      /**
    5.       * 先对一个数加 10,然后取平方
    6.       * @param args
    7.       */
    8.      public static void main(String[] args) throws Exception{
    9.          System.out.println("主线程开始");
    10.  ​
    11.          CompletableFuture<Integer>[] futures = new CompletableFuture[4];
    12.          CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
    13.              try{
    14.                  Thread.sleep(5000);
    15.                  System.out.println("加 10 任务开始");
    16.                  num += 10;
    17.                  return num;
    18.             }catch (Exception e){
    19.                  return 0;
    20.             }
    21.         });
    22.          futures[0] = job1;
    23.          CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
    24.              try{
    25.                  Thread.sleep(2000);
    26.                  System.out.println("乘以 10 任务开始");
    27.                  num = num * 10;
    28.                  return num;
    29.             }catch (Exception e){
    30.                  return 1;
    31.             }
    32.         });
    33.          futures[1] = job2;
    34.          CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
    35.              try{
    36.                  Thread.sleep(3000);
    37.                  System.out.println("减以 10 任务开始");
    38.                  num = num - 10;
    39.                  return num;
    40.             }catch (Exception e){
    41.                  return 2;
    42.             }
    43.         });
    44.          futures[2] = job3;
    45.          CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
    46.              try{
    47.                  Thread.sleep(4000);
    48.                  System.out.println("除以 10 任务开始");
    49.                  num = num / 10;
    50.                  return num;
    51.             }catch (Exception e){
    52.                  return 3;
    53.             }
    54.         });
    55.          futures[3] = job4;
    56.          CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
    57.          System.out.println(future.get());
    58.     }
    59.  }
    60.  //主线程开始
    61.  //乘以 10 任务开始
    62.  //100
    63. 复制代码

    3.9、注意的小问题

    1、一般来讲,如果要使用线程的话,都应该是自定义线程,这点阿里Java开发规范中也有说到。

    2、自定义线程池的话,一定要把参数设置合理,这点没啥可说的,都得测,空谈都是大话,

    3、今天的案例,我在最后调用了 get()方法,一直阻塞到所有任务完成,所以你在编排的时候,一定要注意你需不需要任务的返回结果,不然很可能会产生一些数据方面问题。

    4、关于异常我写到后面心有些浮躁,写的不是非常精细。获取异常信息,future需要获取返回值,才能获取异常信息。

    后记

    今天最想说的就是 “温故而知新

    这方面的知识在去年,我其实已经学过一遍,但应用场景一少,你就会慢慢忘记它的存在。

    另外想要说明的是基础我觉得是十分重要的

  • 相关阅读:
    JVM监控及诊断工具-GUI篇
    visual code 下的node.js的hello world
    redis的持久化
    python subprocess.cal调用wkhtmltohtml中遇到的问题
    【TB作品】MSP430G2553,单片机,口袋板, 烘箱温度控制器
    AI帮60年代老技术解决面料数字化难题,王华民团队新方法只需3分钟数据采集复刻面料真实效果...
    哪个电气工程专家知道这个东西的用途?
    MacOS(M1)交叉编译安卓版本OpenCV(附带FFmpeg)
    Flet教程之 12 Stack 重叠组建图文混合 基础入门(教程含源码)
    【数学建模】MATLAB应用实战系列(105)-Logistic回归——二分类(附MATLAB代码)
  • 原文地址:https://blog.csdn.net/Trouvailless/article/details/126465601