• CompletableFuture用法详解


    Future接口的局限性

    当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值,Future的get() 方法会阻塞主线程。Future获取得线程执行结果前,我们的主线程get()得到结果需要一直阻塞等待

     Future无法实现下面的场景

    • 将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
    • 等待Future集合种的所有任务都完成。
    • 仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。
    • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
    • 应对Future的完成时间(即当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果)

    下面介绍 CompletableFuture API中常用方法

    实例化

    一种是supply开头的方法,一种是run开头的方法

    • supply开头:这种方法,可以返回异步线程执行之后的结果
    • run开头:这种不会返回结果,就只是执行线程任务
    • 无参构造器(一般不用)
    1. public static CompletableFuture supplyAsync(Supplier supplier);
    2. public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);
    3. public static CompletableFuture runAsync(Runnable runnable);
    4. public static CompletableFuture runAsync(Runnable runnable, Executor executor);
    5. CompletableFuture completableFuture = new CompletableFuture();

    注意:

    异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离

    当不传递线程池时,会使用ForkJoinPool.commonPool()系统级公共线程池,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰

    获取结果

    1. /**
    2. * Waits if necessary for this future to complete,
    3. * and then returns its result.
    4. **/
    5. public T get()
    6. /**
    7. * Waits if necessary for at most the given time for this future
    8. * to complete, and then returns its result, if available.
    9. **/
    10. public T get(long timeout, TimeUnit unit)
    11. /**
    12. * Returns the result value (or throws any encountered exception)
    13. * if completed, else returns the given valueIfAbsent.
    14. **/
    15. public T getNow(T valueIfAbsent)
    16. /**
    17. *Returns the result value when complete, or throws an
    18. * (unchecked) exception if completed exceptionally. To better
    19. * conform with the use of common functional forms, if a
    20. * computation involved in the completion of this
    21. * CompletableFuture threw an exception, this method throws an
    22. * (unchecked) {@link CompletionException} with the underlying
    23. * exception as its cause.
    24. **/
    25. public T join()

    get() 方法同样会阻塞直到任务完成,主线程会一直阻塞或等待参数指定的时间,主线程为完成时状态会一直是not completed 

    getNow() 直接获取结果不等待,参数valueIfAbsent的意思是当计算结果不存在或者Now时刻没有完成任务,给定一个确定的值

     join()get() 区别在于join() 返回计算的结果或抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常

    计算完成后续操作1——complete

    1. public CompletableFuture whenComplete(BiConsumersuper T,? super Throwable> action)
    2. public CompletableFuture whenCompleteAsync(BiConsumersuper T,? super Throwable> action)
    3. public CompletableFuture whenCompleteAsync(BiConsumersuper T,? super Throwable> action, Executor executor)

    方法1和2的区别在于是否使用异步处理,2和3的区别在于是否使用自定义的线程池,前三个方法都会提供一个返回结果和可抛出异常,我们可以使用lambda表达式的来接收这两个参数,然后自己处理。

    1. CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    2. return = "result";
    3. });
    4. future.whenComplete((result, error) -> {
    5. log.info("result:{}",result);
    6. error.printStackTrace();
    7. });

    计算完成后续操作2——handle

    1. public CompletableFuture handle(BiFunctionsuper T,Throwable,? extends U> fn)
    2. public CompletableFuture handleAsync(BiFunctionsuper T,Throwable,? extends U> fn)
    3. public CompletableFuture handleAsync(BiFunctionsuper T,Throwable,? extends U> fn, Executor executor)

    handle方法集和上面的complete方法集没有区别,同样有两个参数一个返回结果和可抛出异常,区别就在于返回值,没错,虽然同样返回CompletableFuture类型,但是里面的参数类型,handle方法是可以自定义的

    1. // 注意这里的两个CompletableFuture包含的返回类型不同
    2. CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    3. List list = new ArrayList<>();
    4. list.add("语文");
    5. list.add("数学");
    6. // 获取得到今天的所有课程
    7. return list;
    8. });
    9. // 使用handle()方法接收list数据和error异常
    10. CompletableFuture future2 = future.handle((list,error)-> {
    11. // 如果报错,就打印出异常
    12. error.printStackTrace();
    13. // 如果不报错,返回一个包含Integer的全新的CompletableFuture
    14. return list.size();
    15. });

    计算完成的后续操作3——apply

    1. public CompletableFuture thenApply(Functionsuper T,? extends U> fn)
    2. public CompletableFuture thenApplyAsync(Functionsuper T,? extends U> fn)
    3. public CompletableFuture thenApplyAsync(Functionsuper T,? extends U> fn, Executor executor)

    apply方法和handle方法一样,都是结束计算之后的后续操作,唯一的不同是,handle方法会给出异常,可以让用户自己在内部处理,而apply方法只有一个返回结果,如果异常了,会被直接抛出,交给上一层处理。 如果不想每个链式调用都处理异常,那么就使用apply吧

    计算完成的后续操作4——accept

    1. public CompletableFuture thenAccept(Consumersuper T> action)
    2. public CompletableFuture thenAcceptAsync(Consumersuper T> action)
    3. public CompletableFuture thenAcceptAsync(Consumersuper T> action, Executor executor)

    accept()三个方法只做最终结果的消费,注意此时返回的CompletableFuture是空返回。只消费,无返回,有点像流式编程的终端操作

    捕获中间产生的异常——exceptionally

    public CompletableFuture exceptionally(Function fn)

    exceptionally() 可以帮我们捕捉到所有中间过程的异常,方法会给我们一个异常作为参数,我们可以处理这个异常,同时返回一个默认值,跟服务降级 有点像,默认值的类型和上一个操作的返回值相同

    1. CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    2. // 返回null
    3. return null;
    4. });
    5. CompletableFuture exceptionally = future.thenApply(result -> {
    6. // 制造一个空指针异常NPE
    7. int i = result;
    8. return i;
    9. }).exceptionally(error -> {
    10. // 我们选择在这里打印出异常
    11. error.printStackTrace();
    12. // 并且当异常发生的时候,我们返回一个默认的值
    13. return "发生了异常";
    14. });

    组合两个completableFuture

    thenApply()

    将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果

    1. CompletableFuture> future = CompletableFuture.supplyAsync(() -> {
    2. // 根据学生姓名获取学生信息
    3. return StudentService.getStudent(name);
    4. }).thenApply(student -> {
    5. // 再根据学生信息获取今天的课程
    6. return LessonsService.getLessons(student);
    7. });

    thenCombine()

    将两个异步计算合并为一个,这两个异步计算之间相互独立,互不依赖

    1. CompletableFuture> painting = CompletableFuture.supplyAsync(() -> {
    2. // 第一个任务获取美术课需要带的东西,返回一个list
    3. List stuff = new ArrayList<>();
    4. stuff.add("画笔");
    5. stuff.add("颜料");
    6. return stuff;
    7. });
    8. CompletableFuture> handWork = CompletableFuture.supplyAsync(() -> {
    9. // 第二个任务获取劳技课需要带的东西,返回一个list
    10. List stuff = new ArrayList<>();
    11. stuff.add("剪刀");
    12. stuff.add("折纸");
    13. return stuff;
    14. });
    15. CompletableFuture> total = painting
    16. // 传入handWork列表,然后得到两个CompletableFuture的参数Stuff1和2
    17. .thenCombine(handWork, (stuff1, stuff2) -> {
    18. // 合并成新的list
    19. List totalStuff = Stream.of(stuff1, stuff1)
    20. .flatMap(Collection::stream)
    21. .collect(Collectors.toList());
    22. return totalStuff;
    23. });

    获取所有完成结果——allOf

    当所有给定的任务完成后,返回一个全新的已完成CompletableFuture

    1. final ExecutorService executorService = Executors.newFixedThreadPool(100);
    2. List futures = Lists.newArrayList();
    3. futures.add(CompletableFuture.runAsync(() -> {
    4. // do something
    5. },executorService).thenRunAsync(() -> {
    6. long end = System.currentTimeMillis();
    7. log.info("combine.combine.time2:{}",(end - start));
    8. },executorService));
    9. futures.add(CompletableFuture.runAsync(() -> {
    10. // do something
    11. },executorService).thenRunAsync(() -> {
    12. long end = System.currentTimeMillis();
    13. log.info("combine.combine.time2:{}",(end - start));
    14. },executorService));
    15. CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenRun(() -> {
    16. long end = System.currentTimeMillis();
    17. log.info("combine.combine.time3:{}",(end - start));
    18. }).join();

    获取率先完成的任务结果——anyOf

    仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。 小贴士 :如果最快完成的任务出现了异常,也会先返回异常,如果害怕出错可以加个exceptionally() 去处理一下可能发生的异常并设定默认返回值

    1. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    2. return 1;
    3. });
    4. CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    5. return 2;
    6. });
    7. CompletableFuture anyOf = CompletableFuture
    8. .anyOf(future1, future2)
    9. .exceptionally(error -> {
    10. error.printStackTrace();
    11. return 2;
    12. });
    13. 相关阅读:
      Linux-本地日志服务管理(rsyslog基础)
      虚拟机磁盘扩容(纯命令行)
      Spring Boot 资源索引
      GGTalk 开源即时通讯系统源码剖析之:数据库设计
      ansible配置文件介绍
      智能自动化输送分拣集成设备|快递自动分拣设备厂家告诉您如何提高10倍分拣效率
      整理的最新版的K8S安装教程,看完还不会,请你吃瓜
      数据库的增删改(DML)
      1045 Favorite Color Stripe
      Docker安装MySQL
    14. 原文地址:https://blog.csdn.net/qq_36042938/article/details/126040736