• CompletableFuture详解


    Java 8 , 新增加了一个包含 50 个方法左右的类 : CompletableFuture, 实现了 Future 接口,提供了非常强大的Future 的扩展功能,可以简化异步编程的复杂性,提供了函数式编程的能力。可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。
    1.创建异步对象
    runAsync方法无返回值,supplyAsync方法有返回值
    1. import java.util.concurrent.CompletableFuture;
    2. import java.util.concurrent.ExecutionException;
    3. import java.util.concurrent.ExecutorService;
    4. import java.util.concurrent.Executors;
    5. public class CompletableFutureTest {
    6. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
    7. public static void main(String[] args) {
    8. System.out.println("Main method is running");
    9. /**
    10. * public static CompletableFuture runAsync(Runnable runnable) {
    11. * return asyncRunStage(asyncPool, runnable);
    12. * }
    13. * public static CompletableFuture runAsync(Runnable runnable,
    14. * Executor executor) {
    15. * return asyncRunStage(screenExecutor(executor), runnable);
    16. * }
    17. */
    18. // CompletableFuture future = CompletableFuture.runAsync(() -> {
    19. // System.out.println("Current thread is " + Thread.currentThread().getId());
    20. // int i = 8 / 2;
    21. // System.out.println("Running result is " + i);
    22. // }, executorService);
    23. /**
    24. *public static CompletableFuture supplyAsync(Supplier supplier) {
    25. * return asyncSupplyStage(asyncPool, supplier);
    26. *}
    27. *public static CompletableFuture supplyAsync(Supplier supplier,
    28. * Executor executor) {
    29. * return asyncSupplyStage(screenExecutor(executor), supplier);
    30. *}
    31. */
    32. CompletableFuture future = CompletableFuture.supplyAsync(()->{
    33. System.out.println("Current thread is " + Thread.currentThread().getId());
    34. int i = 8 / 2;
    35. System.out.println("Running result is " + i);
    36. return i;
    37. },executorService);
    38. try {
    39. Integer i = future.get();
    40. System.out.println("i is "+i);
    41. } catch (InterruptedException | ExecutionException e) {
    42. e.printStackTrace();
    43. }
    44. System.out.println("Main method is end");
    45. }
    46. }

    2.提供回调方法

    1. CompletableFuture future = CompletableFuture.supplyAsync(()->{
    2. System.out.println("Current thread is " + Thread.currentThread().getId());
    3. int i = 8 / 2;
    4. System.out.println("Running result is " + i);
    5. return i;
    6. },executorService).whenComplete((result,exception)->{
    7. //虽然能得到异常信息,但不能修改返回结果
    8. System.out.println("The result is "+result+" ,the exception is "+exception);
    9. }).exceptionally(throwable -> {
    10. //可以感知异常当出现错误时,默认返回-1
    11. return -1;
    12. });
    13. try {
    14. Integer i = future.get();
    15. System.out.println("i is "+i);
    16. } catch (InterruptedException | ExecutionException e) {
    17. e.printStackTrace();
    18. }
    可得到结果:
    The result is 4 ,the exception is null
    i is 4
    把8/2改成8/0,则可得结果:
    The result is null ,the exception is java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    i is -1

    3.handle方法

    1. //方法执行完后的处理
    2. CompletableFuture future = CompletableFuture.supplyAsync(()->{
    3. System.out.println("Current thread is " + Thread.currentThread().getId());
    4. int i = 8 / 2;
    5. System.out.println("Running result is " + i);
    6. return i;
    7. },executorService).handle((result,throwable)->{
    8. if(result!=null){
    9. return result;
    10. }
    11. if(throwable!=null){
    12. return 0;
    13. }
    14. return -1;
    15. });
    16. try {
    17. Integer i = future.get();
    18. System.out.println("i is "+i);
    19. } catch (InterruptedException | ExecutionException e) {
    20. e.printStackTrace();
    21. }

    得到结果:

    Running result is 4
    i is 4

    将8/2改成8/0,得到结果

    i is 0

    4.线程串行化

    thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前
    任务的返回值。
    thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
    thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun ,只是处理完任务后,执行
    thenRun 的后续操作
    带有 Async 默认是异步执行的。同之前。
    以上都要前置任务成功完成。
    Function
    T :上一个任务返回结果的类型 U :当前任务的返回值类型
    1. //不能获取到上一步的执行结果
    2. CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
    3. System.out.println("Current thread is " + Thread.currentThread().getId());
    4. int i = 8 / 2;
    5. System.out.println("Running result is " + i);
    6. return i;
    7. /**
    8. * public CompletableFuture thenRun(Runnable action) {
    9. * return uniRunStage(null, action);
    10. * }
    11. *
    12. * public CompletableFuture thenRunAsync(Runnable action) {
    13. * return uniRunStage(asyncPool, action);
    14. * }
    15. *
    16. * public CompletableFuture thenRunAsync(Runnable action,
    17. * Executor executor) {
    18. * return uniRunStage(screenExecutor(executor), action);
    19. * }
    20. */
    21. },executorService).thenRunAsync(()->{
    22. System.out.println("Task 2 starts");
    23. },executorService);
    1. //能接受上一步的结果,但无返回值
    2. CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
    3. System.out.println("Current thread is " + Thread.currentThread().getId());
    4. int i = 8 / 2;
    5. System.out.println("Running result is " + i);
    6. return i;
    7. /**
    8. * public CompletableFuture thenAccept(Consumer action) {
    9. * return uniAcceptStage(null, action);
    10. * }
    11. *
    12. * public CompletableFuture thenAcceptAsync(Consumer action) {
    13. * return uniAcceptStage(asyncPool, action);
    14. * }
    15. *
    16. * public CompletableFuture thenAcceptAsync(Consumer action,
    17. * Executor executor) {
    18. * return uniAcceptStage(screenExecutor(executor), action);
    19. * }
    20. */
    21. },executorService).thenAcceptAsync((result)->{
    22. System.out.println("Task 2 starts,the result of the previous step is "+result);
    23. },executorService);
    1. //能接受到上一步的结果,并且本步骤也有返回值
    2. CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
    3. System.out.println("Current thread is " + Thread.currentThread().getId());
    4. int i = 8 / 2;
    5. System.out.println("Running result is " + i);
    6. return i;
    7. /**
    8. * public CompletableFuture thenApply(
    9. * Function fn) {
    10. * return uniApplyStage(null, fn);
    11. * }
    12. *
    13. * public CompletableFuture thenApplyAsync(
    14. * Function fn) {
    15. * return uniApplyStage(asyncPool, fn);
    16. * }
    17. *
    18. * public CompletableFuture thenApplyAsync(
    19. * Function fn, Executor executor) {
    20. * return uniApplyStage(screenExecutor(executor), fn);
    21. * }
    22. */
    23. },executorService).thenApplyAsync((result)->{
    24. System.out.println("Task 2 starts,the result of the previous step is "+result);
    25. return "This step has result too,"+result;
    26. },executorService);
    27. try {
    28. System.out.println(future1.get());
    29. } catch (InterruptedException | ExecutionException e) {
    30. e.printStackTrace();
    31. }

    5.两任务组合 - 都要完成

    两个任务必须都完成,触发该任务。
    thenCombine :组合两个 future ,获取两个 future 的返回结果,并返回当前任务的返回值
    thenAcceptBoth :组合两个 future ,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
    runAfterBoth :组合两个 future ,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务
    1. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    2. System.out.println("Thread01 is " + Thread.currentThread().getId());
    3. int i = 8 / 2;
    4. System.out.println("Running result01 is " + i);
    5. return i;
    6. }, executorService);
    7. CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    8. System.out.println("Thread02 is " + Thread.currentThread().getId());
    9. System.out.println("Running result02 is ");
    10. return "hello";
    11. }, executorService);
    12. //得不到上两个任务的结果,无返回值
    13. // future1.runAfterBothAsync(future2,()->{
    14. // System.out.println("task 3 is running");
    15. // },executorService);
    16. //得到上两个任务的结果,无返回值
    17. // future1.thenAcceptBothAsync(future2,(result1,result2)->{
    18. // System.out.println("Get previous results "+result1+","+result2);
    19. // },executorService);
    20. //得到上两个任务的结果,有返回值
    21. CompletableFuture future = future1.thenCombineAsync(future2, (result1, result2) -> {
    22. return result1 + result2;
    23. }, executorService);
    24. try {
    25. System.out.println("The result obtained by combining the two results "+future.get());
    26. } catch (InterruptedException | ExecutionException e) {
    27. e.printStackTrace();
    28. }

    6.两任务组合 - 完成其中一个即可

    当两个任务中,任意一个 future 任务完成的时候,执行任务。
    applyToEither :两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
    acceptEither :两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
    runAfterEither :两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值
    1. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    2. System.out.println("Thread01 is " + Thread.currentThread().getId());
    3. int i = 8 / 2;
    4. System.out.println("Running result01 is " + i);
    5. return i;
    6. }, executorService);
    7. CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    8. System.out.println("Thread02 is " + Thread.currentThread().getId());
    9. int i = 10 / 2;
    10. System.out.println("Running result02 is ");
    11. return i;
    12. }, executorService);
    13. // future1.runAfterEitherAsync(future2,()->{
    14. // System.out.println("task 3 is running");
    15. // },executorService);
    16. // future1.acceptEitherAsync(future2,(result)->{
    17. // System.out.println("Get previous results "+result);
    18. // },executorService);
    19. CompletableFuture future = future1.applyToEitherAsync(future2, (result) -> {
    20. return result;
    21. }, executorService);
    22. try {
    23. System.out.println("The result obtained by combining the result "+future.get());
    24. } catch (InterruptedException | ExecutionException e) {
    25. e.printStackTrace();
    26. }

    7.多任务组合

    allOf :等待所有任务完成
    anyOf :只要有一个任务完成
    1. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    2. System.out.println("Thread01 is " + Thread.currentThread().getId());
    3. int i = 8 / 2;
    4. System.out.println("Running result01 is " + i);
    5. return i;
    6. }, executorService);
    7. CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    8. System.out.println("Thread02 is " + Thread.currentThread().getId());
    9. int i = 10 / 2;
    10. System.out.println("Running result02 is " + i);
    11. return i;
    12. }, executorService);
    13. CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
    14. System.out.println("Thread03 is " + Thread.currentThread().getId());
    15. int i = 12 / 2;
    16. System.out.println("Running result03 is " + i);
    17. return i;
    18. }, executorService);
    19. CompletableFuture anyOf = CompletableFuture.anyOf(future1, future2, future3);
    20. //CompletableFuture allOf = CompletableFuture.allOf(future1, future2, future3);
    21. try {
    22. anyOf.get();//只要有一个执行成功即可
    23. System.out.println(anyOf.get());
    24. //allOf.get();//等待所有结果完成
    25. //System.out.println(future1.get()+","+future2.get()+","+future3.get());
    26. } catch (InterruptedException | ExecutionException e) {
    27. e.printStackTrace();
    28. }
    29. 相关阅读:
      jvm性能监控、故障处理命令行工具详解(jps、jstat、jinfo、jmap、jhat、jstack)(宝藏博文)
      172基于matlab的MPPT智能算法
      ise使用ChipScope时报错NgdBuild:604
      1.Python 设计模式
      OpenCV 05(图像的算术与位运算)
      cad怎么转换成黑白的pdf图纸?分享3个常用的软件!
      漏洞分析|Apache Airflow Pinot Provider 命令注入漏洞
      系统架构设计:9 论软件系统架构评估及其应用
      LeetCode 494.目标和 (动态规划 + 性能优化)二维数组 压缩成 一维数组
      电商新趋势:阿里巴巴1688.item_password API引领智能分享新风尚
    30. 原文地址:https://blog.csdn.net/xushuai2333333/article/details/126775017