• Java8新特性 CompletableFuture


    Java8新特性 CompletableFuture

    什么是CompletableFuture?

    CompletableFuture类的设计灵感来自于 Google GuavaListenableFuture 类,它实现了 FutureCompletionStage 接口并且新增了许多方法,它支持 lambda表达式,通过回调利用非阻塞方法,提升了异步编程模型。它允许我们通过在与主应用程序线程不同的线程上(也就是异步)运行任务,并向主线程通知任务的进度、完成或失败,来编写非阻塞代码。

    在这里插入图片描述

    为什么引入CompletableFuture?

    Java 的 1.5 版本引入了 Future,你可以把它简单的理解为运算结果的占位符,它提供了两个方法来获取运算结果。

    • get():调用该方法线程将会无限期等待运算结果。
    • get(long timeout, TimeUnit unit):调用该方法线程将仅在指定时间 timeout 内等待结果,如果等待超时就会抛出 TimeoutException 异常。

    Future 可以使用 RunnableCallable 实例来完成提交的任务,通过其源码可以看出,它存在如下几个问题:

    • 阻塞 调用 get() 方法会一直阻塞,直到等待直到计算完成,它没有提供任何方法可以在完成时通知,同时也不具有附加回调函数的功能。
    • 链式调用和结果聚合处理 在很多时候我们想链接多个 Future 来完成耗时较长的计算,此时需要合并结果并将结果发送到另一个任务中,该接口很难完成这种处理。
    • 异常处理 Future 没有提供任何异常处理的方式。

    而我们的CompletableFuture则成功地解决了上述的这些问题,下面将一一介绍CompletableFuture的一些常用的API方法的使用:

    CompletableFuture的API

    首先CompletableFuture实现Future接口,故Future接口存在的常见方法它本身也存在,这里不再进行讲解,处理以外,它还提供了手动完成complete()方法,判断是否完成isDon()方法,取消执行cancel()等等。

    1. 异步执行方法 runAsync/supplyAsync

    runAsync接受的参数为Runnable参数,无返回值;而supplyAsync接受的参数则为Supplier supplier,即有返回值的函数式接口,可以参考下表回顾一下函数式接口:

    在这里插入图片描述

    故我们简单编写一个方法,分别使用这两种异步任务的调用方式:

    public class TestCompletableFuture {
        public static void main(String[] args) {
          	//runAysnc
            testRunAsync();
          	//supplyAsync
            testSupplyAsync();
        }
        
        /**
         * 异步执行Code 无返回值
         */
        private static void testRunAsync() {
            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
                System.out.println("runAsync...");
            });
        }
    
        /**
         * 异步执行Code 有返回值
         */
        private static void testSupplyAsync() {
            CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "supplyAsync...";
            });
            try {
                //异步获取结果
                String result = stringCompletableFuture.get();
                System.out.println("result:" + result);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    输出结果:

    runAsync...
    result:supplyAsync...
    
    • 1
    • 2

    2. 完成执行 whenComplete/whenCompleteAsync

    当我们定义上面的runAsync或者supplyAsync方法执行完毕以后,可以调用whenComplete或者whenCompleteAsync继续执行任务,区别在于:

    • whenComplete是有之前执行异步任务的线程继续执行任务,如果定义多个whenComplete,则它们之间是链式串行调用的关系
    • whenCompleteAsync则是将任务提交到线程池中进行执行,如果定义多个whenCompleteAsync,则它们之间则是并行调用的关系

    首先我们来看whenComplete的使用方法:

    main() {
      testWhenComplete();
    }
    private static void testWhenComplete() {
      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello World");
      //定义两个whenComplete任务,此时会先执行01,再执行02
      future.whenComplete((res, error) -> {
        try {
          Thread.sleep(5000);
          System.out.println("whenComplete01");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      future.whenComplete((res, error) -> {
        try {
          Thread.sleep(1000);
          System.out.println("whenComplete02");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      System.out.println("completed...");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    输出结果:

    //sleep 5s
    whenComplete01
    //sleep 1s  
    whenComplete02
    completed...
    
    • 1
    • 2
    • 3
    • 4
    • 5

    然后我们再来看whenCompleteAsync的使用方法:

    main() {
      testWhenCompleteAsync();
    }
    
    private static void testWhenCompleteAsync() {
      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello World");
      //此时定义两个whenCompleteAsync,在future执行完毕以后,两者并发执行
      future.whenCompleteAsync((res, error) -> {
        try {
          Thread.sleep(5000);
          System.out.println("whenComplete01");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      
      future.whenCompleteAsync((res, error) -> {
        try {
          Thread.sleep(1000);
          System.out.println("whenComplete02");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      
      System.out.println("completed...");
      try {
        Thread.sleep(10000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    输出结果:

    completed...
    //sleep 1s  
    whenComplete02
    //sleep 4s  
    whenComplete01
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3. 链式执行任务 thenCompose/thenApply

    两者均可将CompletableFuture连接起来,但是存在一些差异:

    • thenApply()接收的是前一个调用返回的结果,然后对该结果进行处理。
    • thenCompose()接收的是前一个调用的stage,返回flat之后的的CompletableFuture,即将上一个Future的执行结果作为下一个Future的输入

    thenApply方法

    main() {
      testThenApply();
    }
    private static void testThenApply() {
      CompletableFuture<String> completableFuture = CompletableFuture
        .supplyAsync(() -> "Hello")
        //s为上个任务的结果,可对其进行处理
        .thenApply(s -> s + "World");
      try {
        String result = completableFuture.get();
        System.out.println(result);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    输出结果:

    HelloWorld
    
    • 1

    thenCompose方法

    main(){
      testThenCompose();
    }
    private static void testThenCompose() {
      CompletableFuture<String> stringCompletableFuture = CompletableFuture
        .supplyAsync(() -> "Hello")
        //s即为上一个CompletableFuture的输出结果
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "World"));
      try {
        String result = stringCompletableFuture.get();
        System.out.println(result);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    输出结果:

    HelloWorld
    
    • 1

    4. 合并处理 thenCombine/thenAcceptBoth

    两个合并的方法本质上的区别在于thenCombine有返回值,而thenAcceptBoth无返回值

    public class TestCompletableFuture {
        public static void main(String[] args) {
            //有返回值的合并操作
            testThenCombine();
            //无返回值的合并操作
            testThenAcceptBoth();
        }
    
        private static void testThenAcceptBoth() {
            CompletableFuture.supplyAsync(() -> "Hello")
                    .thenAcceptBoth(
                        CompletableFuture.supplyAsync(() -> "World"), (s1, s2) -> System.out.println(s1 + s2)
            );
        }
    
        private static void testThenCombine() {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
                    .thenCombine(
                            CompletableFuture.supplyAsync(() -> "World"), (c1, c2) -> c1 + c2
                    );
            try {
                String result = completableFuture.get();
                System.out.println(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    输出结果:

    HelloWorld
    HelloWorld
    
    • 1
    • 2

    5. 聚合处理 allOf/anyOf

    我们需要并行执行任务时,通常我们需要等待所有的任务都执行完毕再去处理其他的任务,那么我们可以用到allOf,等同于 CountDownLatch闭锁。而anyOf则是任务中有一个完成则直接去处理其他的任务,无需再等待其他任务执行完毕。

    allOf方法:

    public class TestCompletableFuture {
        public static void main(String[] args) {
            testAllOf();
        }
    
        private static void testAllOf() {
            //定义三个任务
            CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("task1 running"));
            CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println("task2 running"));
            CompletableFuture<Void> future3 = CompletableFuture.runAsync(() ->
            {
                try {
                    //模拟延迟任务
                    Thread.sleep(5000);
                    System.out.println("task3 running");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            //合并三个任务
            CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2, future3);
            System.out.println("task waiting...");
            //等待所有任务执行完毕
            future.join();
            System.out.println("task completed");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    输出结果:

    task1 running
    task2 running
    task waiting...
    //waiting about 5s
    task3 running
    task completed
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    anyOf方法:

    public class TestCompletableFuture {
        public static void main(String[] args) {
            testAnyOf();
        }
    
        private static void testAnyOf() {
            CompletableFuture<Void> future1 = CompletableFuture.runAsync(
                    () -> {
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("task1 running...");
                    }
            );
            CompletableFuture<Void> future2 = CompletableFuture.runAsync(
                    () -> {
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("task2 running...");
                    }
            );
            CompletableFuture<Void> future3 = CompletableFuture.runAsync(
                    () -> {
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("task3 running...");
                    }
            );
            //只要有一个任务执行完毕,即不再等待...
            CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(future1, future2, future3);
            System.out.println("task waiting...");
            completableFuture.join();
            System.out.println("task completed");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    输出结果:任务2完毕继续其他任务,不再等待任务1和任务3

    task waiting...
    task2 running...
    task completed
    
    • 1
    • 2
    • 3

    6. 多任务返回值联合处理 join

    public class TestCompletableFuture {
        public static void main(String[] args) {
            testJoin();
        }
    
        private static void testJoin() {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
            CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Future");
            //对三个任务的返回值进行join处理
            String result = Stream.of(future1, future2, future3)
                    .map(CompletableFuture::join)
                    .collect(Collectors.joining(" "));
            System.out.println(result);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    输出结果:

    Hello World Future
    
    • 1

    7. 异常处理 handle(result, exception)

    我们在异步任务过程中可以抛出异常,并通过handle进行异常处理:

    public class TestCompletableFuture {
        public static void main(String[] args) {
            testHandle();
        }
    
        /**
         * 异常处理
         */
        private static void testHandle() {
            //模拟参数为null
            String param = null;
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                if (param == null) {
                    throw new RuntimeException("params_error");
                }
                return "Hello" + param;
            }).handle((res, ex) -> res != null ? res : ex.getMessage());
    
            try {
                String result = completableFuture.get();
                System.out.println(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    以上就是CompletableFuture的常见的一些API,当然还有很多其他一些API方法,这里不再赘述,可以查看对应的API文档进行使用…

  • 相关阅读:
    数据结构-其他
    Frp内网穿透
    使用cpolar 连接Ubuntu和Windows
    Linux篇:进程
    多极神经元红蓝铅笔手绘,多极神经元手绘图作业
    XSS game -xss学习
    前端Vue+后端Django实现微信登录
    ACK 思维导图
    《异常检测——从经典算法到深度学习》29 EasyTSAD: 用于时间序列异常检测模型的工业级基准
    JVM启动参数大全及默认值
  • 原文地址:https://blog.csdn.net/SmallPig_Code/article/details/127906881