• 多线程 CompletableFuture(2)


    一、CompletableFuture 常用方法
    3、对计算结果进行消费
    1、对比补充

    • thenRun(Runable runable)
      • 任务A执行完任务B,并且B不需要A的结果
    • thenAccept(Consumer action)
      • 任务A执行完执行任务B,B需要A的结果,但是任务B无返回值
    • thenApply(Function fn)
      • 任务A执行完执行任务B,B需要A的结果,同时任务B有返回值
        2、CompletableFuture和线程池说明
    1. 没有传入自定义线程池,都是用默认线程池ForkJoinPool
    2. 传入 一个自定义线程池
      • 如果你执行第一个任务的时候 ,传入了一个自定义线程池
      • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务共用一个线程池
      • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoinPool线程池
    3. 备注
      • 有可能处理太快,系统优化切换原则,直接使用main线程处理
        //线程池说明
        public class CompletableFutureWithThreadPool {
        public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        try {
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        // try {
        // Thread.sleep(20);
        // } catch (InterruptedException e) {
        // e.printStackTrace();
        // }
        System.out.println(“一号任务” + Thread.currentThread().getName());
        return “abcd”;
        },threadPool).thenRun(() -> {
        try {
        Thread.sleep(10);
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        System.out.println(“二号任务” + Thread.currentThread().getName());
        }).thenRun(() -> {
        try {
        Thread.sleep(10);
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        System.out.println(“三号任务” + Thread.currentThread().getName());
        }).thenRun(() -> {
        try {
        Thread.sleep(10);
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        System.out.println(“四号任务” + Thread.currentThread().getName());
        }).thenRun(() -> {
        try {
        Thread.sleep(10);
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        System.out.println(“五号任务” + Thread.currentThread().getName());
        });
        future.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
        e.printStackTrace();
        } finally {
        threadPool.shutdown();
        }
        }
        }
        源码分析为什么会有区别
        public CompletableFuture thenRun(Runnable action) {
        return uniRunStage(null, action);
        }
        public CompletableFuture thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
        }
        private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);
        /**
        • Default executor – ForkJoinPool.commonPool() unless it cannot
        • support parallelism.
          很明显这里创建了新的线程池返回出去了 ThreadPerTaskExecutor
          */
          private static final Executor asyncPool = useCommonPool ?
          ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
          4、对计算速度进行选用
          CompletableFuture applyToEither( CompletionStage other, Function fn)
          选取两个线程中最快的线程,作为应用
          public class CompletableFutureFastDemo {
          public static void main(String[] args) {
          CompletableFuture futureA = CompletableFuture.supplyAsync(() -> {
          try {
          TimeUnit.SECONDS.sleep(2);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          return “resultA”;
          });
          CompletableFuture futureB = CompletableFuture.supplyAsync(() -> {
          try {
          TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          return “resultB”;
          });
          //applyToEither选取一个速度最快的作为应用
          CompletableFuture future = futureA.applyToEither(futureB, f -> {
          try {
          TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          return “applyC” + f;
          });
          System.out.println(Thread.currentThread().getName() + “------result:” + future.join());
          }
          }
          5、对计算结果合并
          thenCombine()两个任务CompletionStage任务都完成后,最终指向两个任务的结果一起交给thenCombine来处理
          public class CompletableFutureCombineDemo {
          public static void main(String[] args) {
          CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
          try {
          Thread.sleep(10);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          System.out.println(“任务一执行”);
          return 10;
          });
          CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
          try {
          Thread.sleep(10);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          System.out.println(“任务二执行”);
          return 20;
          });
          CompletableFuture future = future1.thenCombine(future2, (x, y) -> {
          System.out.println(“开始合并”);
          return x + y;
          });
          System.out.println(future.join());
          }
          }
  • 相关阅读:
    从两个易错的笔试题深入理解自增运算符
    终端准入控制系统,保障企业内网安全的关键防线
    【不是问题的问题】为什么复位中断服务程序里面直接调用的main函数,难道所有程序都在复位中断里面执行的?
    React技巧之导入组件
    Redis的RDB持久化配置以及数据恢复
    alibaba国际版阿里巴巴API接入说明(阿里巴巴商品详情+关键词搜索商品列表)
    Spring Cloud之负载均衡组件Ribbon原理分析
    梯度爆炸问题和梯度消失问题的一种解释和相应的解决方案
    [2023年度回顾总结]凡是过往,皆为序章
    一个文件的开源项目,开启你的开源之旅
  • 原文地址:https://blog.csdn.net/xiaosao_/article/details/127922156