• 并发编程之CompletableFuture全网最细最全用法(一)


    CompletableFuture用法

    从JDK 8开始,在Concurrent包中提供了一个强大的异步编程工具CompletableFuture。在JDK8之前,异步编程可以通过线程池和Future来实现,但功能还不够强大。
    demo1,示例代码:

    public class CompletableFutureDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
        
            CompletableFuture<String> future = new CompletableFuture();
            new Thread() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 另一个线程执行任务,将结果赋值给future
                    future.complete("hello 伟大的架构师");
                }
            }.start();
    
            System.out.println("任务已经提交");
    
            // 阻塞的方法
            String result = future.get();
            System.out.println(result);
    
        }
    }
    

    CompletableFuture实现了Future接口,所以它也具有Future的特性:调用get()方法会阻塞在那,直到结果返回。
    另外1个线程调用complete方法完成该Future,则所有阻塞在get()方法的线程都将获得返回结果。

    以下是各api的详细详解

    1.runAsync与supplyAsync

    例1:runAsync(Runnable)
     public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 通过异步的方式给future指派任务,future没有返回值
            CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务执行完毕");
                }
            });
    
            Object o = future.get();
            System.out.println(o);
    
        }
    

    CompletableFuture.runAsync(…)传入的是一个Runnable接口

    例2:supplyAsync(Supplier)
    public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "异步线程返回结果";
                }
            });
    
            String result = future.get();
            System.out.println(result);
        }
    

    例2和例1的区别在于,例2的任务有返回值。没有返回值的任务,提交的是Runnable,返回的是
    CompletableFuture;有返回值的任务,提交的是 Supplier,返回的是CompletableFuture。Supplier和前面的Callable很相似。
    通过上面两个例子可以看出,在基本的用法上,CompletableFuture和Future很相似,都可以提交
    两类任务:一类是无返回值的,另一类是有返回值的。

    2.thenRun、thenAccept和thenApply

    对于 Future,在提交任务之后,只能调用 get()等结果返回;但对于 CompletableFuture,可以在结果上面再加一个callback,当得到结果之后,再接着执行callback。

    例1:thenRun(Runnable)
      public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "hello 阿瑞";
                }
            });
    
            CompletableFuture<Void> voidCompletableFuture = future.thenRun(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务执行结束后的代码执行");
                }
            });
    
            voidCompletableFuture.get();
            System.out.println("任务执行结束");
        }
    

    该案例最后不能获取到结果,只会得到一个null。

    例2:thenAccept(Consumer)
    public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "hello 伟大的架构师";
                }
            }).thenAccept(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    // 可以获取上个任务的执行结果,接着进行处理
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(s.length());
                }
            });
    
            Void aVoid = future.get();
            System.out.println(aVoid);
        }
    

    上述代码在thenAccept中可以获取任务的执行结果,接着进行处理。

    例3:thenApply(Function)
    public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "hello arui";
                }
            }).thenApply(new Function<String, Integer>() {
                @Override
                public Integer apply(String s) {
                    // 接收上个任务的返回值,接着处理,同时将处理结果返回给future
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return s.length();
                }
            });
    
            Integer integer = future.get();
            System.out.println(integer);
        }
    

    三个例子都是在任务执行完成之后,接着执行回调,只是回调的形式不同:

    1. thenRun后面跟的是一个无参数、无返回值的方法,即Runnable,所以最终的返回值是
      CompletableFuture类型。
    2. thenAccept后面跟的是一个有参数、无返回值的方法,称为Consumer,返回值也是
      CompletableFuture类型。顾名思义,只进不出,所以称为Consumer;前面的Supplier 是无参数,有返回值,只出不进,和Consumer刚好相反。
    3. thenApply 后面跟的是一个有参数、有返回值的方法,称为Function。返回值是CompletableFuture类型。而参数接收的是前一个任务,即 supplyAsync(…)这个任务的返回值。因此这里只能用supplyAsync,不能用runAsync。因为runAsync没有返回值,不能为下一个链式方法传入参数。

    3. thenCompose与thenCombine

    例1:thenCompose

    在上面的例子中,thenApply接收的是一个Function,但是这个Function的返回值是一个通常的基本数据类型或一个对象,而不是另外一个CompletableFuture。如果 Function 的返回值也是一个CompletableFuture,就会出现嵌套的CompletableFuture。考虑下面的例子:

      public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<CompletableFuture<Integer>> future
                    = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    return "hello lagou";
                }
            }).thenApply(new Function<String, CompletableFuture<Integer>>() {
                @Override
                public CompletableFuture<Integer> apply(String s) {
                    return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                        @Override
                        public Integer get() {
                            return s.length();
                        }
                    });
                }
            });
    
            Integer integer = future.get().get();
            System.out.println(integer);
        }
    

    如果希望返回值是一个非嵌套的CompletableFuture,可以使用thenCompose:

     public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    return "返回结果";
                }
            }).thenCompose(new Function<String, CompletableFuture<Integer>>() {
                @Override
                public CompletableFuture<Integer> apply(String s) {
                    return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                        @Override
                        public Integer get() {
                            return s.length();
                        }
                    });
                }
            });
    
            System.out.println(future.get());
        }
    

    下面是thenCompose方法的接口定义:
    在这里插入图片描述
    CompletableFuture中的实现:
    在这里插入图片描述
    从该方法的定义可以看出,它传入的参数是一个Function类型,并且Function的返回值必须是
    CompletionStage的子类,也就是CompletableFuture类型

    例2:thenCombine

    thenCombine方法的接口定义如下,从传入的参数可以看出,它不同于thenCompose。
    在这里插入图片描述
    第1个参数是一个CompletableFuture类型,第2个参数是一个方法,并且是一个BiFunction,也就
    是该方法有2个输入参数,1个返回值。
    从该接口的定义可以大致推测,它是要在2个 CompletableFuture 完成之后,把2个
    CompletableFuture的返回值传进去,再额外做一些事情。实例如下:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    return "hello";
                }
            }).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    return "hello1";
                }
            }), new BiFunction<String, String, Integer>() {
                @Override
                public Integer apply(String s, String s2) {
                    System.out.println("s = " + s);
                    System.out.println("s2 = " + s2);
                    return s.length() + s2.length();
                }
            });
    
            System.out.println(future.get());
        }
    

    4.任意个CompletableFuture的组合(allOf,anyOf)

    上面的thenCompose和thenCombine只能组合2个CompletableFuture,而接下来的allOf 和anyOf 可以组合任意多个CompletableFuture。方法接口定义如下所示。
    在这里插入图片描述
    在这里插入图片描述
    首先,这两个方法都是静态方法,参数是变长的CompletableFuture的集合。其次,allOf和anyOf
    的区别,前者是“与”,后者是“或”。
    allOf的返回值是CompletableFuture类型,这是因为每个传入的CompletableFuture的返回
    值都可能不同,所以组合的结果是无法用某种类型来表示的,索性返回Void类型。
    anyOf 的含义是只要有任意一个 CompletableFuture 结束,就可以做接下来的事情,而无须像
    AllOf那样,等待所有的CompletableFuture结束。
    但由于每个CompletableFuture的返回值类型都可能不同,任意一个,意味着无法判断是什么类
    型,所以anyOf的返回值是CompletableFuture类型。

     public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture[] futures = new CompletableFuture[10];
    
            for (int i = 0; i < 10; i++) {
                CompletableFuture<Void> myFuture = CompletableFuture.runAsync(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(1000 + RANDOM.nextInt(1000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        result++;
                    }
                });
    
                futures[i] = myFuture;
            }
    
    //        for (int i = 0; i < 10; i++) {
    //            futures[i].get();
    //            System.out.println(result);
    //        }
    
    //        CompletableFuture future = CompletableFuture.allOf(futures).thenRun(new Runnable() {
    //            @Override
    //            public void run() {
    //                System.out.println("计算完成");
    //            }
    //        });
    //
    //        future.get();
    //        System.out.println(result);
    
    
            CompletableFuture myfuture = CompletableFuture.anyOf(futures).thenRun(new Runnable() {
                @Override
                public void run() {
                    System.out.println(result);
                }
            });
    
            myfuture.get();
        }
    

    5. 四种任务原型

    通过上面的例子可以总结出,提交给CompletableFuture执行的任务有四种类型:Runnable、Consumer、Supplier、Function。下面是这四种任务原型的对比。
    | 四种任务原型 | 无参数|有参数|
|:--------:| -------------:|-------------:|
| 无返回值 | Runnable接口对应的提交方法:runAsync,thenRun|Consumer接口对应的提交方法:thenAccept|

    runAsync 与 supplierAsync 是 CompletableFuture 的静态方法;而 thenAccept、thenAsync、thenApply是CompletableFutre的成员方法。
    因为初始的时候没有CompletableFuture对象,也没有参数可传,所以提交的只能是Runnable或者Supplier,只能是静态方法;通过静态方法生成CompletableFuture对象之后,便可以链式地提交其他任务了,这个时候就可以提交Runnable、Consumer、Function,且都是成员方法。

  • 相关阅读:
    QT 自学内容 day05 自定义的控件封装,定时器的两种使用方法,event 事件分发器,事件过滤器!
    XCTF-web1文件包含绕过file include
    MyBatis一对多关联查询
    三级分类部分三级目录无法加载,后端接口能在前端返回所有数据
    OpenGL获取GPU信息
    Java集合面试题
    【一文清晰】单元测试到底是什么?应该怎么做?
    备份解决方案介绍
    结构光相机原理
    【节能学院】Acrel5000web能耗系统在某学院的应用
  • 原文地址:https://blog.csdn.net/Quasimodo24/article/details/126952541