• CountDownLatch翻车后,我建议用CompletableFuture改造下


    CompletableFuture改造

    我先直接分享一下我是如何使用CompletableFuture的吧

    1. // 下载文件总数,初始化
    2. List<Integer> resultList = new ArrayList<>(1000);
    3. ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();
    4. IntStream.range(0,1000).forEach(resultList::add);
    1. public List<R> sendAsyncBatch(List<P> list, Executor executor, TaskLoader<R,P> loader) {
    2. List<R> resultList = new CopyOnWriteArrayList<>();
    3. if (CollectionUtils.isNotEmpty(list)) {
    4. Executor finalExecutor = executor;
    5. // 将任务拆分分成每50个为一个任务
    6. CollUtil.split(list, 50)
    7. .forEach(tempList -> {
    8. CompletableFuture[] completableFutures = tempList.stream()
    9. .map(p -> CompletableFuture.supplyAsync(() -> {
    10. try {
    11. return loader.load(p);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. return null;
    16. }, finalExecutor)
    17. .handle((result, throwable) -> {
    18. if (Objects.nonNull(throwable)) {
    19. //log.error("async error:{}", throwable.getMessage());
    20. } else if (Objects.nonNull(result)) {
    21. //log.info("async success:{}", result);
    22. } else {
    23. //log.error("async result is null");
    24. }
    25. return result;
    26. }).whenComplete((r, ex) -> {
    27. if (Objects.nonNull(r)) {
    28. resultList.add((R) r);
    29. }
    30. })
    31. ).toArray(CompletableFuture[]::new);
    32. CompletableFuture.allOf(completableFutures).join();
    33. System.out.println(resultList.size());
    34. });
    35. }
    36. return resultList;
    37. }
    1. // 具体业务逻辑实现接口
    2. @FunctionalInterface
    3. public interface TaskLoader {
    4. T load(P p) throws InterruptedException;
    5. }
    1. // 自定义启动器
    2. ExecutorService executorService = BaseThreadPoolExector.queueExecutor(new ArrayBlockingQueue<>(5));
    3. AsyncTask<Integer, Integer> asyncTask = new AsyncTask();
    4. List<Integer> list = asyncTask.sendAsyncBatch(resultList, executorService, new TaskLoadImpl());

    我先说一下,为什么要CountDownLatch替换掉

    1. CompletableFuture为我们提供更直观、更优美的API。
    2. 在“多个任务等待完成状态”这个应用场景,在遇到异常的情况下我们不需要去手动的抛异常,以免错误处理细节导致阻塞
    3. CompletableFuture也可以定制执行器

    但是他也是有缺点的,我个人感觉他的API有点多,看的时候让人眼花。

    短短十几行的代码,看到了很多API supplyAsync、handle、whenComplete、allOf

    之后我们还会用到runAsync、 thenApply、thenCompose等等其他的。

    什么是CompletableFuture?

    异步编程利用多线程优化性能这个核心方案得以实施的基础

    他的目的也很简单,同一个CPU上执行几个松耦合的任务,充分利用CPU核数,实现最大化吞吐量,避免因为阻塞造成等待时间过长;

    1. 要区分并发与并行的区别

    我们还需要特别的注意这两个概念不能混淆

    并发:在一个CPU上串行执行

    并行:多个CPU上同时执行任务

    2. Future接口

    CompletableFuture主要继承了Future接口,但是他比Future接口丰富的很多

    1. // 取消
    2. boolean cancel(boolean mayInterruptIfRunning);
    3. // 判断是否取消
    4. boolean isCancelled();
    5. //是否异步计算是否已经结束
    6. boolean isDone();
    7. // 获取计算结果
    8. V get() throws InterruptedException, ExecutionException;
    9. // 设置最长计算时间,返回计算结果
    10. V get(long timeout, TimeUnit unit)
    11. throws InterruptedException, ExecutionException, TimeoutException;

    可以看到Future接口的局限性,主要是用起来不省事 举个例子:A线程执行完之后通知B线程执行

    1. ExecutorService executorService = BaseThreadPoolExector.calculateExecutor();
    2. Future<String> futureA = executorService.submit(() -> Thread.currentThread().getName());
    3. System.out.println(futureA.get());
    4. if (futureA.isDone()){
    5. Future<String> futureB = executorService.submit(() -> Thread.currentThread().getName());
    6. System.out.println(futureB.get());
    7. }
    8. executorService.shutdown();

    这里我们就需要查询futureA.isDone()结果,然后再去执行B线程的业务

    而 CompletableFuture 操作起来就便捷很多了

    1. CompletableFuture<String> completableFuture = CompletableFuture
    2. .supplyAsync(() -> Thread.currentThread().getName(), executorService)
    3. .thenApply(s -> Thread.currentThread().getName());
    4. System.out.println(completableFuture.get());
    5. 准备执行
    6. 计划执行
    7. supplyAsync result pool-1-thread-1, thenApply result main
    8. 线程退出
    9. 复制代码

    supplyAsync执行完成之后,再去执行thenApply

    没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;

    3. 错误处理细节,避免造成阻塞

    1. CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
    2. new Thread(() ->{
    3. try {
    4. int kk = 10/0;
    5. System.out.println(kk);
    6. }catch (Exception ex){
    7. //ex.printStackTrace();
    8. completableFuture.completeExceptionally(ex);
    9. }
    10. }).start();
    11. try {
    12. System.out.println(completableFuture.get());
    13. } catch (InterruptedException | ExecutionException e) {
    14. e.printStackTrace();
    15. }

    注意到catch里面的completeExceptionally函数了吧,

    这个主要的作用就是为了抛出异常,

    如果缺少了他,就会造成completableFuture.get()一直处于等待造成阻塞,

    与此同时,没有为我们抛出异常信息。

    所以CompletableFuture的API优美之处又要体现出来了

    1. CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    2. int kk = 10 / 0;
    3. return kk;
    4. }).handle((result, throwable) -> {
    5. System.out.println(result);
    6. System.out.println(throwable.getMessage());
    7. return result;
    8. }).whenComplete((result ,throwable) -> System.out.println(result));

    supplyAsync配合着 handle 和 whenComplete,将异常和结果进行处理.

    handle 和 whenComplete的区别

    1. whenComplete
    2. public CompletableFuture<T> whenComplete(
    3. BiConsumer<? super T, ? super Throwable> action) {
    4. return uniWhenCompleteStage(null, action);
    5. }
    6. handle
    7. public <U> CompletableFuture<U> handle(
    8. BiFunction<? super T, Throwable, ? extends U> fn) {
    9. return uniHandleStage(null, fn);
    10. }

    whenComplete是BiConsumer也就是直接消费不返回值,

    handle是BiFunction也就是需要返回值

    了解API

    欲善其功,必先利其器

    我们主要从这三种关系下手去了解和使用API 涉及接口

    1. CompletionStage thenApply(fn);
    2. CompletionStage thenApplyAsync(fn);
    3. CompletionStage thenAccept(consumer);
    4. CompletionStage thenAcceptAsync(consumer);
    5. CompletionStage thenRun(action);
    6. CompletionStage thenRunAsync(action);
    7. CompletionStage thenCompose(fn);
    8. CompletionStage thenComposeAsync(fn);

    thenApply函数里参数入参Function fn,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply函数出参的是CompletionStage

    thenAccept类型函数入参Consumer action是一个消费类型的,回参是CompletionStage所以thenAccept类型函数不会有返回值。

    thenRun函数入参Runnable action,回参CompletionStage,所以既不能接收参数也不支持返回值。

    thenCombine函数入参CompletionStage other, BiFunction fn,回参CompletableFuture是支持返回值的,他的作用主要使用BiFunction处理两个阶段的结果

    我们只需要注意他的入参、回参和函数后缀就能够区分出他们的不同

    1. CompletableFuture中的串行化关系

    1. CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() ->{
    2. //int kk = 10/0;
    3. return Thread.currentThread().getName() + ":小郭";
    4. },executorService).thenApply(s -> {
    5. return s + "拿茶叶";
    6. }).thenApply(a ->{
    7. return a + ",泡茶去";
    8. }).handle((result, ex) ->{
    9. if (ex != null){
    10. System.out.println(ex.getMessage());
    11. }
    12. return result;
    13. }).whenComplete((r, ex) ->{
    14. System.out.println(r);
    15. });
    16. task1.join();

    执行结果:

    1. 准备执行
    2. 计划执行
    3. pool-1-thread-1:小郭拿茶叶,泡茶去

    可以看到,是按照之上而下的顺序去执行的supplyAsync、thenApply、thenApply 如果第二阶段任务没有拿到第一阶段的结果,他就会等待

    2. CompletableFuture中的汇聚AND关系

    1. CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() ->{
    2. int t = new Random().nextInt(30);
    3. try {
    4. Thread.sleep(10000);
    5. } catch (InterruptedException e) {
    6. e.printStackTrace();
    7. }
    8. System.out.println("task1=" + t);
    9. return t;
    10. });
    11. CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() ->{
    12. int t = new Random().nextInt(30);
    13. try {
    14. Thread.sleep(t);
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. }
    18. System.out.println("task2=" + t);
    19. return t;
    20. });
    21. CompletableFuture<Integer> task3 = task1.thenCombineAsync(task2, Integer::sum);
    22. task3.join();

    等待task1和task2执行完成,task再进行处理

    执行结果

    1. task1=1
    2. task2=3
    3. 4

    3. CompletableFuture中的汇聚OR关系

    1. CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() ->{
    2. int t = new Random().nextInt(5);
    3. try {
    4. Thread.sleep(t * 1000);
    5. } catch (InterruptedException e) {
    6. e.printStackTrace();
    7. }
    8. System.out.println("task1=" + t);
    9. return t;
    10. });
    11. CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() ->{
    12. int t = new Random().nextInt(5);
    13. try {
    14. Thread.sleep(t * 1000);
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. }
    18. System.out.println("task2=" + t);
    19. return t;
    20. });
    21. CompletableFuture<Integer> task3 = task1.applyToEither(task2, s ->s);
    22. task3.join();

    谁先执行完先输出谁,如果相同时间执行完,则一起数据

    执行结果

    1. 我快我先来 task2=2
    2. 我快我先来 task1=2
    3. 2
    4. 我快我先来 task2=0
    5. 0

    实现List任务并行执行的方式

    1. 并行流进行操作
    2. 使用CompletableFuture发起异步请求,最后使用join等待所有异步操作结束

    为了更好的发挥出CompletableFuture,需要采用定制的执行器

    那这两个如何选择?

    1. 进行计算密集型,并且没有I/O操作,推荐使用Sream并行流,没必要创建更多的线程,线程过多反而是一种浪费
    2. 涉及I/O等待的操作,CompletableFuture的灵活性会更高

    现在回过头看一下,我上面的改造方法,是不是就感觉清晰了许多,不足的地方大家提出来

    总结

    1. 章我主要是根据大家的建议,使用了Java8的CompletableFuture 来进行了原来的业务功能改造.
    2. 在执行比较耗时的业务操作时候可以使用异步编程来提高性能,加快程序的处理速度
    3. 在处理异常机制的时候,往往是让我们很头痛的,担心线程中出现的异常没有及时捕获,造成程序的阻塞或者其他方面的影响,CompletableFuture 提供了优秀的异常管理机制。
    4. CompletableFuture 还提供了 串行、聚合、优先输出的函数,更贴切业务需求做出最好的选择。
  • 相关阅读:
    Dockerfile
    SpringBoot篇
    Spring Security对接OIDC(OAuth2)外部认证
    CSS的媒体查询:响应式布局的利器
    python最优化算法实战---线性规划之内点法
    计算机毕业设计ssm高校图书馆网站m7o77系统+程序+源码+lw+远程部署
    OPENGL 不同绘制图元的类型,顶点索引的解释方式不同
    Java+SSM+JSP实现医院预约挂号系统
    C++调用构造函数,后面用冒号添加变量的作用
    vue单向以及双向数据绑定(v-bind和v-model使用)
  • 原文地址:https://blog.csdn.net/Java_ttcd/article/details/126482816