• JUC并发编程-CompletableFuture


    CompletableFuture基本介绍

    • 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture

    核心的四个静态方法(分为两组)

    利用核心的四个静态方法创建一个异步操作 | 不建议用new

    关键就是 |有没有返回值|是否用了线程池|

    参数说明:

    没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。

    如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。

    1.runAsync无返回值

    1. public class CompletableFutureBuildDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
    4. System.out.println(Thread.currentThread().getName());
    5. //停顿几秒线程
    6. try {
    7. TimeUnit.SECONDS.sleep(1);
    8. } catch (InterruptedException e) {
    9. e.printStackTrace();
    10. }
    11. });
    12. System.out.println(voidCompletableFuture.get());
    13. }
    14. }
    15. //ForkJoinPool.commonPool-worker-9 //默认的线程池
    16. //null --- 没有返回值

    2.runAsync+线程池

    1. public class CompletableFutureBuildDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
    4. CompletableFuture voidCompletableFuture = CompletableFuture.runAsync(() -> {
    5. System.out.println(Thread.currentThread().getName());
    6. //停顿几秒线程
    7. try {
    8. TimeUnit.SECONDS.sleep(1);
    9. } catch (InterruptedException e) {
    10. e.printStackTrace();
    11. }
    12. },executorService);
    13. System.out.println(voidCompletableFuture.get());
    14. executorService.shutdown();
    15. }
    16. }
    17. //pool-1-thread-1 ----指定的线程池
    18. //null ----没有返回值

    supplyAsync有返回值

    3 supplyAsync

    1. public class CompletableFutureBuildDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
    4. CompletableFuture objectCompletableFuture = CompletableFuture.supplyAsync(()->{
    5. System.out.println(Thread.currentThread().getName());
    6. try {
    7. TimeUnit.SECONDS.sleep(1);
    8. } catch (InterruptedException e) {
    9. e.printStackTrace();
    10. }
    11. return "helllo supplyasync";
    12. });
    13. System.out.println(objectCompletableFuture.get());
    14. }
    15. }
    16. //ForkJoinPool.commonPool-worker-9---------默认的线程池
    17. //helllo supplyasync-------------supplyasync有返回值了

    4 supplyAsync+线程池

    1. public class CompletableFutureBuildDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
    4. CompletableFuture objectCompletableFuture = CompletableFuture.supplyAsync(()->{
    5. System.out.println(Thread.currentThread().getName());
    6. try {
    7. TimeUnit.SECONDS.sleep(1);
    8. } catch (InterruptedException e) {
    9. e.printStackTrace();
    10. }
    11. return "helllo supplyasync";
    12. },executorService);
    13. System.out.println(objectCompletableFuture.get());
    14. executorService.shutdown();
    15. }
    16. }
    17. //pool-1-thread-1
    18. //helllo supplyasync-------------supplyasync有返回值了

     减少阻塞和轮询whenComplete

     CompletableFuture通过whenComplete来减少阻塞和轮询(自动回调)

    1. public class CompletableFutureUseDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. CompletableFuture.supplyAsync(()->{
    4. System.out.println(Thread.currentThread().getName()+"--------副线程come in");
    5. int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
    6. try {
    7. TimeUnit.SECONDS.sleep(1);
    8. } catch (InterruptedException e) {
    9. e.printStackTrace();
    10. }
    11. return result;
    12. }).whenComplete((v,e) -> {//没有异常,v是值,e是异常
    13. if(e == null){
    14. System.out.println("------------------计算完成,更新系统updataValue"+v);
    15. }
    16. }).exceptionally(e->{//有异常的情况
    17. e.printStackTrace();
    18. System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
    19. return null;
    20. });
    21. //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
    22. //ForkJoinPool 类似于守护线程mian线程结束的太快,CompletableFuture还没执行完也会结束
    23. System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
    24. try {
    25. TimeUnit.SECONDS.sleep(3);
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. }
    29. }
    30. }
    31. //ForkJoinPool.commonPool-worker-9--------副线程come in(这里用的是默认的ForkJoinPool)
    32. //main线程先去忙其他任务
    33. //------------------计算完成,更新系统updataValue3

    假如换用自定义线程池/异常情况的展示,设置一个异常 int i = 10 / 0 ;

    1. public class CompletableFutureUseDemo
    2. {
    3. public static void main(String[] args) throws ExecutionException, InterruptedException
    4. {
    5. ExecutorService threadPool = Executors.newFixedThreadPool(3);
    6. try
    7. {
    8. CompletableFuture.supplyAsync(() -> {
    9. System.out.println(Thread.currentThread().getName() + "----come in");
    10. int result = ThreadLocalRandom.current().nextInt(10);
    11. try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    12. System.out.println("-----1秒钟后出结果:" + result);
    13. if(result > 2)
    14. {
    15. int i=10/0;
    16. }
    17. return result;
    18. },threadPool).whenComplete((v,e) -> {
    19. if (e == null) {
    20. System.out.println("-----计算完成,更新系统UpdateValue:"+v);
    21. }
    22. }).exceptionally(e -> {
    23. e.printStackTrace();
    24. System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());
    25. return null;
    26. });
    27. System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
    28. }catch (Exception e){
    29. e.printStackTrace();
    30. }finally {
    31. threadPool.shutdown();
    32. }
    33. //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
    34. //try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
    35. }
    36. }

    join和get对比

    • 功能几乎一样,区别在于编码时是否需要抛出异常

      • get()方法需要抛出异常

      • join()方法不需要抛出异常

    1. public class Chain {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常
    3. CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
    4. return "hello 12345";
    5. });
    6. System.out.println(completableFuture.get());
    7. }
    8. }
    9. public class Chain {
    10. public static void main(String[] args) {//不需要抛出异常
    11. CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
    12. return "hello 12345";
    13. });
    14. System.out.println(completableFuture.join());
    15. }
    16. }

    CompletableFuture常用API

    getNow调用的时候如果计算完了,就拿取这个计算完的值;否则就拿备胎值
    1.获得结果和触发计算
         获取结果

       public T get() 不见不散,容易阻塞

       public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常

       public T join() 类似于get(),区别在于是否需要抛出异常

       public T getNow(T valueIfAbsent) 没有计算完成的情况下,给一个替代结果

    立即获取结果不阻塞

      计算完,返回计算完成后的结果

      没算完,返回设定的valueAbsent(直接返回了备胎值xxx)

    主动触发计算

    public boolean complete(T value) 是否立即打断get()方法返回括号值

    (执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)
     

    1. public class CompletableFutureAPIDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. CompletableFuture uCompletableFuture = CompletableFuture.supplyAsync(() -> {
    4. try {
    5. TimeUnit.SECONDS.sleep(2);//执行需要2秒
    6. } catch (InterruptedException e) {
    7. e.printStackTrace();
    8. }
    9. return "abc";
    10. });
    11. try {
    12. TimeUnit.SECONDS.sleep(1);//等待需要1秒
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. // System.out.println(uCompletableFuture.getNow("xxx"));//执2-等1 返回xxx
    17. //执2-等1 返回true+备胎值1111111
    18. // 反之 则是 false 输出 abc
    19. System.out.println(uCompletableFuture.complete("1111111")+"\t"+uCompletableFuture.get());
    20. }
    21. }

    对计算结果进行处理

     thenApply 

        计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。

    1. public class CompletableFutureDemo2
    2. {
    3. public static void main(String[] args) throws ExecutionException, InterruptedException
    4. {
    5. ExecutorService threadPool = Executors.newFixedThreadPool(3);
    6. //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
    7. CompletableFuture.supplyAsync(() -> {
    8. //暂停几秒钟线程
    9. try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    10. System.out.println("111");
    11. return 1024;
    12. },threadPool ).thenApply(f -> {
    13. System.out.println("222");
    14. return f + 1;
    15. }).thenApply(f -> {
    16. //int age = 10/0; // 异常情况:那步出错就停在那步。
    17. System.out.println("333");
    18. return f + 1;
    19. }).whenCompleteAsync((v,e) -> {
    20. System.out.println("*****v: "+v);
    21. }).exceptionally(e -> {
    22. e.printStackTrace();
    23. return null;
    24. });
    25. System.out.println("-----主线程结束,END");
    26. threadPool .shutdown();
    27. }
    28. }
    29. //-----正常情况
    30. //111
    31. //222
    32. //333
    33. //----计算结果: 6
    34. //-----异常情况
    35. //111
    36. //异常.....

    handle 

     类似于thenApply,但是有异常的话仍然可以往下走一步。

    1. public class CompletableFutureDemo2
    2. {
    3. public static void main(String[] args) throws ExecutionException, InterruptedException
    4. {
    5. //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
    6. // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
    7. CompletableFuture.supplyAsync(() -> {
    8. //暂停几秒钟线程
    9. try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    10. System.out.println("111");
    11. return 1024;
    12. }).handle((f,e) -> {
    13. int age = 10/0;//异常语句
    14. System.out.println("222");
    15. return f + 1;
    16. }).handle((f,e) -> {
    17. System.out.println("333");
    18. return f + 1;
    19. }).whenCompleteAsync((v,e) -> {
    20. System.out.println("*****v: "+v);
    21. }).exceptionally(e -> {
    22. e.printStackTrace();
    23. return null;
    24. });
    25. System.out.println("-----主线程结束,END");
    26. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
    27. try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    28. }
    29. }
    30. //-----异常情况
    31. //111
    32. //333
    33. //异常,可以看到多走了一步333

    对计算结果进行消费

     接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口

    thenAccept

    不需要return 

    1. public static void main(String[] args) throws ExecutionException, InterruptedException
    2. {
    3. CompletableFuture.supplyAsync(() -> {
    4. return 1;
    5. }).thenApply(f -> {
    6. return f + 2;
    7. }).thenApply(f -> {
    8. return f + 3;
    9. }).thenAccept(r -> System.out.println(r));
    10. }
    11. //6
    12. //消费一下,直接得到6

    补充:Code之任务之间的顺序执行

    1.thenRun

       thenRun(Runnable runnable)

       任务A执行完执行B,并且B不需要A的结果

    2.thenAccept

       thenAccept(Consumer action)

       任务A执行完执行B,B需要A的结果,但是任务B无返回值

    3.thenApply

        thenApply(Function fn)

       任务A执行完执行B,B需要A的结果,同时任务B有返回值

    1. System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
    2. //null
    3. System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
    4. //resultA打印出来的 null因为没有返回值
    5. System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
    6. //resultAresultB 返回值

    CompleteFuture和线程池说明(非常重要)


    上面的几个方法都有普通版本和后面加Async的版本

    以 thenRun 和 thenRunAsync 为例,有什么区别?

    先看结论

          1. 没有传入自定义线程池,都用默认线程池ForkJoinPool

          2. 传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池

                1.调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池

                2.调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

          3.也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)

    2-1

    1. public class CompletableFutureAPIDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. ExecutorService threadPool = Executors.newFixedThreadPool(5);
    4. CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
    5. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    6. System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
    7. return "abcd";
    8. },threadPool).thenRun(()->{
    9. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    10. System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
    11. }).thenRun(()->{
    12. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    13. System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
    14. }).thenRun(()->{
    15. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    16. System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
    17. });
    18. }
    19. }
    20. //1号任务 pool-1-thread-1
    21. //2号任务 pool-1-thread-1
    22. //3号任务 pool-1-thread-1
    23. //4号任务 pool-1-thread-1

    2-2

    1. public class CompletableFutureAPIDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. ExecutorService threadPool = Executors.newFixedThreadPool(5);
    4. CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
    5. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    6. System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
    7. return "abcd";
    8. },threadPool).thenRunAsync(()->{
    9. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    10. System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
    11. }).thenRun(()->{
    12. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    13. System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
    14. }).thenRun(()->{
    15. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    16. System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
    17. });
    18. }
    19. }
    20. //1号任务 pool-1-thread-1
    21. //2号任务 ForkJoinPool.commonPool-worker-9---这里另起炉灶重新调用了默认的ForkJoinPool
    22. //3号任务 ForkJoinPool.commonPool-worker-9
    23. //4号任务 ForkJoinPool.commonPool-worker-9

    3

    1. public class CompletableFutureAPIDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. ExecutorService threadPool = Executors.newFixedThreadPool(5);
    4. CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
    5. // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    6. System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
    7. return "abcd";
    8. },threadPool).thenRun(()->{
    9. // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    10. System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
    11. }).thenRun(()->{
    12. // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    13. System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
    14. }).thenRun(()->{
    15. //try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    16. System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
    17. });
    18. }
    19. }
    20. //1号任务 1号任务 pool-1-thread-1
    21. //2号任务 main
    22. //3号任务 main
    23. //4号任务 main

    对计算速度进行选用

    • applyToEither方法,那个快用哪个
    1. public class CompletableFutureDemo2 {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException
    3. {
    4. CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> {
    5. System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
    6. //暂停几秒钟线程
    7. try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    8. return "play1 ";
    9. });
    10. CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> {
    11. System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
    12. try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    13. return "play2";
    14. });
    15. CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> {//对计算速度进行选用
    16. return f + " is winner";
    17. });
    18. System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
    19. }
    20. }
    21. //ForkJoinPool.commonPool-worker-9 ---come in
    22. //ForkJoinPool.commonPool-worker-2 ---come in
    23. //main play2 is winner

    对计算结果进行合并

    • thenCombine 合并

      • 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理

      • 先完成的先等着,等待其它分支任务

    1. public class CompletableFutureDemo2
    2. {
    3. public static void main(String[] args) throws ExecutionException, InterruptedException
    4. {
    5. CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
    6. System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
    7. return 10;
    8. });
    9. CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
    10. System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
    11. return 20;
    12. });
    13. CompletableFuture thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
    14. System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
    15. return x + y;
    16. });
    17. System.out.println(thenCombineResult.get());
    18. }
    19. }
    20. //30

    • 合并版本

    1. public class CompletableFutureDemo2
    2. {
    3. public static void main(String[] args) throws ExecutionException, InterruptedException
    4. {
    5. CompletableFuture thenCombineResult = CompletableFuture.supplyAsync(() -> {
    6. System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
    7. return 10;
    8. }).thenCombine(CompletableFuture.supplyAsync(() -> {
    9. System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
    10. return 20;
    11. }), (x,y) -> {
    12. System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
    13. return x + y;
    14. }).thenCombine(CompletableFuture.supplyAsync(() -> {
    15. System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
    16. return 30;
    17. }),(a,b) -> {
    18. System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
    19. return a + b;
    20. });
    21. System.out.println("-----主线程结束,END");
    22. System.out.println(thenCombineResult.get());
    23. }
    24. }

  • 相关阅读:
    包管理工具
    vue3 快速入门系列 —— 组件通信
    神器推荐丨不可错过的10个3D模型素材库
    go语言Array 与 Slice
    IDEA 不推荐使用 @Autowired 注解的原因
    搜索二叉树实现(非递归版本)
    【短文】在Linux中怎么查看文件信息
    第九章 动态规划 part16(编辑距离专题)583. 两个字符串的删除操作 72. 编辑距离 编辑距离总结篇
    足底筋膜炎最好的恢复办法
    MyBatis配置文件(mybatis-config.xml)简介说明
  • 原文地址:https://blog.csdn.net/yuzheh521/article/details/126432178