• JUC并发编程与源码分析笔记03-CompletableFuture


    Future接口理论知识复习

    Future接口(FutureTask实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
    找到java.util.concurrent.Future,看到里面定义的方法,这些方法就是我们需要关注的方法。
    Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

    Future接口常用实现类FutureTask异步任务

    Future接口能干什么

    Future是Java5新加的一个接口,它提供了一个异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们就可以通过Future把这个任务放到异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
    目的:异步多线程任务执行且返回有结果,三个特点:多线程、有返回、异步任务。

    本源的Future接口相关架构

    在这里插入图片描述
    可以看到FutureTask实现了Runnable、Future接口,而且它的构造参数还支持传入Callable,所以FutureTask现在就具有多线程(Runnable)、有返回(Callable)、异步任务(Future)这3个特点了。

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    
    public class CompletableFutureDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureTask<String> futureTask = new FutureTask<>(new MyThread());// 将一个耗时的操作封装到FutureTask里
            new Thread(futureTask, "threadName").start();// 启动一个子线程执行FutureTask
            System.out.println(futureTask.get());// 获取FutureTask的返回值
        }
    }
    
    class MyThread implements Callable<String> {
        @Override
        public String call() {
            System.out.println("MyThread.call");
            return "Hello Callable";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    Future编码实战和优缺点分析

    优点

    Future结合线程池,可以显著提高程序的执行效率。

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    
    public class FutureThreadPoolDemo {
        /**
         * 有3个任务,分别耗时500ms,300ms,300ms
         * fun1():3个任务由主线程依次执行
         * fun2():将3个任务放到线程池执行
         */
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            fun1();
            fun2();
        }
    
        private static void fun2() throws InterruptedException, ExecutionException {
            long startTime = System.currentTimeMillis();
            ExecutorService executorService = Executors.newFixedThreadPool(3);// 创建一个线程池
            FutureTask<String> futureTask1 = new FutureTask<>(() -> {// 任务1
                Thread.sleep(500);
                return "task1 over";
            });
            executorService.submit(futureTask1);
            FutureTask<String> futureTask2 = new FutureTask<>(() -> {// 任务2
                Thread.sleep(300);
                return "task2 over";
            });
            executorService.submit(futureTask2);
            FutureTask<String> futureTask3 = new FutureTask<>(() -> {// 任务3
                Thread.sleep(300);
                return "task3 over";
            });
            executorService.submit(futureTask3);
            // 这里获取返回值,主线程会阻塞等待直到拿到返回值,如果把获取结果注释掉,fun2()的执行时间会很短,因为主线程执行完毕,但是子线程依旧在跑
            System.out.println(futureTask1.get());
            System.out.println(futureTask2.get());
            System.out.println(futureTask3.get());
            long endTime = System.currentTimeMillis();
            System.out.println("fun()2总耗时:" + (endTime - startTime));
            executorService.shutdown();// 关闭线程池
        }
    
        private static void fun1() throws InterruptedException {
            long startTime = System.currentTimeMillis();
            Thread.sleep(500);// 任务1
            Thread.sleep(300);// 任务2
            Thread.sleep(300);// 任务3
            long endTime = System.currentTimeMillis();
            System.out.println("fun1()总耗时:" + (endTime - startTime));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    缺点

    get()阻塞
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class FutureAPIDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            FutureTask<String> futureTask = new FutureTask<>(()->{
                Thread.sleep(5000);
               return "task over";
            });
            new Thread(futureTask).start();
            System.out.println("主线程正在运行");
    //        System.out.println(futureTask.get());// 阻塞,直到获取到返回结果
            System.out.println(futureTask.get(2000, TimeUnit.MILLISECONDS));// 只等待2000ms,超时未返回直接抛异常
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    get()方法会有阻塞问题,导致主线程无法继续执行,另外提供了一个带参的get()方法,超时自动放弃等待。

    isDone()轮询

    有时候,在get()阻塞期间,我们希望看到进度或者提示信息,而不是一味地等待,可以将get()改成轮询,在轮询方法里通过isDone()判断任务是否执行完毕,只是多了一些提示信息。

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeoutException;
    
    public class FutureAPIDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            FutureTask<String> futureTask = new FutureTask<>(() -> {
                Thread.sleep(5000);
                return "task over";
            });
            new Thread(futureTask).start();
            System.out.println("主线程正在运行");
    //        System.out.println(futureTask.get());// 阻塞,直到获取到返回结果
    //        System.out.println(futureTask.get(2000, TimeUnit.MILLISECONDS));// 只等待2000ms,超时未返回直接抛异常
            while (true) {
                if (futureTask.isDone()) {
                    System.out.println(futureTask.get());
                    break;
                } else {
                    Thread.sleep(1000);
                    System.out.println("正在处理中,请稍等……");
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    这样的缺点就是:频繁的调用isDone()方法,对CPU来说是浪费资源。

    结论

    Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式获取任务结果。

    想完成一些复杂的任务

    对于简单的业务场景,Future是完全可以胜任的。

    回调通知

    对于Future的完成时间,我们希望,完成之后可以通知主线程。

    创建异步任务

    Future结合线程池。

    多个任务前后依赖可以组合处理(水煮鱼)

    希望将多个异步任务的结果组合起来,后一个异步任务的计算需要前一个任务的值。
    希望将多个异步计算合成成一个异步计算,这几个异步计算相互独立,同时,后一个的计算依赖前一个的计算结果。

    对计算速度选最快

    当Future集合中有多个任务的时候,处理最快的一个完成,返回第一个处理的结果。

    CompletableFuture对Future的改进

    CompletableFuture为什么会出现

    Future中get()方法和isDone()方法都存在问题,对于真正的异步处理,我们希望可以通过回调函数,在Future结束之后,自动调用回调函数,这样就不用等待返回结果了。
    阻塞的方式和异步编程的设计理念相违背,轮询会额外耗费CPU资源,因此JDK8设计出了CompletableFuture,它提供了一个类似观察者模式的机制,任务完成之后,通知监听的一方。

    CompletableFuture和CompletionStage源码分别介绍

    类架构说明

    在这里插入图片描述

    接口CompletionStage

    • CompletionStage代表异步计算过程中的某个阶段,一个阶段完成后可能会触发另一个阶段,有些类似Linux系统管道分隔传参数
    • 一个阶段的执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.println(x)).thenRun(() -> System.out.println())
    • 一个阶段的执行可能被单个阶段的完成触发,有可能是由多个阶段一起触发

    类CompletableFuture

    • 在Java8中,CompletableFuture提供了非常强大的Future扩展功能,可以帮助我们简化异步编程复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,提供了转换和组合CompletableFuture的方法
    • 它可能代表一个明确完成的Future,也可能带一个完成阶段(CompletionStage),它支持在计算完成后触发一些函数或者执行某些动作
    • 它实现了Future和CompletionStage接口

    核心的四个静态方法,创建一个异步任务

    runAsync:无返回值

    public static CompletableFuture runAsync(Runnable runnable)
    public static CompletableFuture runAsync(Runnable runnable, Executor executor)

    supplyAsync:有返回值

    public static CompletableFuture supplyAsync(Supplier supplier)
    public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

    上述Executor参数说明

    没有指定线程池executor的时候,使用的是默认的ForkJoinPool.commonPool(),如果指定了线程池,则使用自定义的或指定的线程池。

    Code

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CompletableFutureBuildDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            fun1();
            System.out.println("----------------------------------------");
            fun2();
            System.out.println("----------------------------------------");
            fun3();
            System.out.println("----------------------------------------");
            fun4();
        }
    
        private static void fun1() throws ExecutionException, InterruptedException {
            CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            System.out.println(completableFuture.get());
        }
    
        private static void fun2() throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, executorService);
            System.out.println(completableFuture.get());
            executorService.shutdown();
        }
    
        private static void fun3() throws ExecutionException, InterruptedException {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "CompletableFutureBuildDemo.fun3";
            });
            System.out.println(completableFuture.get());
        }
    
        private static void fun4() throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "CompletableFutureBuildDemo.fun4";
            }, executorService);
            System.out.println(completableFuture.get());
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    通用演示,减少阻塞和轮询

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadLocalRandom;
    
    public class CompletableFutureUserDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture.supplyAsync(() -> {
                        System.out.println(Thread.currentThread().getName());
                        int result = ThreadLocalRandom.current().nextInt();
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return result;
                    }, executorService)
                    .whenComplete((v, e) -> {
                        if (e == null) {
                            System.out.println("计算完成,计算结果:" + v);
                        }
                    })
                    .exceptionally((e) -> {
                        e.printStackTrace();
                        System.out.println("出现异常:" + e.getCause() + "\t" + e.getMessage());
                        return null;
                    });
            System.out.println(Thread.currentThread().getName() + "正在运行");
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    注意这里的一个坑,如果使用的是默认的线程池,主线程执行完毕后,CompletableFuture使用的默认线程池会立刻关闭,就会导致whenComplete方法不能被执行到,所以这里,还是推荐使用自定义线程池。
    如果在supplyAsync方法中,出现了异常,也会走whenComplte方法,而且也走execptionally方法。

    CompletableFuture的优点

    • 异步任务结束时,自动调用某个对象的方法
    • 主线程设置好回调后,不需要再关心异步任务的执行,异步任务之间可以顺序执行
    • 异步任务出错时,会自动回调某个对象的方法

    案例精讲-从电商网站的比价需求说开去

    函数式编程已经主流

    Lambda表达式、Stream流式调用、Chain链式调用、Java8函数式编程。
    函数式编程:

    @FunctionalInterface
    public interface Runnable {
        public abstract void run();
    }
    
    @FunctionalInterface
    public interface Function<T, R> {
        R apply(T t);
    }
    
    @FunctionalInterface
    public interface Consumer<T> {
        void accept(T t);
    }
    
    @FunctionalInterface
    public interface BiConsumer<T, U> {
        void accept(T t, U u);
    }
    
    @FunctionalInterface
    public interface Supplier<T> {
        T get();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    函数式接口名称方法名称参数个数返回值
    Runnablerun0
    Functionapply1
    Consumeraccept1
    BiConsumeraccept2
    Supplierget0

    还有一个常用的函数式接口,这里视频并没有提及,它是Predicate

    先说说join和get对比

    CompletableFuture的get()方法和join()方法相比,作用是一样的,区别是get()在编译阶段,会抛出checkedException,而join()不会。
    这里还学到一个Lombok的新知识:在类上添加这个注解:@Accessors(chain = true)// 开启链式写法,可以开启对象的链式写法,比如student.setId(1).setName("xxx").setMajor("yyy");,把原来竖着写的set方法,扭转成横着写了,算是个语法糖吧。

    大厂业务需求说明

    需求说明:
    同一款产品,同时搜索出同款产品在各大电商平台的售价
    同一款产品,同时搜索出本产品在同一电商平台下,各个入驻卖家售价
    输出返回:
    希望查询结果是这款产品在不同地方的价格清单列表,返回一个List
    解决方案:
    一步一步的查询,最后汇总,效率上会慢
    多线程异步任务同时查询 ,返回结果汇总,效率上很高

    一波流Java8函数式编程带走-比价案例实战Case

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.experimental.Accessors;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.stream.Collectors;
    
    public class CompletableFutureMallDemo {
        static List<Mall> mallList = new ArrayList() {{
            add(new Mall("jd", "mysql"));
            add(new Mall("tb", "mysql"));
            add(new Mall("dd", "mysql"));
        }};
    
        public static void main(String[] args) {
            long begin1 = System.currentTimeMillis();
            List<String> list1 = fun1(mallList, "mysql");
            list1.forEach(System.out::println);
            long end1 = System.currentTimeMillis();
            System.out.println("fun1用时:" + (end1 - begin1));
            System.out.println("---------------------------------------------");
            long begin2 = System.currentTimeMillis();
            List<String> list2 = fun2(mallList, "mysql");
            list2.forEach(System.out::println);
            long end2 = System.currentTimeMillis();
            System.out.println("fun2用时:" + (end2 - begin2));
        }
    
        private static List<String> fun1(List<Mall> mallList, String productName) {
            return mallList.stream().map(mall -> String.format("%s in %s price is %s", productName, mall.getMallName(), mall.calculatePrice())).collect(Collectors.toList());
        }
    
        private static List<String> fun2(List<Mall> mallList, String productName) {
            return mallList.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format("%s in %s price is %s", productName, mall.getMallName(), mall.calculatePrice())))
                    .collect(Collectors.toList()).stream()// 这里collect一下,是为了让前一个stream流完成,这样stream流里的线程就可以开始运算,如果没有这一行,就起不到并行作用,因为stream有懒惰的特性,只有执行终端操作时候,才会真正执行运算
                    .map(CompletableFuture::join).collect(Collectors.toList());
        }
    }
    
    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    @Accessors(chain = true)
    class Mall {
        private String mallName;
        private String productName;
    
        public Double calculatePrice() {
            try {
                Thread.sleep(1000);// 模拟查询时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ThreadLocalRandom.current().nextDouble();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    CompletableFuture常用方法

    获取结果和触发计算

    获取结果

    public T get()
    public T get(long timeout, TimeUnit unit)
    public T join()
    public T getNow(T valueIfAbsent):如果在get的时候,还没有返回结果,就将valueIfAbsent的值作为返回值返回

    主动触发计算

    public boolean complete(T value):方法首先判断进程有没有执行完,如果没有执行完,进行打断,并将value赋值为线程的返回值(通过get()join()获取),如果执行完了,就不需要打断,线程的返回值就是线程里正常的返回值。
    关于complete()方法的返回值,有点不好理解,如果线程是被complete()方法触发结束的,返回true,如果线程在执行complete()方法的时候,已经结束,返回false。

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        });
        Thread.sleep(2000);
        System.out.println(completableFuture.complete("default") + "\t" + completableFuture.join());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    修改主线程里的sleep为1000,子线程里的sleep为2000,查看结果对比。

    对计算结果进行处理

    计算结果存在依赖关系,这两个线程串行化。
    public CompletableFuture thenApply(Function fn)
    public CompletableFuture handle(BiFunction fn)

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CompletableFutureAPIDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("第一步");
                return 1;
            }, executorService).thenApply(v -> {
                System.out.println("第二步");
                int a = 1 / 0;
                return v + 1;
            }).thenApply(v -> {
                System.out.println("第三步");
                return v + 1;
            }).whenComplete((v, e) -> {
                if (e == null) {
                    System.out.println("最终结果为:" + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println(e.getMessage());
                return null;
            });
            executorService.shutdown();
            System.out.println(completableFuture.join());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    thenApply换成handle,对比区别。可以看到,使用handle后第二步报错的情况下,第三步依旧执行了,而且会带着异常参数,可以根据异常参数做一些判断处理,使用thenApply的话,如果线程内部报错,后续的thenApply就不会执行了。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CompletableFutureAPIDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("第一步");
                return 1;
            }, executorService).handle((v, e) -> {
                System.out.println("第二步");
                int a = 1 / 0;
                return v + 1;
            }).handle((v, e) -> {
                System.out.println("第三步");
                return v + 1;
            }).whenComplete((v, e) -> {
                if (e == null) {
                    System.out.println("最终结果为:" + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println(e.getMessage());
                return null;
            });
            executorService.shutdown();
            System.out.println(completableFuture.join());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    对计算结果进行消费

    public CompletableFuture thenAccept(Consumer action)
    把线程的运算结果消费掉,没有返回值。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CompletableFutureAPIDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 1;
            }, executorService).thenAccept(System.out::println);
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    任务之间的执行顺序对比:
    public CompletableFuture thenRun(Runnable action):任务A执行完执行任务B,任务B不需要任务A的结果,也无返回值
    public CompletableFuture thenAccept(Consumer action):任务A执行完执行任务B,任务B需要任务A的结果,但是任务B无返回值
    public CompletableFuture thenApply(Function fn):任务A执行完执行任务B,任务B需要任务A的结果,但是任务B有返回值
    CompletableFuture和线程池的说明:
    如果不指定线程池,默认使用ForkJoinPool,如果指定了线程池,使用指定的线程池,在调用then*()方法的时候,还有一个then*Async()方法,如果使用了then*Async()方法,这个方法内的任务和这个方法后的任务都会使用ForkJoinPool来执行,除非你在then*Async()方法里又传了自定义线程池。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CompletableFutureAPIDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "";
            }, executorService).thenRun(() -> {
                System.out.println("任务2" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).thenRun(() -> {
                System.out.println("任务3" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    如果线程内执行的太快,Thread.currentThread().getName()的结果可能是main线程,比方说,把Thread.sleep()去掉,就可以复现。
    查看thenRunAsync()方法,里面有一个asyncPool变量,查看它的赋值过程,判断useCommonPool,如果为真,使用ForkJoinPool,如果为假,新建一个线程池。useCommonPool的值又取决于ForkJoinPool.getCommonPoolParallelism() > 1,通过调试发现ForkJoinPool.getCommonPoolParallelism()的值是7,所以默认情况下,使用的是ForkJoinPool。

    对计算速度进行选用

    public CompletableFuture applyToEither(CompletionStage other, Function fn)
    应用场景:一个方法需要实现多个查询服务,多个服务之间相互独立,只要有一个能返回结果,自动放弃等待其他未执行完的查询。
    又找了一个例子:我计划从A到B去,有1路车和2路车,都可以到达,而且它们路线一致,我坐哪个呢?哪个车先来坐哪个呗,这里比较的是等待时间,可以把等待时间抽象成程序处理时间,哪个快走哪个,而且放弃其他所有的,也是这个道理。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CompletableFutureAPIDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "result1";
            }, executorService);
            CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "result2";
            }, executorService);
            CompletableFuture<String> completableFuture = completableFuture1.applyToEither(completableFuture2, v -> v + " is winner");
            System.out.println(completableFuture.join());
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    对计算结果进行合并

    public CompletableFuture thenCombine(CompletionStage other, BiFunction fn)
    两个CompletionStage任务都完成后,将两个任务的结果一起提交给thenCombine来处理,先完成的任务需要等待另一个任务完成。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CompletableFutureAPIDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 10;
            }, executorService);
            CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 20;
            }, executorService);
            CompletableFuture<Integer> completableFuture = completableFuture1.thenCombine(completableFuture2, (v1, v2) -> v1 * v2);
            System.out.println(completableFuture.join());
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    看到这里,发现弹幕有人提到public static CompletableFuture allOf(CompletableFuture... cfs)public static CompletableFuture anyOf(CompletableFuture... cfs)老师没讲,我就自己搜索了下。
    allOf接收若干个CompletableFuture,当所有的CompletableFuture都完成后,才会执行返回CompletableFuture。
    anyOf接收若干个CompletableFuture,当任意一个任务执行完成,就返回CompletableFuture。

  • 相关阅读:
    华为HCIP Datacom H12-831 卷25
    【量化交易笔记】11.移动平均交易策略
    Go代码格式化——gofmt的使用
    微软 SQL 服务器被黑,带宽遭到破坏
    真是绝了,做了这么多年程序员第一次搞懂微服务架构的数据一致性
    2022-11-17 更高效的Cascades优化器 - Columbia Query Optimizer
    STL 乱序算法
    redis的事件处理机制
    Django+Vue中文件的上传和下载
    消失的遗传力--wiki
  • 原文地址:https://blog.csdn.net/qq_36059561/article/details/128055675