• Java并发-CompletableFuture的详解


    目录

    1 前言

     2 常用方法

    3 测试

    3.1 runAsync:无返回值 和 SupplyAsync:有返回值

     3.2 串行执行

    3.3 任务3等待等任务1和任务2都执行完毕后执行

    3. 4 任务3等待等任务1或者任务2执行完毕后执行

    3.5 handleAsync

    3.6 多任务执行


    1 前言

    CompletableFuture 是对 Future 的扩展和增强。CompletableFuture 实现了Future和CompletionStage接口,其中最重要的是CompletionStage,它定义了CompletableFuture 实现任务编排的一些基本方法。只能说非常好用。

    其继承结构如下图所示,在IDEA中使用ctrl + alt + U 实现

     CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池

     2 常用方法

    主要有同步和异步两大类,这里只讲异步方法。同步方法可以类比。

    创建类型的方法

    • runAsync:无返回值
    • SupplyAsync:有返回值

    中间处理类型的方法

    • 任务与任务之间串行执行
      • thenRunAsync:不能获取上一次的返回值,本次没有返回值。
      • thenAccpetAsync:获取上一次的返回值,但是本次没有返回值。
      • thenApplyAsync:获取上一次的返回值,本次返回值。
    • 任务3等待等任务1和任务2都执行完毕后执行
      • runAfterBothAsync:不能获取任务1和任务2的返回值,本次没有返回值。
      • thenAcceptBothAsync:获取任务1和任务2的返回值,但是本次没有返回值。
      • thenCombineAsync:获取任务1和任务2的返回值,本次也有返回值。
    • 任务3等待任务1或者任务2其中一个执行完毕、
      • runAfterEitherAsync:不能获取任务1或任务2的返回值,本次没有返回值
      • acceptEitherAsync:获取任务1或任务2的返回值,本次没有返回值
      • applyToEither:获取任务1或任务2的返回值,本次有返回值
    • 多任务
      • allof:当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
      • anyof:当任何一个给定的CompletableFuture完成时,返回一个新的CompletableFuture

    结果处理类型的方法

    • whenCompleteAsync:获取结果和异常,但是没有返回值
    • exceptionally:在whenCompleteAsync后时候,获取异常并且有返回值
    • handleAsync:相当于whenCompleteAsync和exceptionally的结合体。

    3 测试

    3.1 runAsync:无返回值 和 SupplyAsync:有返回值

    • runAsync 相当于开启一个异步线程执行任务,主线程不等待结果返回。
    • supplyAsync,开启一个异步线程执行任务,主线程要等待结果返回。
    1. package cn.itcast.n6.c3;
    2. import java.util.concurrent.*;
    3. /**
    4. * @author : msf
    5. * @date : 2022/12/4
    6. * completable 讲解。
    7. */
    8. public class CompletableFutureD1 {
    9. static ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10. 5,50,10, TimeUnit.SECONDS,
    11. new LinkedBlockingQueue<>(100),Executors.defaultThreadFactory()
    12. ,new ThreadPoolExecutor.AbortPolicy()
    13. );
    14. public static void main(String[] args) throws ExecutionException, InterruptedException {
    15. System.out.println("主线程 start ...");
    16. CompletableFuture future = CompletableFuture.runAsync(()->{
    17. System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
    18. },executor);
    19. CompletableFuture futureInt = CompletableFuture.supplyAsync(() -> {
    20. System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
    21. return 10;
    22. }, executor);
    23. System.out.println(futureInt.get());
    24. System.out.println("主线程 end ...");
    25. }
    26. }

    上述代码执行结果:

     3.2 串行执行

    • thenRunAsync:不能获取上一次的返回值,本次没有返回值。
    • thenAccpetAsync:获取上一次的返回值,但是本次没有返回值。
    • thenApplyAsync:获取上一次的返回值,本次返回值

    1. package cn.itcast.n6.c3;
    2. import java.util.concurrent.*;
    3. /**
    4. * @author : msf
    5. * @date : 2022/12/4
    6. * completable 讲解--串行编排
    7. */
    8. public class CompletableFutureD4 {
    9. static ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10. 5, 50, 10, TimeUnit.SECONDS,
    11. new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
    12. , new ThreadPoolExecutor.AbortPolicy()
    13. );
    14. public static void main(String[] args) throws ExecutionException, InterruptedException {
    15. System.out.println("主线程 start ...");
    16. CompletableFuture future = CompletableFuture.runAsync(() -> {
    17. System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
    18. try {
    19. Thread.sleep(1000);
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. }, executor).thenRunAsync(()->{
    24. // 等任务1 执行完毕后,任务2执行
    25. System.out.println("任务2执行了" +Thread.currentThread().getName());
    26. },executor);
    27. CompletableFuture.supplyAsync(() -> {
    28. System.out.println("任务3子线程执行..." + Thread.currentThread().getName());
    29. try {
    30. Thread.sleep(1000);
    31. } catch (InterruptedException e) {
    32. e.printStackTrace();
    33. }
    34. return 100;
    35. }, executor).thenAcceptAsync((t)->{
    36. System.out.println("任务4子线程执行..." + Thread.currentThread().getName());
    37. System.out.println("t = " + t);
    38. },executor);
    39. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    40. System.out.println("任务5子线程执行..." + Thread.currentThread().getName());
    41. try {
    42. Thread.sleep(1000);
    43. } catch (InterruptedException e) {
    44. e.printStackTrace();
    45. }
    46. return 100;
    47. }, executor).thenApplyAsync((t) -> {
    48. System.out.println("任务6子线程执行..." + Thread.currentThread().getName());
    49. System.out.println("t = " + t);
    50. return 120;
    51. }, executor);
    52. System.out.println("主线程 end ..." + future1.get());
    53. }
    54. }

    上述代码执行结果:

    3.3 任务3等待等任务1和任务2都执行完毕后执行

    开启一个任务1,然后开启任务2,我们需要的是,当任务1和2都执行结束后任务3再启动。

    场景:例如两个人从不同地方去吃饭,都有两个人都到了然后开始点餐。

    1. package cn.itcast.n6.c3;
    2. import java.util.concurrent.*;
    3. /**
    4. * @author : msf
    5. * @date : 2022/12/4
    6. * completable 讲解-- 两个一起完成,或者两者其中之一完成
    7. */
    8. public class CompletableFutureD5 {
    9. static ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10. 5, 50, 10, TimeUnit.SECONDS,
    11. new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
    12. , new ThreadPoolExecutor.AbortPolicy()
    13. );
    14. public static void main(String[] args) throws ExecutionException, InterruptedException {
    15. System.out.println("主线程 start ...");
    16. CompletableFuture future = CompletableFuture.runAsync(() -> {
    17. System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
    18. try {
    19. Thread.sleep(500);
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. }, executor);
    24. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    25. System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
    26. try {
    27. Thread.sleep(600);
    28. } catch (InterruptedException e) {
    29. e.printStackTrace();
    30. }
    31. return 100;
    32. }, executor);
    33. future.runAfterBothAsync(future1, () -> {
    34. System.out.println("任务3..." + Thread.currentThread().getName());
    35. try {
    36. Thread.sleep(1000);
    37. } catch (InterruptedException e) {
    38. e.printStackTrace();
    39. }
    40. System.out.println("任务3.... end " + Thread.currentThread().getName());
    41. }, executor);
    42. System.out.println("主线程 end ..." + future1.get());
    43. }
    44. }

    上述代码执行结果: 

    3. 4 任务3等待等任务1或者任务2执行完毕后执行

    场景:当两个人都出去吃饭,其中一个人到了可以先点餐。

    1. package cn.itcast.n6.c3;
    2. import java.util.concurrent.*;
    3. /**
    4. * @author : msf
    5. * @date : 2022/12/4
    6. * completable 讲解-- 两个一起完成,或者两者其中之一完成
    7. */
    8. public class CompletableFutureD6 {
    9. static ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10. 5, 50, 10, TimeUnit.SECONDS,
    11. new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
    12. , new ThreadPoolExecutor.AbortPolicy()
    13. );
    14. public static void main(String[] args) throws ExecutionException, InterruptedException {
    15. System.out.println("主线程 start ...");
    16. CompletableFuture future = CompletableFuture.runAsync(() -> {
    17. System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
    18. try {
    19. Thread.sleep(500);
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. System.out.println("任务1子线程执行...end " + Thread.currentThread().getName());
    24. }, executor);
    25. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    26. System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
    27. try {
    28. Thread.sleep(600);
    29. } catch (InterruptedException e) {
    30. e.printStackTrace();
    31. }
    32. System.out.println("任务2子线程执行...end " + Thread.currentThread().getName());
    33. return 100;
    34. }, executor);
    35. future.runAfterEitherAsync(future1, () -> {
    36. System.out.println("任务3..." + Thread.currentThread().getName());
    37. }, executor);
    38. System.out.println("主线程 end ..." + future1.get());
    39. }
    40. }

    上述代码执行如下: 

    3.5 handleAsync

    handleAsync:相当于whenCompleteAsync和exceptionally的结合体,可以收集结果和异常信息然后进行下一步处理。

    1. package cn.itcast.n6.c3;
    2. import java.util.concurrent.*;
    3. /**
    4. * @author : msf
    5. * @date : 2022/12/4
    6. * completable 讲解。
    7. */
    8. public class CompletableFutureD3 {
    9. static ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10. 5, 50, 10, TimeUnit.SECONDS,
    11. new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
    12. , new ThreadPoolExecutor.AbortPolicy()
    13. );
    14. public static void main(String[] args) throws ExecutionException, InterruptedException {
    15. System.out.println("主线程 start ...");
    16. CompletableFuture future = CompletableFuture.runAsync(() -> {
    17. System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
    18. }, executor).handleAsync((res, exec) -> {
    19. System.out.println("res1 = " + res);
    20. System.out.println("exec1 = " + exec);
    21. return null;
    22. });
    23. CompletableFuture futureInt = CompletableFuture.supplyAsync(() -> {
    24. System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
    25. int i = 10 / 0;
    26. return 10;
    27. }, executor).handleAsync((res, exec) -> {
    28. System.out.println("res2 = " + res);
    29. System.out.println("exec2 = " + exec);
    30. return 55;
    31. });
    32. System.out.println(futureInt.get());
    33. System.out.println("主线程 end ...");
    34. }
    35. }

    上述代码执行结果:

    3.6 多任务执行

    模拟:三人吃饭,只要一个人到了就可以点餐,但是只有所有人到了,并且厨子把菜做好了才能上菜;

    1. package cn.itcast.n6.c3;
    2. import java.util.concurrent.CompletableFuture;
    3. import java.util.concurrent.ExecutionException;
    4. /**
    5. * @author : msf
    6. * @date : 2022/12/5
    7. * 模拟三人点餐,只要有一个人到了就可以点餐。然后交给厨子进行做饭
    8. */
    9. public class Test {
    10. public static void main(String[] args) throws ExecutionException, InterruptedException {
    11. long begin = System.currentTimeMillis();
    12. CompletableFuture client1 = CompletableFuture.runAsync(() -> {
    13. System.out.println("客户1开始出发了.....1小时后到");
    14. try {
    15. Thread.sleep(1000);
    16. } catch (InterruptedException e) {
    17. e.printStackTrace();
    18. }
    19. System.out.println("客户1到了");
    20. });
    21. CompletableFuture client2 = CompletableFuture.runAsync(()->{
    22. System.out.println("客户2开始出发了.....半小时后到");
    23. try {
    24. Thread.sleep(500);
    25. } catch (InterruptedException e) {
    26. e.printStackTrace();
    27. }
    28. System.out.println("客户2到了");
    29. });
    30. CompletableFuture client3 = CompletableFuture.runAsync(()->{
    31. System.out.println("客户3开始出发了.....1.5小时后到");
    32. try {
    33. Thread.sleep(1500);
    34. } catch (InterruptedException e) {
    35. e.printStackTrace();
    36. }
    37. System.out.println("客户3到了");
    38. });
    39. CompletableFuture result = CompletableFuture.anyOf(client1, client2, client3);
    40. System.out.println("已经有客户到了开始点餐" +result.get());
    41. CompletableFuture cook = CompletableFuture.runAsync(() -> {
    42. System.out.println("厨师开始做饭");
    43. try {
    44. Thread.sleep(2000);
    45. } catch (InterruptedException e) {
    46. e.printStackTrace();
    47. }
    48. System.out.println("厨师做完饭了");
    49. });
    50. CompletableFuture reuslt1 = CompletableFuture.allOf(client1, client2, client3, cook);
    51. reuslt1.join();
    52. System.out.println("人全部来齐了,厨师也做好饭了,上菜!");
    53. long end = System.currentTimeMillis();
    54. System.out.println("总共时间盲猜2.5秒, 真实时间" + (end - begin) );
    55. }
    56. }
    57. 相关阅读:
      经济师十大专业通过人数分析!选专业有谱了!
      2023高教社杯 国赛数学建模D题思路 - 圈养湖羊的空间利用率
      c# wpf template ItemTemplate 简单试验
      VLAN和VLAN间路由
      【LeetCode刷题】1两数之和
      C专家编程 第11章 你懂得C,所以C++不再话下 11.7 如何调用成员函数
      C#每天复习一个重要小知识day2:有参与无参构造函数
      【JavaScript】掌握BOM浏览器对象模型
      Python Flask框架 入门详解与进阶
      基于TCP传输的网络编程异常处理
    58. 原文地址:https://blog.csdn.net/abc123mma/article/details/128183712