• completablefuture使用案例代码


    就是将回调方法转为CompletableFuture,然后再依赖CompletableFure的能力进行调用编排

    1. @FunctionalInterface
    2. public interface ThriftAsyncCall {
    3. void invoke() throws TException;
    4. }
    5. /**
    6. * OctoThriftCallback 为thrift回调方法
    7. * ThriftAsyncCall 为自定义函数,用来表示一次thrift调用(定义如上)
    8. */
    9. public static CompletableFuture toCompletableFuture(final OctoThriftCallback callback , ThriftAsyncCall thriftCall) {
    10. //新建一个未完成的CompletableFuture
    11. CompletableFuture resultFuture = new CompletableFuture<>();
    12. //监听回调的完成,并且与CompletableFuture同步状态
    13. callback.addObserver(new OctoObserver() {
    14. @Override
    15. public void onSuccess(T t) {
    16. resultFuture.complete(t);
    17. }
    18. @Override
    19. public void onFailure(Throwable throwable) {
    20. resultFuture.completeExceptionally(throwable);
    21. }
    22. });
    23. if (thriftCall != null) {
    24. try {
    25. thriftCall.invoke();
    26. } catch (TException e) {
    27. resultFuture.completeExceptionally(e);
    28. }
    29. }
    30. return resultFuture;
    31. }

    线程池使用

    1. ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
    2. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    3. System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
    4. //业务操作
    5. return "";
    6. }, threadPool1);
    7. //此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
    8. future1.thenApply(value -> {
    9. System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
    10. return value + "1";
    11. });
    12. //使用ForkJoinPool中的共用线程池CommonPool
    13. future1.thenApplyAsync(value -> {
    14. //do something
    15. return value + "1";
    16. });
    17. //使用指定线程池
    18. future1.thenApplyAsync(value -> {
    19. //do something
    20. return value + "1";
    21. }, threadPool1);

    线程池循环引用会导致死锁

    1. public Object doGet() {
    2. ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
    3. CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
    4. //do sth
    5. return CompletableFuture.supplyAsync(() -> {
    6. System.out.println("child");
    7. return "child";
    8. }, threadPool1).join();//子任务
    9. }, threadPool1);
    10. return cf1.join();
    11. }

     如上代码块所示,doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。

    为了修复该问题,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞

    服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应

    自定义的工具类ExceptionUtils

    1. public class ExceptionUtils {
    2. public static Throwable extractRealException(Throwable throwable) {
    3. //这里判断异常类型是否为CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
    4. if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
    5. if (throwable.getCause() != null) {
    6. return throwable.getCause();
    7. }
    8. }
    9. return throwable;
    10. }
    11. }
    12. remarkResultFuture
    13. .thenApply(result -> {//这里增加了一个回调方法thenApply,如果发生异常thenApply内部会通过new CompletionException(throwable) 对异常进行包装
    14. //这里是一些业务操作
    15. })
    16. .exceptionally(err -> {//通过exceptionally 捕获异常,这里的err已经被thenApply包装过,因此需要通过Throwable.getCause()提取异常
    17. log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, ExceptionUtils.extractRealException(err));
    18. return 0;
    19. });

    大部分异常会封装成CompletionException后抛出,真正的异常存储在cause属性中,因此如果调用链中经过了回调方法处理那么就需要用Throwable.getCause()方法提取真正的异常

    常用工具类代码

    1. /**
    2. * 设置CF状态为失败
    3. */
    4. public static CompletableFuture failed(Throwable ex) {
    5. CompletableFuture completableFuture = new CompletableFuture<>();
    6. completableFuture.completeExceptionally(ex);
    7. return completableFuture;
    8. }
    9. /**
    10. * 设置CF状态为成功
    11. */
    12. public static CompletableFuture success(T result) {
    13. CompletableFuture completableFuture = new CompletableFuture<>();
    14. completableFuture.complete(result);
    15. return completableFuture;
    16. }
    17. /**
    18. * 将List> 转为 CompletableFuture>
    19. */
    20. public static CompletableFuture> sequence(Collection> completableFutures) {
    21. return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
    22. .thenApply(v -> completableFutures.stream()
    23. .map(CompletableFuture::join)
    24. .collect(Collectors.toList())
    25. );
    26. }
    27. /**
    28. * 将List>> 转为 CompletableFuture>
    29. * 多用于分页查询的场景
    30. */
    31. public static CompletableFuture> sequenceList(Collection>> completableFutures) {
    32. return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
    33. .thenApply(v -> completableFutures.stream()
    34. .flatMap( listFuture -> listFuture.join().stream())
    35. .collect(Collectors.toList())
    36. );
    37. }
    38. /*
    39. * 将List>> 转为 CompletableFuture>
    40. * @Param mergeFunction 自定义key冲突时的merge策略
    41. */
    42. public static CompletableFuture> sequenceMap(
    43. Collection>> completableFutures, BinaryOperator mergeFunction) {
    44. return CompletableFuture
    45. .allOf(completableFutures.toArray(new CompletableFuture[0]))
    46. .thenApply(v -> completableFutures.stream().map(CompletableFuture::join)
    47. .flatMap(map -> map.entrySet().stream())
    48. .collect(Collectors.toMap(Entry::getKey, Entry::getValue, mergeFunction)));
    49. }
    50. /**
    51. * 将List> 转为 CompletableFuture>,并过滤调null值
    52. */
    53. public static CompletableFuture> sequenceNonNull(Collection> completableFutures) {
    54. return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
    55. .thenApply(v -> completableFutures.stream()
    56. .map(CompletableFuture::join)
    57. .filter(e -> e != null)
    58. .collect(Collectors.toList())
    59. );
    60. }
    61. /**
    62. * 将List>> 转为 CompletableFuture>,并过滤调null值
    63. * 多用于分页查询的场景
    64. */
    65. public static CompletableFuture> sequenceListNonNull(Collection>> completableFutures) {
    66. return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
    67. .thenApply(v -> completableFutures.stream()
    68. .flatMap( listFuture -> listFuture.join().stream().filter(e -> e != null))
    69. .collect(Collectors.toList())
    70. );
    71. }
    72. /**
    73. * 将List>> 转为 CompletableFuture>
    74. * @Param filterFunction 自定义过滤策略
    75. */
    76. public static CompletableFuture> sequence(Collection> completableFutures,
    77. Predicatesuper T> filterFunction) {
    78. return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
    79. .thenApply(v -> completableFutures.stream()
    80. .map(CompletableFuture::join)
    81. .filter(filterFunction)
    82. .collect(Collectors.toList())
    83. );
    84. }
    85. /**
    86. * 将List>> 转为 CompletableFuture>
    87. * @Param filterFunction 自定义过滤策略
    88. */
    89. public static CompletableFuture> sequenceList(Collection>> completableFutures,
    90. Predicatesuper T> filterFunction) {
    91. return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
    92. .thenApply(v -> completableFutures.stream()
    93. .flatMap( listFuture -> listFuture.join().stream().filter(filterFunction))
    94. .collect(Collectors.toList())
    95. );
    96. }
    97. /**
    98. * 将CompletableFuture>的list转为 CompletableFuture>。 多个map合并为一个map。 如果key冲突,采用新的value覆盖。
    99. */
    100. public static CompletableFuture> sequenceMap(
    101. Collection>> completableFutures) {
    102. return CompletableFuture
    103. .allOf(completableFutures.toArray(new CompletableFuture[0]))
    104. .thenApply(v -> completableFutures.stream().map(CompletableFuture::join)
    105. .flatMap(map -> map.entrySet().stream())
    106. .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> b)));
    107. }}

  • 相关阅读:
    Note_First:Hadoop安装部署与测试
    线代 | 秒杀方法与技巧
    自制了一个开发团队展示控制器来为成员引流【前端】
    java集合概述:ArrayList[67]
    离线地图二次开发(一套代码支持所有地图源)
    花了一周时间,更新了下软考云题库Web版
    解决telnet不是内部或外部以及验证某个端口是否开放
    【云原生之Docker实战】使用Docker部署phpMyAdmin数据库管理工具
    SpringMVC基础入门及工作流程---全方面详细介绍
    记录一次移动app的性能测试案例
  • 原文地址:https://blog.csdn.net/qq_36042938/article/details/126244457