• CompletableFuture和ListenableFuture


    前置知识:Future及其唯一实现类FutureTask的作用: 对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

    详情可查看之前我的博文->链接

    但是有一些弊端

    它有可能已经完成了计算并返回结果,也有可能至今还没完成。 我们只能手动的判断时候处理完成,以及处理完成后,怎么做。比如:有结果对结果怎么处理?如果出现异常怎么处理?等等。

    由此就引入了CompletableFuture和ListenableFuture。

    CompletableFuture

    主动查询

    • public T get()

    该方法为阻塞方法,会等待计算结果完成,会抛出三种异常

    ​ CancellationException -如果这个future被取消
    ​ ExecutionException -如果这个future异常完成

    ​ InterruptedException -如果当前线程在等待时被中断

    • public T get(long timeout,TimeUnit unit)

    有时间限制的阻塞方法,会抛出四种异常:

    ​ CancellationException -如果这个future被取消
    ​ ExecutionException -如果这个future异常完成

    ​ InterruptedException -如果当前线程在等待时被中断
    ​ TimeoutException -如果等待超时

    • public T getNow(T valueIfAbsent)

    立即获取方法结果,如果没有计算结束则返回传的值

    • public T join()

    完成时返回结果值,或在异常完成时抛出(未检查的)异常。为了更好地使用常见的函数形式,如果在CompletableFuture的完成过程中涉及的计算抛出了一个异常,此方法将抛出一个(未检查的) CompletionException,其原因是底层异常。

    • public boolean complete(T value)

      立即完成计算,并把结果设置为传的值,返回是否设置成功

      如果 CompletableFuture 没有关联任何的Callback、异步任务等,如果调用get方法,那会一直阻塞下去,可以使用complete方法主动完成计算

    • public boolean completeExceptionally(Throwable ex)
      如果尚未完成,则会导致调用get()和相关方法抛出给定的异常。

    demo

    public class Test {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //demo1
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 10 / 1;
            });
           System.out.println(future.join());//10
           System.out.println(future.get());//10
            System.out.println(future.getNow(10));//10
    
            future.complete(10);
            System.out.println(future.get());//10
            try {
                future.completeExceptionally(new TimeoutException("超时了!"));
                future.get();//引起超时异常
    
            }catch (Exception e){
                System.out.println(e.getMessage());
            }
        }
    }
    
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    异步执行任务

    public static CompletableFuture runAsync(Runnable runnable)

    public static CompletableFuture runAsync(Runnable runnable, Executor executor)

    public static CompletableFuture supplyAsync(Supplier supplier)

    public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

    补:1.run表示执行没有返回值的线程,以 Runable 类型为参数 。

    ​ 2.有Executor 参数的表示可以传入自己线程池,否则默认使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码 。 ForkJoinPool始自JDK7,叫做分支/合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法。极大的提高效率。

    demo:

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("runAsync"));
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "supplyAsync");
            
            System.out.println(future1.get());
            System.out.println(future2.get());
        }
    
    //输出:
    runAsync
    null
    supplyAsync
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    结果处理

    • public CompletableFuture whenComplete(BiConsumer action)

    • public CompletableFuture whenCompleteAsync(BiConsumer action)

    • public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor)

    • public CompletableFuture exceptionally(Function fn)

    • public CompletableFuture handle(BiFunction fn)

    • public CompletableFuture handleAsync(BiFunction fn)

    • public CompletableFuture handleAsync(BiFunction fn, Executor executor)

      补:1.whenCompleteAsync:可以获取异步任务的返回值和抛出的异常信息,但是不能修改返回结果

      2.但是handleAsync可以获取异步任务的返回值和抛出的异常信息,而且可以显示的修改返回的结果

      3.exceptionally当异步任务跑出了异常后会触发的方法,如果没有抛出异常该方法不会执行,可以执行返回值

    demo

       public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.sleep(100);
            return 20;
        }).whenCompleteAsync((v, e) -> {
    		//v是上面返回的结果,无返回值v为null
            //e是上面抛出的异常
            System.out.println(v);
            System.out.println(e);
        });
        System.out.println(future.get());
    }
    
    
    //也可以写
        public static void main(String[] args) throws Exception {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                ThreadUtil.sleep(100);
                return 10 / 0;
            }).whenCompleteAsync((v, e) -> {
                System.out.println(v);
                System.out.println(e);
            }).exceptionally((e) -> {
                System.out.println(e.getMessage());
                return 30;
            });
            System.out.println(future.get());
        }
    
    //run
    public static void main(String[] args) throws ExecutionException, InterruptedException {
            
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程开始了");
                int i = 100 / 10;
                System.out.println("线程结束了");
                return i;
            }, executor).handleAsync((v, e) -> {
                System.out.println("res = " + v+"  throwable="+e);
                return res*10;//这个既可以获得上面执行的结果 也可以修改 返回值
                
            });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    链式操作

    thenApply

    • public CompletableFuture thenApply(Function fn)

    • public CompletableFuture thenApplyAsync(Function fn)

    • public CompletableFuture thenApplyAsync(Function fn, Executor executor)

      补充: 这些方法不是马上执行的,也不会阻塞,而是前一个执行完成后继续执行下一个。 和 handle 方法的区别是,handle 会处理正常计算值和异常,不会抛出异常。而 thenApply 只会处理正常计算值,有异常则抛出。

    demo:

     public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> future = CompletableFuture
                    .supplyAsync(() -> 1)
                    .thenApply((a) -> {
                        System.out.println(a);//1
                        return a * 10;
                    }).thenApply((a) -> {
                        System.out.println(a);//10
                        return a + 10;
                    }).thenApply((a) -> {
                        System.out.println(a);//20
                        return a - 5;
                    });
            System.out.println(future.get());//15
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    thenAccept

    • public CompletableFuture thenAccept(Consumer action)

    • public CompletableFuture thenAcceptAsync(Consumer action)

    • public CompletableFuture thenAcceptAsync(Consumer action, Executor executor)

    **补:**和thenApply相比, 其单纯的去消费结果而不会返回新的值,因些计算结果为 Void;。

    • public CompletableFuture thenAcceptBoth(CompletionStage other, BiConsumer action)

    • public CompletableFuture thenAcceptBothAsync(CompletionStage other, BiConsumer action)

    • public CompletableFuture thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor)

    • public CompletableFuture runAfterBoth(CompletionStage other, Runnable action)

    **补:**runAfterBoth和thenAcceptBoth不同的是, 传一个 Runnable 类型的参数,不接收上一级的返回值

    • public CompletableFuture thenRun(Runnable action)

    • public CompletableFuture thenRunAsync(Runnable action)

    • public CompletableFuture thenRunAsync(Runnable action, Executor executor)

      **补:**run的方法都不接收参数,且是void类型

    demo:

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            
            CompletableFuture<Void> future = CompletableFuture
                    .supplyAsync(() -> 1)
                    .thenAccept(System.out::println) //消费 上一级返回值 1
                    .thenAcceptAsync(System.out::println); //上一级没有返回值 输出null
                    
            System.out.println(future.get()); //消费函数没有返回值 输出null
        }
    
    
       public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture
                    .supplyAsync(() -> 1)
                //第一个参数是当前CompletableFuture,后边的函数式上面接收的CompletableFuture和当前CompletableFuture的返回结果
                    .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2), (a, b) -> {
                        System.out.println(a);
                        System.out.println(b);
                    }).get();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    组合操作

    compose

    • public CompletableFuture thenCompose(Function> fn)

    • public CompletableFuture thenComposeAsync(Function> fn)

    • public CompletableFuture thenComposeAsync(Function> fn, Executor executor)

    demo:

       public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture<Integer> future = CompletableFuture
                    .supplyAsync(() -> 1)
                    .thenApply((a) -> {
                        ThreadUtil.sleep(1000);
                        return a + 10;
                    })
                    .thenCompose((s) -> {
                        System.out.println(s); //11
                        return CompletableFuture.supplyAsync(() -> s * 5);
                    });
    
            System.out.println(future.get());//55
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    combine

    • public CompletableFuture thenCombine(CompletionStage other, BiFunction fn)

    • public CompletableFuture thenCombineAsync(CompletionStage other, BiFunction fn)

    • public CompletableFuture thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor)

    demo:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            Random random = new Random();
        //注意:supplyAsync和thenCombine 不一定哪个先哪个后,执行   
            CompletableFuture<Integer> future = CompletableFuture
                    .supplyAsync(() -> {
                        ThreadUtil.sleep(random.nextInt(1000));
                        System.out.println("supplyAsync");
                        return 2;
                    }).thenApply((a) -> {
                        ThreadUtil.sleep(random.nextInt(1000));
                        System.out.println("thenApply");
                        return a * 3;
                    })
                    .thenCombine(CompletableFuture.supplyAsync(() -> {
                        ThreadUtil.sleep(random.nextInt(1000));
                        System.out.println("thenCombineAsync");
                        return 10;
                    }), (a, b) -> {
                        System.out.println(a);
                        System.out.println(b);
                        return a + b;
                    });
    
            System.out.println(future.get());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    Either

    • public CompletableFuture acceptEither(CompletionStage other, Consumer action)

    • public CompletableFuture acceptEitherAsync(CompletionStage other, Consumer action)

    • public CompletableFuture acceptEitherAsync(CompletionStage other, Consumer action, Executor executor)

      补: acceptEither方法是当任意一个 CompletionStage 完成的时候,action 这个消费者就会被执行。

    demo

    有时输出A,有时输出B,哪个Future先执行完就会根据它的结果计算。

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Random random = new Random();
            CompletableFuture
                    .supplyAsync(() -> {
                        ThreadUtil.sleep(random.nextInt(1000));
                        return "A";
                    })
                    .acceptEither(CompletableFuture.supplyAsync(() -> {
                        ThreadUtil.sleep(random.nextInt(1000));
                        return "B";
                    }), System.out::println)
                    .get();
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    any/all

    • public static CompletableFuture allOf(CompletableFuture… cfs)
    • public static CompletableFuture anyOf(CompletableFuture… cfs)

    demo:

    //这个方法的意思是把有方法都执行完才往下执行,没有返回值   
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            Random random = new Random();
            CompletableFuture.allOf(
                    CompletableFuture.runAsync(() -> {
                        ThreadUtil.sleep(random.nextInt(1000));
                        System.out.println(1);
                    }),
                    CompletableFuture.runAsync(() -> {
                        ThreadUtil.sleep(random.nextInt(1000));
                        System.out.println(2);
                    }))
                    .get();
    
        }
    //有返回的例子
    public static void main(String[] args) throws ExecutionException, InterruptedException {
       CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                return "f1";
            });
    
            f1.whenCompleteAsync((s, throwable) -> System.out.println(System.currentTimeMillis() + ":" + s));
    
            CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                return "f2";
            });
    
            f2.whenCompleteAsync((s, throwable) -> System.out.println(System.currentTimeMillis() + ":" + s));
    
            CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
    
            //阻塞,直到所有任务结束。
            System.out.println(System.currentTimeMillis() + ":阻塞");
            all.join();
            System.out.println(System.currentTimeMillis() + ":阻塞结束");
    
            //一个需要耗时2秒,一个需要耗时3秒,只有当最长的耗时3秒的完成后,才会结束。
            System.out.println("任务均已完成。");
    
    }
    //输出结果有时为1 有时间为 2
      public static void main(String[] args) throws ExecutionException, InterruptedException {
            Random random = new Random();
    
            Object obj = CompletableFuture.anyOf(
                    CompletableFuture.supplyAsync(() -> {
                        ThreadUtil.sleep(random.nextInt(1000));
                        return 1;
                    }),
                    CompletableFuture.supplyAsync(() -> {
                        ThreadUtil.sleep(random.nextInt(1000));
                        return 2;
                    })).get();
    
            System.out.println(obj);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    ListenableFuture

    ListenableFuture可以让你注册一个回调函数,一旦计算完毕,就会执行它。或者,这个任务早已经执行完毕,那就立刻执行这个回调函数。ListenableFuture增加了这一项简单的功能,就可以高效的支持到许多基础的Future无法支持的操作。

    ListenableFuture的基本操作就是addListener(Runnable, Executor)方法,它指定了当这个Future代表的计算执行完成,指定的Runnable将会被指定的Executor运行。

    更加常用的是:

     Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor) 
    
    • 1

    创建:

    • 基于Future

    根据JDK的 ExecutorService.submit(Callable)这个方法的返回,可以初始化一个异步的计算future,Guava 提供了ListeningExecutorService接口,这个接口无论在ExecutorService的哪里返回一个正常的Future,都会返回一个ListenableFuture,将ExecutorService 转化为 ListeningExecutorService,很简单:MoreExecutors.listeningDecorator(ExecutorService).

    • 基于FutureTask

      如果你是基于FutureTask来转化的的,你可以选择Guava提供的ListenableFutureTask.create(Callable)ListenableFutureTask.create(Runnable, V)

    demo

    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
      public Explosion call() {
        return pushBigRedButton();
      }
    });
    Futures.addCallback(explosion, new FutureCallback<Explosion>() {
      // we want this handler to run immediately after we push the big red button!
      public void onSuccess(Explosion explosion) {
        walkAwayFrom(explosion);
      }
      public void onFailure(Throwable thrown) {
        battleArchNemesis(); // escaped the explosion!
      }
    });
    //函数表达式写法
    Futures.addCallback(result->{},ex->{})
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    参考博文:https://www.jianshu.com/p/220d05525f27

    https://juejin.cn/post/6844903892728168461

    https://cloud.tencent.com/developer/article/1706034

  • 相关阅读:
    语雀停服8小时,P0级事故,故障原因和补偿来了。
    Java多线程 信号量和屏障实现控制并发线程数量,主线程等待所有线程执行完毕2
    通过深度可分离卷积神经网络对七种表情进行区分
    代码随想录算法训练营第四十四天 | 416. 分割等和子集
    手撸promise【二、Promise源码】【代码详细注释/测试案例完整】
    【vim 学习系列文章 8 -- vim中 has 函数和 let g:介绍】
    获取Class类的实例的几种方式
    『亚马逊云科技产品测评』活动征文|AWS 数据库产品类别及其适用场景详细说明
    java 实现建造者模式
    机器学习之KNN —— K最近邻分类算法
  • 原文地址:https://blog.csdn.net/weixin_43604021/article/details/126405858