• CompletableFuture用法


    简介

    CompletableFuture实现了CompletionStage接口和Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务编排的能力,增加了异步回调、流式处理、组合处理的能力。

    CompletableFuture构建

    CompletableFuture提供了4种构建方式,如下

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    
    • 1
    • 2
    • 3
    • 4

    supplyAsync

    // 异步执行一个任务,带返回结果
    supplyAsync(Supplier<U> supplier)
    // 异步执行一个任务,可以指定一个线程池,带返回结果
    supplyAsync(Supplier<U> supplier, Executor executor)
    
    • 1
    • 2
    • 3
    • 4

    使用方式

    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        System.out.println("异步执行任务");
        return "异步执行结果";
    });
    // 阻塞等待执行结果
    System.out.println(future.get());
    
    // 也可以指定一个线程池
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        System.out.println("异步执行任务");
        return "异步执行结果";
    }, executorService);
    // 阻塞等待执行结果
    System.out.println(future.get());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    runAsync

    // 异步执行一个任务,不带返回结果
    runAsync(Runnable runnable)
    // 异步执行一个任务,可以指定一个线程池,不带返回结果
    runAsync(Runnable runnable, Executor executor)
    
    • 1
    • 2
    • 3
    • 4

    使用方式

    CompletableFuture future = CompletableFuture.runAsync(() -> {
        System.out.println("异步执行任务");
    });
    // 阻塞等待执行结果
    future.get();
    
    // 也可以指定一个线程池
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    CompletableFuture future = CompletableFuture.runAsync(() -> {
        System.out.println("异步执行任务");
    }, executorService);
    // 阻塞等待执行结果
    future.get();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    complete

    主动完成一个任务,并告诉它执行结果,可以用来异步数据传递

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture future = new CompletableFuture();
        // CompletableFuture没有执行任务,调用get方法会阻塞
        new Thread(new TestCompletable(future)).start();
        future.complete("传递一个数据");
    }
    
    static class TestCompletable implements Runnable {
    
        private CompletableFuture future;
    
        public TestCompletable(CompletableFuture future) {
            this.future = future;
        }
    
        @Override
        public void run() {
            System.out.println("等待唤醒并输出异步结果");
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    join

    同步等待一个异步执行结果,与get方法一样,只是抛出的异常不一样

    completeExceptionally

    传递一个异常信息,与complete类似

    public boolean completeExceptionally(Throwable ex)
    
    • 1

    allOf

    传递多个CompletableFuture,等待所有传入的CompletableFuture执行完成后,返回一个没有返回值的CompletableFuture

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
    
    • 1

    anyOf

    传递多个CompletableFuture,任何一个传入的CompletableFuture执行完成后,返回完成的那个CompletableFuture

    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
    
    • 1

    thenAccept

    依赖一个异步执行结果

    CompletableFuture future = CompletableFuture.supplyAsync(() -> "异步执行结果");
    // 接收一个异步执行结果
    future.thenAccept((rs) -> {
        System.out.println("接收一个异步执行结果: " + rs);
    });
    // 阻塞等待执行结果
    System.out.println(future.get());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    还可以使用链式风格

    CompletableFuture.supplyAsync(() -> "异步执行结果").thenAccept((rs) -> {
        System.out.println("接收一个异步执行结果: " + rs);
    }).get();
    
    • 1
    • 2
    • 3

    thenAcceptBoth

    依赖两个任务执行结果

    CompletableFuture.supplyAsync(() -> "future1")
            .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "future2"), (f1, f2) -> {
        System.out.println("执行结果:" + f1 + f2);
    }).get();
    
    • 1
    • 2
    • 3
    • 4

    acceptEither

    依赖任意一个任务的执行结果

    CompletableFuture.supplyAsync(() -> "future1")
           .acceptEither(CompletableFuture.supplyAsync(() -> "future2"), (f) -> {
        System.out.println("执行结果: " + f);
    });
    
    • 1
    • 2
    • 3
    • 4

    thenApply

    依赖一个异步执行结果,并且返回新的执行结果

    CompletableFuture future = CompletableFuture.supplyAsync(() -> "future1")
            .thenApply(f -> f + "apply");
    System.out.println(future.get());
    
    • 1
    • 2
    • 3

    thenCombineAsync

    依赖两个任务执行结果,并且返回新的执行结果

    CompletableFuture future = CompletableFuture.supplyAsync(() -> "future1")
            .thenCombineAsync(CompletableFuture.supplyAsync(() -> "furure2"), (f1, f2) -> f1 + f2);
    System.out.println(future.get());
    
    • 1
    • 2
    • 3

    异常处理

    // 出现异常时,后面的所有任务都无法执行
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("异常");
    }).runAfterBoth(CompletableFuture.supplyAsync(() -> "future"), () -> {
        System.out.println("任务执行");
    });
    System.out.println(future.get());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可以通过whenComplete捕获前置任务异常

    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("异常");
    }).whenComplete((r, e) -> {
       if(null != e) {
           System.out.println("出现异常");
       } else{
           System.out.println(r);
       }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    也可以使用handleAsync,不管前置任务是否异常都会执行

    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("异常");
    }).handleAsync((r, e) -> null == e ? r : null);
    System.out.println(future.get());
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    FPGA解析B码----连载4
    新库上线 | CnOpenDataA股上市公司IPO申报发行文本数据
    大厂标配 | 百亿级并发系统设计 | 学完薪资框框涨
    论文笔记:HG-SL 面向假新闻早期发现的全局和本地用户传播行为联合学习
    一个计算密集小程序在不同CPU下的表现
    [附源码]计算机毕业设计springboot基于vue+mysql开发的考试系统
    http调用 采用 Basic Auth 进行请求
    基于51单片机火灾监测自动灭火装置Proteus仿真
    什么是“缓存和数据库一致性“问题?
    每日三题 9.06
  • 原文地址:https://blog.csdn.net/u010825190/article/details/126030696