• Java基础之浅聊 CompletableFuture类


    其实前面聊future的接口,主要聊的是FurtureTask接口,不过发现其FurtureTask也有其弊端,使用get方法的生活会将线程阻塞。有时候会通过while循环通过isdone来判断是否异步回调已经完成,但是这样虽然会解决阻塞问题,让主线程去执行其它的业务,但是其有一个问题那就是占用CPU的资源。如果不懂可以看另一篇文章:传送阵

    除了以上的缺点,还有一个问题,那就是如果处理的了多个异步,然后将所有异步结果进行一起计算呢?如果使用FurtureTask,就有些不方便了。既然我们都可以想到的问题,那么java的大佬岂能不会想到。

    所以就有了一个神奇的类CompletableFuture。说句题外话这个类其实操作起来有点像是JavaScript中Promise。如果使用过Promise的话,对于这个CompletableFuture用法就很容易上手。(感兴趣的话可以看我promise的文章:传送阵

    在这里插入图片描述

    可以看出这个方法是JDK1.8版本才出现的一个新的类,

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 
    
    • 1

    可以看出CompletableFuture这个类同时实现两个接口FutureCompletionStage

    而可以实现异步调用自然是有Future,而新增的其它功能就是靠另一个接口CompletionStage来实现了。

    CompletionStage接口

    先看一下官网文档:

    在这里插入图片描述

    这个也是1.8版本才有的一个接口,而简单的说其特点:

    • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成之后会触发另一个阶段。

    • 一个阶段的计算执行可以是一个Function,consumer或者Runnable等,格式和像是是Promise的样子:

      例如: stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println()) 。 另外一个表单( 组合 )应用阶段本身的功能,而不是它们的结果。

    • 一个阶段的执行可能是被单个阶段完成触发,也可以由多个阶段一起触发。

    现在不看源码,简单的说就是简单演示其如何使用,以及使用的需要注意的事情。其有很多方法:

    在这里插入图片描述

    这些都是抽象方法,很多方法都需要实现其接口的子类来是使用,所以还是通过CompletableFuture来实现这些方法。

    CompletableFuture

    主要聊这个这个类,其也是java1.8版本之后新增的一个类,其提供了非常强大的Future的扩展功能。

    • 可以帮助程序员使用异步的时候简化,并且提供了函数式编程的能力,可以通过回调方法得到处理结果,也提供了转换和组合CompletableFuture的方法。
    • 可以实现完成的Future,也可以代表一个完成阶段的CompletionStage,它支持在计算完成以后触发一些函数或执行某些动作。毕竟实现了Future和CompletionStage两个接口。

    现在开始用代码演示吧。毕竟直说理论都懵,而且也不太擅长。

    创建一个CompletableFuture

    CompletableFuture类可以通过new进行创建。

    看一下官网的说法:

    CompletableFuture() 
    创建一个新的不完整的CompletableFuture
    • 1
    • 2

    这个这个构建方法,简单的说就是为了满足java中的对象类可以通过new可以创建一个实例对象,同时也为了方便以后的修改而保留了这个new创建实例对象的能力。但是官网有给出了一个建议,那就是这个一个不完整的CompletableFuture。

    那如何创建创建一个CompletableFuture类呢?看官网API的时候看出CompletableFuture有很多方法可以得到CompletableFuture类。

    在这里插入图片描述

    所以可以看出需要通过方法返回CompletableFuture这个类,而一般使用对是通过静态方法返回

    四大核心静态方法

    在这里插入图片描述

    虽然有七个,但是最核心或者说常用的四个:

    静态方法描述补充
    runAsync(Runnable runnable)返回一个新的CompletableFuture,它在运行给定操作后由运行在 ForkJoinPool.commonPool()中的任务 异步完成。无返回值
    runAsync(Runnable runnable, Executor executor)返回一个新的CompletableFuture,它在运行给定操作之后由在给定执行程序中运行的任务异步完成。无返回值
    supplyAsync(Supplier supplier)返回一个新的CompletableFuture,它通过在 ForkJoinPool.commonPool()中运行的任务与通过调用给定的供应商获得的值 异步完成。有返回值
    supplyAsync(Supplier supplier, Executor executor)返回一个新的CompletableFuture,由给定执行器中运行的任务异步完成,并通过调用给定的供应商获得的值。有返回值

    补充:

    • 其中Supplier是也是新增的四大核心函数式接口,如果不了解可以看下传送阵
    • 如果没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码的,如果指定了线程池,则使用自定义或者特别指定线程池执行的异步代码。

    还是老规矩,开始代码演示:

    • 没有返回值:
    public class test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
         CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{
             System.out.println("Runnable生成的线程"+Thread.currentThread().getName());
             try {
                 TimeUnit.SECONDS.sleep(2);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
         });
        System.out.println(completableFuture.get());
        
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述

    通过线程的名字可以看出ForkJoinPool.commonPool,可以看出其默认的线程池生成的。

    现在来创建一个自定义的Executor类,试一下:

    public class test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    //       为了演示,直接使用工具类中返回的线程池,而不通过new ThreadPoolExecutor();创建线程池了
            Executor executor= Executors.newFixedThreadPool(3);
    
         CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{
             System.out.println("Runnable生成的线程"+Thread.currentThread().getName());
             try {
                 TimeUnit.SECONDS.sleep(2);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
         },executor);
        System.out.println(completableFuture.get());
        executor.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述

    • 有返回值:

      因为默认线程池和自定义线程池大多数都雷同,为了减少篇幅所以直接写。

      public class test {
          public static void main(String[] args) throws ExecutionException, InterruptedException {
      //       为了演示,直接使用工具类中返回的线程池,而不通过new ThreadPoolExecutor();创建线程池了
              ExecutorService executor = Executors.newFixedThreadPool(3);
      
              CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                  System.out.println("Runnable生成的线程" + Thread.currentThread().getName());
                  try {
                      TimeUnit.SECONDS.sleep(2);
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
                  return "有返回值了";
              }, executor);
              System.out.println(completableFuture.get());
              executor.shutdown();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

    在这里插入图片描述

    CompletableFuture优势

    前面一直说Future的接口(前面用FurtureTask演示的)说到其会造成阻塞或者解决方式会消耗CPU性能。而CompletableFuture自然就是为了解决这个问题:CompletableFuture是Future的功能增强版,减少阻塞和轮询。可以传入回调对象,当异步任务完成或者发生异常的时候,会自动调用回调对象的回调方法。

    既然要是加强版,自然也会有其原来就有的能力。

    来一个原理有的能力演示:

    public class test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    //       为了演示,直接使用工具类中返回的线程池,而不通过new ThreadPoolExecutor();创建线程池了
            ExecutorService executor = Executors.newFixedThreadPool(3);
    
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("Runnable生成的线程" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "有返回值了";
            }, executor);
            System.out.println("运行主线程");
            System.out.println(completableFuture.get());
            System.out.println("运行完毕");
            executor.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述

    如果这样写,运动的话,可以看出其,还是有阻塞这个弊端的。

    不过仔细看方法的话,还有一个join方法:

    get和join区别

    CompletableFuture中多了一个方法join,但是看起使用的时候似乎与get方法一样,如果说效果的话,两者没有什么区别,都是返回异步任务的返回值,但是既然说其有区别,自然就有区别。

    通过编译软件可以看出:

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    可以看出其两者最大的区别,就是在编译的时候是否抛出异常,当然如果允许中出现异常join也会抛出的。

    再聊Future的时候来一个对比有戏价格的例子,现在用CompletableFuture体验一下:

    public class test {
        public static void main(String[] args) throws InterruptedException {
            List<NetShop> shoplist = Arrays.asList(
                    new NetShop("拼多多"),
                    new NetShop("京东"),
                    new NetShop("淘宝")
            );
            long start = System.currentTimeMillis();
    
            List<String> pricelist=shoplist.stream().map((shop) ->
                    CompletableFuture.supplyAsync(() ->
                            String.format("塞尔达在" + "在%s中的价格是%s", shop.getName(), shop.getPrice())
                    )
            ).collect(Collectors.toList()).stream().map(cp->cp.join()).collect(Collectors.toList());
            pricelist.forEach(price->{
                System.out.println(price);;
            });
    
    
    
        long end = System.currentTimeMillis();
    
            System.out.println("运行的时间:"+(end -start));
    }
    
    
    }
    
    class NetShop {
        String name;
    
        public NetShop(String name) {
            this.name = name;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    //    直接通过方法得到一个随机价格,而不在写价格属性了
    
        public Double getPrice() {
            Double price = 0d;
            try {
                TimeUnit.SECONDS.sleep(1);
                price = ThreadLocalRandom.current().nextDouble() + 299;
    
            } catch (Exception E) {
                E.printStackTrace();
            } finally {
                return price;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    在这里插入图片描述

    这里体验的时候一定要注意:

    //异步没有生效不知道为什么,这个可能需要再研究一下
    map().map()
    
    // 异步生效了
    map().collect(Collectors.toList().map()
    
    • 1
    • 2
    • 3
    • 4
    • 5

    说实话这个疑问,我目前还没有找到原因。如果找到了以后再补充吧,这里还是提醒。

    而且前面一直说有点像是JavaScript中的Promise所以自然应该有类似的调用。

    因为有很多方法,暂时不说所有的想法,这个时候为了演示CompletableFuture的优势,所以先通过几个方法演示优势,后面具体聊所有的方法。

    在这里插入图片描述

    还是老规矩代码演示:

    public class test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    //       为了演示,直接使用工具类中返回的线程池,而不通过new ThreadPoolExecutor();创建线程池了
            ExecutorService executor = Executors.newFixedThreadPool(3);
    
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("Runnable生成的线程" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "有返回值了";
            }, executor);
            System.out.println("运行主线程");
    //        这里的两个参数,第一个v是completableFuture的返回值,第二个e是异常或者报错信息
          completableFuture.whenComplete((v,e)->{
                if(e==null){
                    System.out.println("有get的效果得到值:"+v);
                }
            });
            System.out.println("运行完毕");
            executor.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    现在看一下运行效果:

    在这里插入图片描述

    可以看出其没有阻塞主线程的运行,而是异步返回结果在返回的时候得到了。

    然后再来一个有异常的:

    public class test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    //       为了演示,直接使用工具类中返回的线程池,而不通过new ThreadPoolExecutor();创建线程池了
            ExecutorService executor = Executors.newFixedThreadPool(3);
    
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("Runnable生成的线程" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
               double a= 1/0;
                return "有返回值了";
            }, executor);
            System.out.println("运行主线程");
    //        这里的两个参数,第一个v是completableFuture的返回值,第二个e是异常或者报错信息
          completableFuture.whenComplete((v,e)->{
                if(e==null){
                    System.out.println("有get的效果得到值:"+v);
                }else {
                    System.out.println(e);
                }
            });
            System.out.println("运行完毕");
            executor.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在这里插入图片描述

    其实调用whenComplete方法可以返回得到一个completableFuture,那么连续用呢?

    public class test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    //       为了演示,直接使用工具类中返回的线程池,而不通过new ThreadPoolExecutor();创建线程池了
            ExecutorService executor = Executors.newFixedThreadPool(3);
    
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("Runnable生成的线程" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "有返回值了";
            }, executor);
            System.out.println("运行主线程");
    //        这里的两个参数,第一个v是completableFuture的返回值,第二个e是异常或者报错信息
          completableFuture.whenComplete((v,e)->{
                if(e==null){
                    System.out.println("有get的效果得到值:"+v);
                    v=v+" 这里可以修改v,同时将v这个值再次返回";
                }
    
            }).whenComplete((v,e)->{
              System.out.println("第二次 有get的效果得到值:"+v);
          });
            System.out.println("运行完毕");
            executor.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    在这里插入图片描述

    可以看出这个后面第二个的参数v的值,和第一参数v的值似乎是一个值,而且在第一个whenComplete方法中似乎对v值的修改没有传递到后面的whenComplete方法中。

    不过能否称为连续调用呢?自然可以当然不是这种方法。下面开始聊CompletableFuture的方法,其方法太多了,毕竟是强化版。

    方法

    其方法很多,还是那句话,本篇只能聊如何用,而不是去翻看源码的实现。

    获取结果

    这一类方法,都是获取异步的运算的结果;

    方法描述
    public T get()等待这个未来完成的必要,然后返回结果。
    public T get(long timeout, TimeUnit unit)如果有必要等待这个未来完成的给定时间,然后返回其结果(如果有的话)。
    public T join()完成后返回结果值,如果完成异常,则返回(未检查)异常。
    public T getNow(T valueIfAbsent)如果已完成,则返回结果值(或抛出任何遇到的异常),否则返回给定的值IfAbsent。
    public boolean complete(T value)complete方法可以判断异步是否执行完毕,如果执行完毕返回False,然后参数代替返回值。如果未执行完毕则返回ture,然后参数代替返回值。

    这个可以前三个方法就不再具体说了,毕竟前面一直使用,现在简单的演示一下表中的第四个方法。

    public class test {
        public static void main(String[] args) {
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                int a = 1 / 0;
                return 10;
            });
    
            CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
    
                return 10;
            });
            System.out.println(" 错误运算后的返回值:"+completableFuture1.getNow(4));
            System.out.println(" 正确后的返回值:"+completableFuture2.getNow(4));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述

    其实这个方法,还有一个特点,那就是不会阻塞,因为如果没有返回值可以得到参数值,所以不会阻塞。

    现象演示一下complete方法。

    public class test {
        public static void main(String[] args) {
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return 10;
            });
    
            CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
    
                return 10;
            });
            System.out.println(" 错误运算后的返回值:"+completableFuture1.complete(4)+ "   "+completableFuture1.join());
            System.out.println(" 正确后的返回值:"+completableFuture2.complete(4)+"   "+completableFuture1.join());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这里插入图片描述

    可以看出complete方法可以判断异步是否执行完毕,如果执行完毕返回False,然后参数代替返回值。如果未执行完毕则返回ture,然后参数代替返回值。

    处理计算结果

    方法描述
    public CompletableFuture thenApply(Function fn)返回一个新的CompletionStage,当此阶段正常完成时,将以该阶段的结果作为所提供函数的参数执行
    public CompletableFuture handle(BiFunction fn)返回一个新的CompletionStage,当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数执行。 当完成作为参数时,使用结果(或null如果没有))和此阶段的异常(或null如果没有))调用给定函数。

    现在开始演示,前面一直说CompletableFuture可以将异步串联起来,所以现在试一下:

    public class test {
        public static void main(String[] args) {
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "第一个";
            });
            completableFuture1.thenApply((v)->{
                System.out.println("在第一个Apply中:"+v);
                return  "第二个";
            }).thenApply((v)->{
                System.out.println("在第二个Apply中:"+v);
                return  "第三个";
            }).whenComplete((v,e)->{
             if(e==null){
                 System.out.println("在when中:"+v);
             }
            });
            System.out.println("运行主线程");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里插入图片描述

    似乎没有效果啊,因为CompletableFuture是为了解决阻塞的,所以会自动关闭的。所以前面演示的时候才会使用自己创建的线程池。

    现在真正演示:

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService=Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "第一个";
            },executorService);
            completableFuture1.thenApply((v)->{
                System.out.println("在第一个Apply中:"+v);
                return  "第二个";
            }).thenApply((v)->{
                System.out.println("在第二个Apply中:"+v);
                return  "第三个";
            }).whenComplete((v,e)->{
             if(e==null){
                 System.out.println("在when中:"+v);
             }
            });
            System.out.println("运行主线程");
            executorService.shutdown();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在这里插入图片描述

    可以看出现在可以将异步作为一个串行来处理了。

    现在再来一个实验那就是如果中级报错:

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService=Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "第一个";
            },executorService);
            completableFuture1.thenApply((v)->{
                System.out.println("在第一个Apply中:"+v);
                int a=1/0;
                return  "第二个";
            }).thenApply((v)->{
                System.out.println("在第二个Apply中:"+v);
                return  "第三个";
            }).whenComplete((v,e)->{
             if(e==null){
                 System.out.println("在when中:"+v);
             }
            }).exceptionally(e->{
    //            这个可以获得异常的数据
                System.out.println("捕获异常----"+e);
                return  null;
            });
            System.out.println("运行主线程");
            executorService.shutdown();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在这里插入图片描述

    可以看出如果thenApply中出现错误,就会将串行截断,而这个时候就需要使用handle方法。

    演示:

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService=Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "第一个";
            },executorService);
            completableFuture1.handle((f,v)->{
                System.out.println("在第一个handle中:"+v);
                int a=1/0;
                return  "第二个";
            }).handle((f,v)->{
                System.out.println("在第二个handle中:"+v);
                return  "第三个";
            }).whenComplete((v,e)->{
             if(e==null){
                 System.out.println("在when中:"+v);
             }
            }).exceptionally(e->{
    //            这个可以获得异常的数据
                System.out.println("捕获异常----"+e);
                return  null;
            });
            System.out.println("运行主线程");
            executorService.shutdown();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在这里插入图片描述

    消费计算结果

    方法描述
    public CompletableFuture thenAccept(Consumer action)返回一个新的CompletionStage,当此阶段正常完成时,将以该阶段的结果作为提供的操作的参数执行

    看参数就指定,这是一个消费者这接口所以可以说这个没有返回值。所以老规矩还是直接演示代码吧:

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "第一个";
            }, executorService);
            completableFuture1.thenApply((v) -> {
                System.out.println("thenApply----:" + v);
    
                return "第二个";
            }).thenAccept((v) -> {
    //            其没有返回值所以无需返回值 这个也可以看出其与thenApply的区别了
                System.out.println("thenAccept----:" + v);
            });
            System.out.println("运行主线程");
            executorService.shutdown();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    在这里插入图片描述

    这个可以看出了threnApply和thenAccept两个方法的区别了,既然说区别,就再补充一个方法吧,那就是很相似的一个方法thenRun.

    方法描述
    public CompletableFuture thenRun(Runnable action)返回一个新的CompletionStage,当此阶段正常完成时,执行给定的操作.不过可以看出其异步中也不会返回数据。

    不过其实际就是重写开启了一个线程。而这个线程与上一个线程没有任何关系。

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                    System.out.println("completableFuture1-------");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "第一个";
            }, executorService);
            completableFuture1.thenRun(()->{
                System.out.println("thenRun-------");
            });
            System.out.println("运行主线程");
            executorService.shutdown();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述

    运算速度选用

    方法描述
    public CompletableFuture applyToEither(CompletionStage other,
    Function fn)
    返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的函数的参数。

    其实这个看的有点懵,还是代码演示:

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                    System.out.println("completableFuture1-------");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "completableFuture1";
            }, executorService);
            CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(100);
                    System.out.println("completableFuture2-------");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "completableFuture2";
            }, executorService);
    
    
    
            completableFuture1.acceptEither(completableFuture2,(v)->{
                System.out.println("acceptEither调用运行时间短的-------"+v);
            });
            System.out.println("运行主线程");
            executorService.shutdown();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在这里插入图片描述

    可以看出completableFuture1设置的睡眠时间长,所以acceptEither选择运行了completableFuture2的结果。所以可以看出调用的completableFuture对比一些参数第一个completableFuture。两者谁运行的快然后用谁的运行结果。

    运算结果合并

    方法描述
    public CompletableFuture thenCombine(CompletionStage other,
    BiFunction fn)
    返回一个新的CompletionStage,当这个和另一个给定的阶段都正常完成时,两个结果作为提供函数的参数执行。

    这个其实是将两个异步任务的结果一起交给thenCombine进行处理,先完成的等待,等待其它的分支,然后再一起合并处理。

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                    System.out.println("completableFuture1-------");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "completableFuture1";
            }, executorService);
            CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(300);
                    System.out.println("completableFuture2-------");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "completableFuture2";
            }, executorService);
    
            completableFuture1.thenCombine(completableFuture2,(v1,v2)->{
                String cb="合并两个结果-------"+v1+"  "+v2;
    
                return  cb;
            }).thenAccept((v)->{
                System.out.println(v);
            });
            System.out.println("运行主线程");
            executorService.shutdown();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    在这里插入图片描述

    前面的确说一些方法,但是与官网的API还是差太多了,现在看一下有多少方法;

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    前面虽然聊了不少方法,如果一对比似乎等于说了九头牛中的一根毛,但是仔细观看的的会发现很多方法,似乎多了个Async。现在举一个例子,比如thenApply和thenApplyAsync。这两者有什么区别吗?下面说。

    补充-线程池选择

    其实如果查询Async翻译的话 :

    在这里插入图片描述

    这个时候就有点懵了,completableFuture本身不是异步任务吗?咋地又来一个异步?

    这个如何理解呢,还是老规矩代码演示:

    public class test {
        public static void main(String[] args) {
    
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName());
                return "第一个";
            }).thenApply((v)->{
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("在第一个Apply中:"+v);
                System.out.println(Thread.currentThread().getName());
                return  "第二个";
            }).thenApply((v)->{
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }            
                System.out.println(Thread.currentThread().getName());
                System.out.println("在第二个Apply中:"+v);
                return  "第三个";
            }) ;
    
            System.out.println(completableFuture1.join());
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    在这里插入图片描述

    其实这个打印出的 线程池的来源,这个前面也演示过,这里不过是再演示一下而已。

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService=Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
    
                System.out.println(Thread.currentThread().getName());
                return "第一个";
            },executorService).thenApply((v)->{
    
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("在第一个Apply中:"+v);
                System.out.println(Thread.currentThread().getName());
                return  "第二个";
            }).thenApply((v)->{
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName());
                System.out.println("在第二个Apply中:"+v);
                return  "第三个";
            }) ;
    //        阻塞一下,不然没有效果
            System.out.println(completableFuture1.join());
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在这里插入图片描述

    上面两个方法,有点重复,但是又不得不列出,现在再将thenApply编程thenApplyAsync。

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService=Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
    
                System.out.println(Thread.currentThread().getName());
                return "第一个";
            },executorService).thenApplyAsync((v)->{
    
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("在第一个Apply中:"+v);
                System.out.println(Thread.currentThread().getName());
                return  "第二个";
            }).thenApplyAsync((v)->{
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName());
                System.out.println("在第二个Apply中:"+v);
                return  "第三个";
            }) ;
    //        阻塞一下,不然没有效果
            System.out.println(completableFuture1.join());
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在这里插入图片描述

    可以看出带上Async,其后面调用的异步方式的线程池变了。

    由此可以总结:

    • 如果不传递自定义线程池,都是默认的ForkJoinPool。
    • 如果执行的第一个任务的时候传递了一个自定义的线程池,现在可以看出有两种个情况。
      • 如果执行的的不带有Async执行第二个任务时,则第二个任务和第一个任务共用一个自定义线程池。
      • 如果带有Async执行第二个任务时,则第一个任务使用自定义线程池,但是如果带有Async那就是使用的是ForkJoin线程池,而其后面是否带有Async都是ForkJoin线程池了。

    不过又要补充一点,未来防止出现问题,我使用线程休眠,如果取消掉呢?

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService=Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
    
                System.out.println(Thread.currentThread().getName());
                return "第一个";
            },executorService).thenApply((v)->{
                System.out.println("在第一个Apply中:"+v);
                System.out.println(Thread.currentThread().getName());
                return  "第二个";
            }).thenApply((v)->{
                System.out.println(Thread.currentThread().getName());
                System.out.println("在第二个Apply中:"+v);
                return  "第三个";
            }) ;
    //        阻塞一下,不然没有效果
            System.out.println(completableFuture1.join());
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在这里插入图片描述

    可以看出:

    因为处理的时间很多,系统优化的切换的原则,使用是不带有Async,但是线程也直接用main线程处理了。

    再用带有Async的试一下:

    public class test {
        public static void main(String[] args) {
            ExecutorService executorService=Executors.newFixedThreadPool(3);
            CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
    
                System.out.println(Thread.currentThread().getName());
                return "第一个";
            },executorService).thenApplyAsync((v)->{
                System.out.println("在第一个Apply中:"+v);
                System.out.println(Thread.currentThread().getName());
                return  "第二个";
            }).thenApplyAsync((v)->{
                System.out.println(Thread.currentThread().getName());
                System.out.println("在第二个Apply中:"+v);
                return  "第三个";
            }) ;
    //        阻塞一下,不然没有效果
            System.out.println(completableFuture1.join());
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在这里插入图片描述

    可以看出这样调用的不会使用main线程,这个就需要看一下源码,来查看问什么?

    看一下:

       public <U> CompletableFuture<U> thenApplyAsync(
            Function<? super T,? extends U> fn) {
            return uniApplyStage(asyncPool, fn);
        }
    
    • 1
    • 2
    • 3
    • 4

    可以看出这个是有一个线程池参数asyncPool,既然这里可以放,那就是数一个类属性或者属性所以直接搜索然后看:

      private static final boolean useCommonPool =
            (ForkJoinPool.getCommonPoolParallelism() > 1);
    
        /**
         * Default executor -- ForkJoinPool.commonPool() unless it cannot
         * support parallelism.
         */
    //  三目运算符因为useCommonPool上面已经定义,所以使用了ForkJoinPool
        private static final Executor asyncPool = useCommonPool ?
            ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    
        /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
        static final class ThreadPerTaskExecutor implements Executor {
            public void execute(Runnable r) { new Thread(r).start(); }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    既然调用ForkJoinPool线程池,那么ForkJoinPool自然尤其优势,这个后面具体聊,篇幅够长了。

  • 相关阅读:
    julia 笔记目录
    免费SSL证书
    python学习笔记——文件
    大学生第一款浏览器怎么选,这款浏览器适合学生用
    应急响应LINUX&Windows
    玩转nginx的配置文件3
    uniapp上使用document方案之renderjs
    Micro-OLED(硅基OLED)的工艺简介
    数据挖掘小参考资料推荐
    【云原生】Docker网络
  • 原文地址:https://blog.csdn.net/u011863822/article/details/126870498