• CompletableFuture异步编排


    1.1 CompletableFuture介绍

    Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用 isDone 方法检查计算是否完成,或者使用 get 阻塞住调用线程,直到计算完成返回结果,你也可以使用 cancel 方法停止任务的执行。

    虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

    很多语言,比如 Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如 Netty,自己扩展了 Java 的 Future 接口,提供了 addListener 等多个扩展方法;Google guava 也提供了通用的扩展 Future;Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。

    作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?

    在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture ,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。
    CompletableFuture 类实现了,Future ,接口,所以你还是可以像以前一样通过 get 方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

    CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。

    1.2 创建异步对象

    CompletableFuture 提供了四个静态方法来创建一个异步操作。

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    
    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 
    
    • 1
    • 2
    • 3
    • 4
    • 5

    没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

    • runAsync方法不支持返回值。
    • supplyAsync可以支持返回值。

    1.3 计算完成时回调方法

    当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action

    主要是下面的方法:

    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) 
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) 
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) 
    public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) 
    
    • 1
    • 2
    • 3
    • 4

    whenComplete 可以处理正常或异常的计算结果,exceptionally 处理异常情况。
    BiConsumer 可以定义处理业务。
    whenComplete 和 whenCompleteAsync 的区别:

    • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
    • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

    方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。

    代码示例:

    public class CompletableFutureDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
                @Override
                public Object get() {
                    System.out.println(Thread.currentThread().getName() + "\t completableFuture");
                    //int i = 10 / 0;
                    return 1024;
                }
            }).whenComplete(new BiConsumer<Object, Throwable>() {
                @Override
                public void accept(Object o, Throwable throwable) {
                    System.out.println("-------o=" + o.toString());
                    System.out.println("-------throwable=" + throwable);
                }
            }).exceptionally(new Function<Throwable, Object>() {
                @Override
                public Object apply(Throwable throwable) {
                    System.out.println("throwable=" + throwable);
                    return 6666;
                }
            });
            System.out.println(future.get());
        }
    }
    
    // 运行结果
    ForkJoinPool.commonPool-worker-9	 completableFuture
    -------o=1024
    -------throwable=null
    1024
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    1.4 线程串行化与并行化方法

    thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) 
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 
    
    • 1
    • 2
    • 3

    thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
    
    • 1
    • 2
    • 3

    thenRun 方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作。

    public CompletableFuture<Void> thenRun(Runnable action) 
    public CompletableFuture<Void> thenRunAsync(Runnable action) 
    public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
    
    • 1
    • 2
    • 3

    带有 Async 默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

    Function
    T:上一个任务返回结果的类型
    U:当前任务的返回值类型

    代码演示:

    public class CompletableFutureDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    System.out.println(Thread.currentThread().getName() + "\t completableFuture");
                    //int i = 10 / 0;
                    return 1024;
                }
            }).thenApply(new Function<Integer, Integer>() {
                @Override
                public Integer apply(Integer o) {
                    System.out.println("thenApply方法,上次返回结果:" + o);
                    return  o * 2;
                }
            }).whenComplete(new BiConsumer<Integer, Throwable>() {
                @Override
                public void accept(Integer o, Throwable throwable) {
                    System.out.println("-------o=" + o);
                    System.out.println("-------throwable=" + throwable);
                }
            }).exceptionally(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) {
                    System.out.println("throwable=" + throwable);
                    return 6666;
                }
            });
            System.out.println(future.get());
        }
       
    }
    
    // 运行结果
    ForkJoinPool.commonPool-worker-9	 completableFuture
    thenApply方法,上次返回结果:1024
    -------o=2048
    -------throwable=null
    2048
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    测试:改变线程的睡眠时间,则会交替打印,则说明是并行执行。

    public class CompletableFutureDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    50, 500, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));
            // 线程1执行返回的结果:hello
            CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");
            // 线程2 获取到线程1执行的结果
            CompletableFuture<Void> futureB = futureA.thenAcceptAsync((s) -> {
                delaySec(1);
                printCurrTime(s + " 第一个线程");
            }, threadPoolExecutor);
    
            CompletableFuture<Void> futureC = futureA.thenAcceptAsync((s) -> {
                delaySec(1);
                printCurrTime(s + " 第二个线程");
            }, threadPoolExecutor);
    
        }
    
        private static void printCurrTime(String str) {
            System.out.println(str);
        }
    
        private static void delaySec(int i) {
            try {
                Thread.sleep(i * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    // 运行结果
    hello 第一个线程
    hello 第二个线程
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    1.5 多任务组合

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 
    
    • 1
    • 2

    allOf:等待所有任务完成;
    anyOf:只要有一个任务完成。

  • 相关阅读:
    飞桨EasyDL实操范例:工业零件划痕自动识别
    ASEMI肖特基二极管MBR30100CT特征,MBR30100CT应用
    python 控制包是否可导入
    mysql的主从复制与读写分离
    Java手写希尔排序和算法案例拓展
    人工智能专栏第十三讲——动作检测
    信息安全软考——第三章 密码学基本理论笔记 很全呀!
    PATH 与 LD_LIBRARY_PATH 的用法举例
    Springboot整合taos时序数据库TDengine
    Java面试八股文宝典:初识数据结构-数组的应用扩展之HashMap
  • 原文地址:https://blog.csdn.net/weixin_44129618/article/details/126451260