• 【JUC】CompletableFuture


    1. Future接口

    • Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等
    • 比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态
    • 三个特点:
      • 多线程
      • 有返回
      • 异步任务
    • FutureTask基本使用:
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            FutureTask<String> futureTask = new FutureTask<>(() ->{
                System.out.println("我进来了");
                return "老六是我";
            });
            Thread t1 = new Thread(futureTask, "t1");
            t1.start();
            System.out.println(futureTask.get());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • FutureTask缺点:
      • get()阻塞,一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞(可设置超时时间,超时会抛出超时异常)
      • isDone()轮询
        • 轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果
        • 如果想要异步获得结果,通常都会以轮询的方式去获取结果,尽量不要阻塞
      • 结论:Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果

    2. CompletableFuture的出现

    • get()方法在Future 计算完成之前会一直处在阻寒状态下

    • isDone()方法容易耗费CPU资源

    • 对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果

    • 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源

      因此,JDK8设计出CompletableFuture

    • CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方

    3. CompletableFuture四大静态方法

    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    
    • 1
    • 2
    • 3
    • 4

    参数说明:

    • 没有指定executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码
    • 指定线程池则使用指定的线程池执行代码

    4.使用演示

    它是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

        public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                System.out.println(1/0);
                return "卧槽,外挂";
            }).whenComplete((res, e) -> {
                if (e != null) {
                    System.out.println("有异常");
                    e.printStackTrace();
                } else {
                    System.out.println("计算结果:" + res);
                }
            }).exceptionally(e -> {
                System.out.println("出异常了");
                e.printStackTrace();
                return null;
            });
    
            System.out.println("主线程:"+ Thread.currentThread().getName());
            try {
                // ForkJoinPool线程池是守护线程,必须要保持存在用户线程
                TimeUnit.SECONDS.sleep(2);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    运行结果

    ForkJoinPool.commonPool-worker-9
    主线程:main
    有异常
    出异常了
    java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
    Caused by: java.lang.ArithmeticException: / by zero
    	at Main.lambda$main$0(Main.java:20)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    	... 5 more
    java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
    Caused by: java.lang.ArithmeticException: / by zero
    	at Main.lambda$main$0(Main.java:20)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    	... 5 more
    
    Process finished with exit code 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    5. 常用方法

    以下方法带Async的对应方法会使用默认线程池ForkJoinPool进行线程管理

    5.1 获得结果和触发计算

    public T get() // 阻塞
    public T get(long timeout, TimeUnit unit) // 阻塞,超时则抛出异常
    public T join() // 阻塞,代码中无需抛出编译异常
    public T getNow(T valueIfAbsent) // 获得当前计算结果,如线程已经计算完成则返回计算结果;如当前未计算出结果则返回valueIfAbsent;不会打断线程的执行
    public boolean complete(T value) // 返回值表示是否有打断get方法,有打断则后续join获取到的为括号内的值,未打断则获取到的是线程执行完的结果值
    
    • 1
    • 2
    • 3
    • 4
    • 5

    5.2 对计算结果进行处理

    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) // 计算结果存在依赖关系,线程串行化执行;有异常不执行下一步
    public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) // 同上,但是有异常时会带着异常到下一步
    
    • 1
    • 2

    5.3 对计算结果进行消费

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) // 接收任务的处理结果,并消费处理,无返回结果
    
    • 1

    5.4 对计算速度进行选用

    // 调用该方法的返回值的join会返回速度执行快的线程的执行结果,同时执行慢的线程也会继续执行而不会中断
    public <U> CompletableFuture<U> applyToEither(
            CompletionStage<? extends T> other, Function<? super T, U> fn) 
    
    • 1
    • 2
    • 3

    5.5 对计算结果进行合并

    // 两个CompletionStage任务都完成后,最终把两个任务的结果一起交给thenCombine来处理,先完成的先等着,等待其它分支任务
    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
    
    • 1
    • 2
    • 3
  • 相关阅读:
    【付费推广】常见问题合集,焦点展台与任务管理
    YoloV7改进策略:SwiftFormer,全网首发,独家改进的高效加性注意力用于实时移动视觉应用的模型,重构YoloV7
    美客多选品趋势分析,美客多选品时的注意事项
    RPA应用于电力行业的优势:来自3大应用场景的解读
    mybatis学习(16):不使用接口的方式
    go中网络流量分析gopacket库的使用
    【泛函分析】距离空间的完备性
    Android 10+ wifi使用相关权限问题
    周总结【java项目】
    家庭记账的最简单方法
  • 原文地址:https://blog.csdn.net/xxx1276063856/article/details/133825000