• CompletableFuture 使用教程


    CompletableFuture实现了CompletionStage接口和Future接口,CompletionStage是对Future的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

    Future没有提供通知机制,Future是否执行完任务需要通过轮询isDone这个方法查询执行结果或者调用get()方法阻塞任务执行,CompletionStage解决了该问题,前一个任务执行成功后可以自动触发下一个任务的执行,中间无需等待。

    1. 创建异步任务

    supplyAsync方法

    //使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    //自定义线程,根据supplier构建执行任务
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    
    • 1
    • 2
    • 3
    • 4

    runAsync方法

    //使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
    public static CompletableFuture<Void> runAsync(Runnable runnable) 
    //自定义线程,根据runnable构建执行任务
    public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)
    
    • 1
    • 2
    • 3
    • 4
    • supplyAsync执行CompletableFuture任务,支持返回值
    • runAsync执行CompletableFuture任务,没有返回值。
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<Void> runFuture =
                CompletableFuture.runAsync(() -> userService.compute("runAsync"), fishExecutor);
            CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("supplyAsync"), fishExecutor);
            System.out.println(runFuture.join());
            System.out.println(supplyFuture.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    join与get的区别

    • 相同点
      1.join()和get()方法都是用来获取CompletableFuture异步之后的返回值

    • 不同点
      1.join()方法抛出的是RuntimeException异常,不需要特殊处理
      2.get()方法抛出的是需要手动处理的异常,例如ExecutionException, InterruptedException,需要抛出或者try-catch

    • 结果打印

    16:45:22.054 [fish] INFO com.example.mavendemo.Test$UserService - runAsync
    16:45:22.054 [fish] INFO com.example.mavendemo.Test$UserService - supplyAsync
    null
    supplyAsync
    
    • 1
    • 2
    • 3
    • 4

    2. 任务异步回调

    1. thenRun/thenRunAsync

    public CompletableFuture<Void> thenRun(Runnable action);
    public CompletableFuture<Void> thenRunAsync(Runnable action);
    
    • 1
    • 2
    • thenRun/thenRunAsync作用是做完第一个任务后,再做第二个任务(Runnable action)。前一个任务执行结束后,执行回调方法(Runnable action);但是两个任务之间没有参数传递,第二个任务也没有返回值
    • thenRun 和 thenRunAsync 的区别:

    thenRun执行第二个任务的线程池和第一个任务是同一个,或者thenRunAsync传入的线程池
    thenRunAsync执行第二个任务使用的是ForkJoin线程池,或者你传入的线程池

        public CompletableFuture<Void> thenRun(Runnable action) {
            return uniRunStage(null, action);
        }
    
        public CompletableFuture<Void> thenRunAsync(Runnable action) {
            return uniRunStage(asyncPool, action);
        }
    
        public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
            return uniRunStage(screenExecutor(executor), action);
        }    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test2 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
    
        public static void main(String[] args) {
            CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("第一个任务"), fishExecutor);
            CompletableFuture<Void> voidCompletableFuture1 = supplyFuture.thenRun(() -> userService.noResult("第二个任务"));
            CompletableFuture<Void> voidCompletableFuture2 = supplyFuture.thenRunAsync(() -> userService.noResult("第三个任务"));
            System.out.println(voidCompletableFuture1.join());
            System.out.println(voidCompletableFuture2.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
            public void noResult(String message) {
                log.info(message);
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 打印,第二个任务和第一个任务使用fish线程
    16:45:48.526 [fish] INFO com.example.mavendemo.Test2$UserService - 第一个任务
    16:45:48.529 [fish] INFO com.example.mavendemo.Test2$UserService - 第二个任务
    null
    16:45:48.530 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test2$UserService - 第三个任务
    null
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 加入第四任务,在看任务使用的线程
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test2 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static ExecutorService catExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "cat"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("第一个任务"), fishExecutor);
            CompletableFuture<Void> voidCompletableFuture1 = supplyFuture.thenRun(() -> userService.noResult("第二个任务"));
            CompletableFuture<Void> voidCompletableFuture2 = supplyFuture.thenRunAsync(() -> userService.noResult("第三个任务"));
            CompletableFuture<Void> voidCompletableFuture3 =
                supplyFuture.thenRunAsync(() -> userService.noResult("第四个任务"), catExecutor);
            System.out.println(voidCompletableFuture1.join());
            System.out.println(voidCompletableFuture2.join());
            System.out.println(voidCompletableFuture3.join());
            fishExecutor.shutdown();
            catExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
            public void noResult(String message) {
                log.info(message);
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 打印,第二个任务和第四个任务使用cat线程,第三个任务使用ForkJoinPool线程
    16:47:34.132 [fish] INFO com.example.mavendemo.Test2$UserService - 第一个任务
    16:47:34.134 [cat] INFO com.example.mavendemo.Test2$UserService - 第四个任务
    16:47:34.134 [cat] INFO com.example.mavendemo.Test2$UserService - 第二个任务
    null
    16:47:34.135 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test2$UserService - 第三个任务
    null
    null
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2. thenAccept/thenAcceptAsync

    • 第一个任务执行结束后,执行第二个回调方法任务,会将第一个任务的执行结果作为入参,传递到回调方法中,但回调方法没有返回值
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test3 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
            CompletableFuture<Void> voidCompletableFuture1 =
                supplyFuture.thenAccept((a) -> userService.noResult("传入了前一个任务的结果:" + a));
            CompletableFuture<Void> voidCompletableFuture2 =
                supplyFuture.thenAcceptAsync((a) -> userService.noResult("传入了前一个任务的结果:" + a));
            System.out.println(voidCompletableFuture1.join());
            System.out.println(voidCompletableFuture2.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
            public void noResult(String message) {
                log.info(message);
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 打印
    17:01:09.637 [fish] INFO com.example.mavendemo.Test3$UserService - 这是第一个任务
    17:01:09.639 [fish] INFO com.example.mavendemo.Test3$UserService - 传入了前一个任务的结果:这是第一个任务
    17:01:09.639 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test3$UserService - 传入了前一个任务的结果:这是第一个任务
    null
    null
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3. thenApply/thenApplyAsync

    • 第一个任务执行完成后,执行第二个任务的回调方法,将第一个任务的执行结果作为入参,传递到回调方法中,而且回调方法有返回值
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test4 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
            CompletableFuture<String> voidCompletableFuture1 =
                supplyFuture.thenApply((a) -> userService.compute("传入了前一个任务的结果:" + a));
            CompletableFuture<String> voidCompletableFuture2 =
                supplyFuture.thenApplyAsync((a) -> userService.compute("传入了前一个任务的结果:" + a));
            System.out.println(voidCompletableFuture1.join());
            System.out.println(voidCompletableFuture2.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
            public void noResult(String message) {
                log.info(message);
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 打印
    17:06:08.946 [fish] INFO com.example.mavendemo.Test4$UserService - 这是第一个任务
    17:06:08.948 [fish] INFO com.example.mavendemo.Test4$UserService - 传入了前一个任务的结果:这是第一个任务
    传入了前一个任务的结果:这是第一个任务
    17:06:08.948 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test4$UserService - 传入了前一个任务的结果:这是第一个任务
    传入了前一个任务的结果:这是第一个任务
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4. exceptionally

    • 任务执行异常时,执行回调方法;并且将抛出的异常作为参数,传到回调方法
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test5 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.exceptionMethod("这是第一个任务"), fishExecutor);
            CompletableFuture<String> exceptionally = supplyFuture.exceptionally(o -> userService.catchException(o));
            System.out.println(exceptionally.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String exceptionMethod(String message) {
                log.info(message);
                throw new RuntimeException(message);
            }
    
            public String catchException(Object o) {
                RuntimeException e = (RuntimeException)o;
                log.error("{}", e);
                return e.getMessage();
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 打印
    17:37:53.901 [fish] INFO com.example.mavendemo.Test5$UserService - 这是第一个任务
    17:37:53.907 [fish] ERROR com.example.mavendemo.Test5$UserService - {}
    java.util.concurrent.CompletionException: java.lang.RuntimeException: 这是第一个任务
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.RuntimeException: 这是第一个任务
    	at com.example.mavendemo.Test5$UserService.exceptionMethod(Test5.java:27)
    	at com.example.mavendemo.Test5.lambda$main$1(Test5.java:16)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    	... 3 common frames omitted
    java.lang.RuntimeException: 这是第一个任务
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    5. whenComplete/whenCompleteAsync

    • 第一个任务执行结束后,执行第二个回调方法任务,把上一级任务执行的结果和异常信息作为入参传入回调方法;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果。
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test6 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
            CompletableFuture<String> voidCompletableFuture1 =
                supplyFuture.whenComplete((a,throwable) -> userService.compute("传入了前一个任务的结果:" + a));
            CompletableFuture<String> voidCompletableFuture2 =
                supplyFuture.whenCompleteAsync((a,throwable) -> userService.compute("传入了前一个任务的结果:" + a));
            System.out.println(voidCompletableFuture1.join());
            System.out.println(voidCompletableFuture2.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
            public void noResult(String message) {
                log.info(message);
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 打印
    17:49:36.135 [fish] INFO com.example.mavendemo.Test6$UserService - 这是第一个任务
    17:49:36.137 [fish] INFO com.example.mavendemo.Test6$UserService - 传入了前一个任务的结果:这是第一个任务
    这是第一个任务
    17:49:36.137 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test6$UserService - 传入了前一个任务的结果:这是第一个任务
    这是第一个任务
    
    • 1
    • 2
    • 3
    • 4
    • 5

    6. handle/handleAsync

    • 第一个任务执行结束后,执行第二个回调方法任务,把上一级任务执行的结果和异常信息作为入参传入回调方法;并且whenComplete方法返回的CompletableFuture的result是回调方法的结果。
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test7 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<String> supplyFuture =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
            CompletableFuture<String> voidCompletableFuture1 =
                supplyFuture.handle((a,throwable) -> userService.compute("传入了前一个任务的结果:" + a));
            CompletableFuture<String> voidCompletableFuture2 =
                supplyFuture.handleAsync((a,throwable) -> userService.compute("传入了前一个任务的结果:" + a));
            System.out.println(voidCompletableFuture1.join());
            System.out.println(voidCompletableFuture2.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
            public void noResult(String message) {
                log.info(message);
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 打印
    17:52:20.197 [fish] INFO com.example.mavendemo.Test7$UserService - 这是第一个任务
    17:52:20.199 [fish] INFO com.example.mavendemo.Test7$UserService - 传入了前一个任务的结果:这是第一个任务
    传入了前一个任务的结果:这是第一个任务
    17:52:20.199 [ForkJoinPool.commonPool-worker-9] INFO com.example.mavendemo.Test7$UserService - 传入了前一个任务的结果:这是第一个任务
    传入了前一个任务的结果:这是第一个任务
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3. 多任务组合

    1. AND

    • thenCombine / thenAcceptBoth / runAfterBoth都表示将两个CompletableFuture组合起来,只有这两个都正常执行结束,才会执行回调任务。
    • 区别在于:
      thenCombine/thenCombineAsync: 会将两个任务的执行结果作为入参,传到指定方法中,且有返回值
      thenAcceptBoth/thenAcceptBothAsync: 会将两个任务的执行结果作为入参,传到指定方法中,且无返回值
      runAfterBoth/runAfterBothAsync: 不会把执行结果当做入参,且没有返回值。
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test8 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
            CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第二个任务"), fishExecutor);
            CompletableFuture<String> thenCombine = first.thenCombine(second, (s, s2) -> userService.combine(s, s2));
            System.out.println(thenCombine.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
            public String combine(String first, String second) {
                log.info("这是combine方法");
                return first + second;
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 打印
    09:49:38.621 [fish] INFO com.example.mavendemo.Test8$UserService - 这是第二个任务
    09:49:38.621 [fish] INFO com.example.mavendemo.Test8$UserService - 这是第一个任务
    09:49:38.624 [fish] INFO com.example.mavendemo.Test8$UserService - 这是combine方法
    这是第一个任务这是第二个任务
    
    • 1
    • 2
    • 3
    • 4

    2. OR

    • applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行结束,就会执行回调任务。
    • 区别在于:
      applyToEither/applyToEitherAsync: 会将已经执行完成的任务的结果作为入参,传到指定方法中,且有返回值
      acceptEither/acceptEitherAsync: 会将已经执行完成的任务的结果作为入参,传到指定方法中,且无返回值
      runAfterEither/runAfterEitherAsync: 不会把执行结果当做入参,且没有返回值。
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test9 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
            CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第二个任务"), fishExecutor);
            CompletableFuture<String> applyToEither = first.applyToEither(second, s -> userService.either(s));
            System.out.println(applyToEither.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
            public String either(String result) {
                log.info("这是either方法");
                return result;
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 打印
    10:02:51.523 [fish] INFO com.example.mavendemo.Test9$UserService - 这是第一个任务
    10:02:51.523 [fish] INFO com.example.mavendemo.Test9$UserService - 这是第二个任务
    10:02:51.526 [fish] INFO com.example.mavendemo.Test9$UserService - 这是either方法
    这是第一个任务
    
    • 1
    • 2
    • 3
    • 4
    • 也可能打印
    10:03:50.381 [fish] INFO com.example.mavendemo.Test9$UserService - 这是第二个任务
    10:03:50.381 [fish] INFO com.example.mavendemo.Test9$UserService - 这是第一个任务
    10:03:50.383 [fish] INFO com.example.mavendemo.Test9$UserService - 这是either方法
    这是第二个任务
    
    • 1
    • 2
    • 3
    • 4

    3. AllOf

    • 所有任务都执行完成后,才执行 allOf 返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture执行get或join方法,会抛出异常;如果都是正常执行,则返回null。
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test10 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) {
            CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
            CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> userService.error("这是第二个任务"), fishExecutor);
    
            CompletableFuture<Void> allOfFuture =
                CompletableFuture.allOf(first, second).whenComplete((unused, throwable) -> {
                    System.out.println(unused);
                    System.out.println(throwable.getMessage());
                    System.out.println("finish");
                });
            System.out.println(allOfFuture.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
            public String error(String message) {
                log.info(message);
                throw new RuntimeException("出错了");
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 打印
    10:41:43.120 [fish] INFO com.example.mavendemo.Test10$UserService - 这是第二个任务
    10:41:43.120 [fish] INFO com.example.mavendemo.Test10$UserService - 这是第一个任务
    null
    java.lang.RuntimeException: 出错了
    finish
    Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: 出错了
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.RuntimeException: 出错了
    	at com.example.mavendemo.Test10$UserService.computeError(Test10.java:40)
    	at com.example.mavendemo.Test10.lambda$main$2(Test10.java:18)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    	... 3 more
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    4. AnyOf

    • 任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行get或join方法,返回执行完任务的结果。
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test11 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第一个任务"), fishExecutor);
            CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> userService.compute("这是第二个任务"), fishExecutor);
    
            CompletableFuture<Object> anyOfFuture =
                CompletableFuture.anyOf(second, first).whenComplete((unused, throwable) -> {
                    System.out.println(unused);
                    System.out.println("finish");
                });
            System.out.println(anyOfFuture.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 打印
    13:15:46.529 [fish] INFO com.example.mavendemo.Test11$UserService - 这是第二个任务
    13:15:46.529 [fish] INFO com.example.mavendemo.Test11$UserService - 这是第一个任务
    这是第二个任务
    finish
    这是第二个任务
    
    • 1
    • 2
    • 3
    • 4
    • 5

    5. thenCompose/thenComposeAsync

    • 在一个任务执行完成后,将该任务的执行结果作为入参,传到指定的方法去执行,该方法会返回一个新的CompletableFuture实例。
    package com.example.mavendemo;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import lombok.extern.slf4j.Slf4j;
    
    public class Test12 {
    
        static ExecutorService fishExecutor = Executors.newCachedThreadPool((r) -> new Thread(r, "fish"));
        static UserService userService = new UserService();
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> first =
                CompletableFuture.supplyAsync(() -> userService.number("996"), fishExecutor);
            CompletableFuture<String> thenCompose = first.thenCompose(
                s -> CompletableFuture.supplyAsync(() -> userService.compute("这是第二个任务,接收到的入参是:" + s), fishExecutor));
            System.out.println(thenCompose.join());
            fishExecutor.shutdown();
        }
    
        @Slf4j
        static class UserService {
    
            public Integer number(String number) {
                log.info(number);
                return Integer.valueOf(number);
            }
    
            public String compute(String message) {
                log.info(message);
                return message;
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 打印
    13:57:16.096 [fish] INFO com.example.mavendemo.Test12$UserService - 996
    13:57:16.098 [fish] INFO com.example.mavendemo.Test12$UserService - 这是第二个任务,接收到的入参是:996
    这是第二个任务,接收到的入参是:996
    
    • 1
    • 2
    • 3

    注意点

    1. 如果任务执行发生异常,需要调用get()或join()获取返回值,才能获取异常信息;或者使用try…catch…或者使用exceptionally方法。

    2. CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,最好添加超时时间,例如get(10, TimeUnit.SECONDS)

    3. 推荐使用自定义线程池,这样可以根据需要优化线程池配置

  • 相关阅读:
    正则笔记(持续更新)
    LIS系统 检验系统源码 检验科LIS系统源码
    C++ Reference: Standard C++ Library reference: C Library: cfenv: feraiseexcept
    Linux内核4.14版本——drm框架分析(14)——Atomic KMS 架构(struct drm_atomic_state)
    Linux常用指令总结
    centos离线安装telnet、traceroute工具
    音频——I2S 左对齐模式(三)
    贪心算法--装箱问题
    COMSOL泰森多边形Voronoi图多孔骨架优化模型受力分析
    生命在于学习——代码审计工具
  • 原文地址:https://blog.csdn.net/qq_40977118/article/details/128101532