• 【业务功能篇92】微服务-springcloud-多线程-异步处理-异步编排-CompletableFutrue


    三、CompletableFutrue

    一个商品详情页

    • 展示SKU的基本信息 0.5s
    • 展示SKU的图片信息 0.6s
    • 展示SKU的销售信息 1s
    • spu的销售属性 1s
    • 展示规格参数 1.5s
    • spu详情信息 1s

    1.ComplatableFuture介绍

      Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用 isDone方法检查计算是否完成,或者使用 get阻塞住调用线程,直到计算完成返回结果,你也可以使用 cancel方法停止任务的执行。

      虽然 Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

      很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future接口,提供了 addListener等多个扩展方法;Google guava也提供了通用的扩展Future;Scala也提供了简单易用且功能强大的Future/Promise异步编程模式。

      作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?

      在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

      CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过 get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

      CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

    image.png

    2.创建异步对象

    CompletableFuture 提供了四个静态方法来创建一个异步操作。

    static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    
    • 1
    • 2
    • 3
    • 4

    方法分为两类:

    • runAsync 没有返回结果
    • supplyAsync 有返回结果
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5
                ,50
                ,10
                , TimeUnit.SECONDS
                ,new LinkedBlockingQueue<>(100)
                , Executors.defaultThreadFactory()
                ,new ThreadPoolExecutor.AbortPolicy()
        );
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            System.out.println("main -- 线程开始了...");
            // 获取CompletableFuture对象
            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
                System.out.println("线程开始了...");
                int i = 100/50;
                System.out.println("线程结束了...");
            },executor);
            System.out.println("main -- 线程结束了...");
    
            System.out.println("------------");
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程开始了...");
                int i = 100 / 50;
                System.out.println("线程结束了...");
                return i;
            }, executor);
            System.out.println("获取的线程的返回结果是:" + future.get() );
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    3.whenXXX和handle方法

      当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

    public CompletableFuture whenComplete(BiConsumer action);
    public CompletableFuture whenCompleteAsync(BiConsumer action);
    public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor);
    
    public CompletableFuture exceptionally(Function fn);
    
    public  CompletableFuture handle(BiFunction fn) ;
    public  CompletableFuture handleAsync(BiFunction fn) ;
    public  CompletableFuture handleAsync(BiFunction fn, Executor executor) ;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    相关方法的说明:

    • whenComplete 可以获取异步任务的返回值和抛出的异常信息,但是不能修改返回结果
    • execptionlly 当异步任务跑出了异常后会触发的方法,如果没有抛出异常该方法不会执行
    • handle 可以获取异步任务的返回值和抛出的异常信息,而且可以显示的修改返回的结果
    /**
     * CompletableFuture的介绍
     */
    public class CompletableFutureDemo2 {
    
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5
                ,50
                ,10
                , TimeUnit.SECONDS
                ,new LinkedBlockingQueue<>(100)
                , Executors.defaultThreadFactory()
                ,new ThreadPoolExecutor.AbortPolicy()
        );
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程开始了...");
                int i = 100 / 5;
                System.out.println("线程结束了...");
                return i;
            }, executor).handle((res,exec)->{
                System.out.println("res = " + res + ":exec="+exec);
                return res * 10;
            });
            // 可以处理异步任务之后的操作
            System.out.println("获取的线程的返回结果是:" + future.get() );
        }
    
     /*   public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture future = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程开始了...");
                int i = 100 / 5;
                System.out.println("线程结束了...");
                return i;
            }, executor).whenCompleteAsync((res,exec)->{
                System.out.println("res = " + res);
                System.out.println("exec = " + exec);
            }).exceptionally((res)->{ // 在异步任务显示的抛出了异常后才会触发的方法
                System.out.println("res = " + res);
                return 10;
            });
            // 可以处理异步任务之后的操作
            System.out.println("获取的线程的返回结果是:" + future.get() );
        }*/
    
    /*    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture future = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程开始了...");
                int i = 100 / 0;
                System.out.println("线程结束了...");
                return i;
            }, executor).whenCompleteAsync((res,exec)->{
                System.out.println("res = " + res);
                System.out.println("exec = " + exec);
            });
            // 可以处理异步任务之后的操作
            System.out.println("获取的线程的返回结果是:" + future.get() );
        }*/
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    4.线程串行方法

    thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

    thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

    thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

    带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    
    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
    
    public CompletionStage<Void> thenRun(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    /**
     * CompletableFuture的介绍
     */
    public class CompletableFutureDemo3 {
    
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5
                ,50
                ,10
                , TimeUnit.SECONDS
                ,new LinkedBlockingQueue<>(100)
                , Executors.defaultThreadFactory()
                ,new ThreadPoolExecutor.AbortPolicy()
        );
    
        /**
         * 线程串行的方法
         * thenRun:在前一个线程执行完成后,开始执行,不会获取前一个线程的返回结果,也不会返回信息
         * thenAccept:在前一个线程执行完成后,开始执行,获取前一个线程的返回结果,不会返回信息
         * thenApply: 在前一个线程执行完成后。开始执行,获取前一个线程的返回结果,同时也会返回信息
         * @param args
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程开始了..." + Thread.currentThread().getName());
                int i = 100 / 5;
                System.out.println("线程结束了..." + Thread.currentThread().getName());
                return i;
            }, executor).thenApply(res -> {
                System.out.println("res = " + res);
                return res * 100;
            });
            // 可以处理异步任务之后的操作
            System.out.println("获取的线程的返回结果是:" + future.get() );
        }
        /*public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程开始了..." + Thread.currentThread().getName());
                int i = 100 / 5;
                System.out.println("线程结束了..." + Thread.currentThread().getName());
                return i;
            }, executor).thenAcceptAsync(res -> {
                System.out.println(res + ":" + Thread.currentThread().getName());
            }, executor);
            // 可以处理异步任务之后的操作
            //System.out.println("获取的线程的返回结果是:" + future.get() );
        }*/
        /*public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程开始了..."+Thread.currentThread().getName());
                int i = 100 / 5;
                System.out.println("线程结束了..."+Thread.currentThread().getName());
                return i;
            }, executor).thenRunAsync(() -> {
                System.out.println("线程开始了..."+Thread.currentThread().getName());
                int i = 100 / 5;
                System.out.println("线程结束了..."+Thread.currentThread().getName());
            }, executor);
            // 可以处理异步任务之后的操作
            //System.out.println("获取的线程的返回结果是:" + future.get() );
        }*/
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    5.两个都完成

      上面介绍的相关方法都是串行的执行,接下来看看需要等待两个任务执行完成后才会触发的几个方法

    • thenCombine :可以获取前面两线程的返回结果,本身也有返回结果
    • thenAcceptBoth:可以获取前面两线程的返回结果,本身没有返回结果
    • runAfterBoth:不可以获取前面两线程的返回结果,本身也没有返回结果
    /**
         * @param args
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
                int i = 100 / 5;
                System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
                return i;
            }, executor);
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
                int i = 100 /10;
                System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
                return i;
            }, executor);
    
            // runAfterBothAsync 不能获取前面两个线程的返回结果,本身也没有返回结果
            CompletableFuture<Void> voidCompletableFuture = future1.runAfterBothAsync(future2, () -> {
                System.out.println("任务3执行了");
            },executor);
    
            // thenAcceptBothAsync 可以获取前面两个线程的返回结果,本身没有返回结果
            CompletableFuture<Void> voidCompletableFuture1 = future1.thenAcceptBothAsync(future2, (f1, f2) -> {
                System.out.println("f1 = " + f1);
                System.out.println("f2 = " + f2);
            }, executor);
    
            // thenCombineAsync: 既可以获取前面两个线程的返回结果,同时也会返回结果给阻塞的线程
            CompletableFuture<String> stringCompletableFuture = future1.thenCombineAsync(future2, (f1, f2) -> {
                return f1 + ":" + f2;
            }, executor);
    
            // 可以处理异步任务之后的操作
            System.out.println("获取的线程的返回结果是:" + stringCompletableFuture.get() );
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    6.两个任务完成一个

      在上面5个基础上我们来看看两个任务只要有一个完成就会触发任务3的情况

    • runAfterEither:不能获取完成的线程的返回结果,自身也没有返回结果
    • acceptEither:可以获取线程的返回结果,自身没有返回结果
    • applyToEither:既可以获取线程的返回结果,自身也有返回结果
    /**
         * @param args
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
                int i = 100 / 5;
                System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
                return i;
            }, executor);
            CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
                int i = 100 /10;
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
                return i+"";
            }, executor);
            // runAfterEitherAsync 不能获取前面完成的线程的返回结果,自身也没有返回结果
            future1.runAfterEitherAsync(future2,()->{
                System.out.println("任务3执行了....");
            },executor);
    
            // acceptEitherAsync 可以获取前面完成的线程的返回结果  自身没有返回结果
            future1.acceptEitherAsync(future2,(res)->{
                System.out.println("res = " + res);
            },executor);
    
            // applyToEitherAsync 既可以获取完成任务的线程的返回结果  自身也有返回结果
            CompletableFuture<String> stringCompletableFuture = future1.applyToEitherAsync(future2, (res) -> {
                System.out.println("res = " + res);
                return res + "-->OK";
            }, executor);
            // 可以处理异步任务之后的操作
            System.out.println("获取的线程的返回结果是:" + stringCompletableFuture.get() );
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    7.多任务组合

    allOf:等待所有任务完成

    anyOf:只要有一个任务完成

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
    
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
    
    • 1
    • 2
    • 3
    /**
         * @param args
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
                int i = 100 / 5;
                System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
                return i;
            }, executor);
            CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
                int i = 100 /10;
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
                return i+"";
            }, executor);
    
            CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务3 线程开始了..." + Thread.currentThread().getName());
                int i = 100 /10;
                System.out.println("任务3 线程结束了..." + Thread.currentThread().getName());
                return i+"";
            }, executor);
    
            CompletableFuture anyOf = CompletableFuture.anyOf(future1, future2, future3);
            anyOf.get();
            System.out.println("主任务执行完成..." + anyOf.get());
    
            CompletableFuture allOf = CompletableFuture.allOf(future1, future2, future3);
            allOf.get();// 阻塞在这个位置,等待所有的任务执行完成
            System.out.println("主任务执行完成..." + future1.get() + " :" + future2.get() + " :" + future3.get());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
  • 相关阅读:
    [书籍翻译]12周撰写期刊文章 学术出版成功指南——第 6 周:加强结构
    MySQL复合查询
    网络爬虫的架构
    基于linux系统的CAN总线移动机器人- 板子烧入linux系统
    【Django】开发日报_13_Day:手机号码管理系统(11)
    3D视觉应用案例:引导板件定位抓取
    链表中LinkList L与LinkList *L( & * L.elem L->elem)
    JavaScript数据结构与算法-排序全详解
    Linux 回顾总结
    创立一年就估值5亿美金,这个项目是怎么做到的?
  • 原文地址:https://blog.csdn.net/studyday1/article/details/132610270