• CompletableFuture异步编程详解


    Future介绍

    先来回顾下Future,Future是JDK1.5中添加的接口,主要功能为:

    • 获取并发的任务完成后的执行结果;
    • 能够取消并发执行中的任务;
    • 判断并发任务是否执行完成;

    但Future也有着非常明显的缺点:

    1. 阻塞:调用 get() 方法会一直阻塞,直到等待直到计算完成;
    2. 异常处理:Future 没有提供任何异常处理的方式;
    3. 链式调用和结果聚合处理:在很多时候我们想链接多个 Future 来完成耗时较长的计算,此时需要合并结果并将结果发送到另一个任务中,该接口很难完成这种处理;

    CompletableFuture类

    上面介绍了Future的缺点,这些问题都可以通过CompletableFuture类解决,主要方法有:

    • thenApply():当执行完第一个异步程序,接着执行下一个;
    • thenAccept():当任务正常完成后,回调此方法;
    • exceptionally():当任务出现异常是,回调此方法;
    • anyOf():当所有的任务中,只要有一个任务完成,则主线程继续往下走;
    • allOf():所有的任务均完成后,则主线程继续往下走;
    • join():既线程合并(或者抛出一个 CompletionException 异常),阻塞当前线程,直到异步线程并发执行完成,也就是是join()方法还是异步阻塞;
    • supplyAsync():异步执行,有返回值;
    • runAsync():异步执行,无返回值;
    • thenCombine():两个线程完成之后进行合并返回;

    创建线程

    	// 有返回值
        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
            return asyncSupplyStage(asyncPool, supplier);
        }
        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
            return asyncSupplyStage(screenExecutor(executor), supplier);
        }
        // 无返回值
        public static CompletableFuture<Void> runAsync(Runnable runnable) {
            return asyncRunStage(asyncPool, runnable);
        }
        public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
            return asyncRunStage(screenExecutor(executor), runnable);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    可以看出,supply开头的两个方法是有返回值的,而run开头的两个方法是没有返回值的,至于第二个方法传入的Executor,这个在编码中可以自定义;

    thenAccept()方法

        public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
            return uniApplyStage(null, fn);
        }
        public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
            return uniApplyStage(asyncPool, fn);
        }
        public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {
            return uniApplyStage(screenExecutor(executor), fn);
        }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    功能:当前任务正常完成以后,可将当前任务的返回值作为参数传给下一个任务;
    执行任务A,任务A执行完成之后,将任务A返回值作为参数传入给任务B,打印结果为3:

      CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> 1);
      CompletableFuture<Integer> futureB = futureA.thenApply((a) -> a + 2);
      log.info("result:{}",futureB.get());
    
    • 1
    • 2
    • 3

    实际场景

    以工作中常见的场景举个例子,例如在A服务中,调用B、C服务的结果:

    1. 公共实体:
    /**
     * @author 岳晓鵬
     * @version 1.0
     * @date 2022-06-10 00:11
     */
    @Data
    @Builder
    public class User {
    
        private String userName;
    
        private Integer age;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    1. B、C服务代码:
        /**
         * B服务伪接口 获取用户姓名
         * @param userId
         * @return
         */
        public static String getUserName(Integer userId){
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "张三";
        }
    
        /**
         * C服务伪接口 获取用户年龄
         *
         * @param userId
         * @return
         */
        private static Integer getAge(Integer userId){
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 20;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    1. A服务获取数据:
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Long start = System.currentTimeMillis();
    
            // 异步获取 B C服务的数据
            CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> getAge(10));
            CompletableFuture<String> userNameFuture = CompletableFuture.supplyAsync(() -> getUserName(10));
    
            // 获取用户数据
            User user = User.builder()
                    .age(ageFuture.get())
                    .userName(userNameFuture.get())
                    .build();
    
            Long end = System.currentTimeMillis();
            log.info("执行时间:{}ms",end - start);
            log.info("用户:{}",user);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    1. 打印结果:
     - 执行时间:5075ms
     - 用户:User(userName=张三, age=20)
    
    • 1
    • 2

    打印结果显示执行时间为 5s+,而不是 7s+,说明异步已经生效;
    但其中get()方法还是阻塞的,如果线程执行时间较长,主线程将一直阻塞下去,另外还有一个get()方法可以添加超时时间:

    • get():阻塞获取线程执行结果;
    • get(long time):阻塞获取线程执行结果,如超过超时时间则继续向下执行;
    • getNow():阻塞获取线程执行结果,如果线程抛出异常,则返回默认值;

    上面的例子也可以使用all() + join()方式获取数据:

    CompletableFuture.allOf(ageFuture, userNameFuture).join();
    
    • 1
  • 相关阅读:
    集合导题、刷题、考试全套完整流程,专业强大的功能,提高刷题学习效率和企业的培训效率
    【Go之道】探索Go语言之旅:基础与进阶指南
    7.动态SQL
    纯钧(ChunJun,原名FlinkX)框架学习
    【Leetcode】2427. Number of Common Factors
    ENVI_IDL: 批量制作专题地图
    坚守这50个规则来实现你的目标
    第三章:人工智能深度学习教程-基础神经网络(第三节-Tensorflow 中的多层感知器学习)
    arthas查看spring bean及调用其方法
    JVM启动参数详解(含调优)
  • 原文地址:https://blog.csdn.net/xianyun1992/article/details/125245327