• 最详细的CompletableFuture异步编程-进阶篇


    1、异步任务的交互

    异步任务交互指 将异步任务获取结果的速度相比较,按一定的规则( 先到先用 )进行下一步处理。

    1.1 applyToEither

    applyToEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步的操作。

    CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)

    演示案例:使用最先完成的异步任务的结果

    1. public class ApplyToEitherDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. // 开启异步任务1
    4. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    5. int x = new Random().nextInt(3);
    6. CommonUtils.sleepSecond(x);
    7. CommonUtils.printThreadLog("任务1耗时:" + x + "秒");
    8. return x;
    9. });
    10. // 开启异步任务2
    11. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    12. int y = new Random().nextInt(3);
    13. CommonUtils.sleepSecond(y);
    14. CommonUtils.printThreadLog("任务2耗时:" + y + "秒");
    15. return y;
    16. });
    17. // 哪些异步任务的结果先到达,就使用哪个异步任务的结果
    18. CompletableFuture<Integer> future = future1.applyToEither(future2, (result -> {
    19. CommonUtils.printThreadLog("最先到达的结果:" + result);
    20. return result;
    21. }));
    22. // 主线程休眠4秒,等待所有异步任务完成
    23. CommonUtils.sleepSecond(4);
    24. Integer ret = future.get();
    25. CommonUtils.printThreadLog("ret = " + ret);
    26. }
    27. }

    速记心法:任务1、任务2就像两辆公交,哪路公交先到,就乘坐(使用)哪路公交。

    以下是applyToEither 和其对应的异步回调版本

    1. CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)
    2. CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func)
    3. CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func,Executor executor)

    1.2 acceptEither

    acceptEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步操作 ( 消费使用 )。

    1. CompletableFuture<Void> acceptEither(CompletableFuture<T> other, Consumer<T> action)
    2. CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action)
    3. CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action,Executor executor)

    演示案例:使用最先完成的异步任务的结果

    1. public class AcceptEitherDemo {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. // 异步任务交互
    4. CommonUtils.printThreadLog("main start");
    5. // 开启异步任务1
    6. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    7. int x = new Random().nextInt(3);
    8. CommonUtils.sleepSecond(x);
    9. CommonUtils.printThreadLog("任务1耗时:" + x + "秒");
    10. return x;
    11. });
    12. // 开启异步任务2
    13. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    14. int y = new Random().nextInt(3);
    15. CommonUtils.sleepSecond(y);
    16. CommonUtils.printThreadLog("任务2耗时:" + y + "秒");
    17. return y;
    18. });
    19. // 哪些异步任务的结果先到达,就使用哪个异步任务的结果
    20. future1.acceptEither(future2,result -> {
    21. CommonUtils.printThreadLog("最先到达的结果:" + result);
    22. });
    23. // 主线程休眠4秒,等待所有异步任务完成
    24. CommonUtils.sleepSecond(4);
    25. CommonUtils.printThreadLog("main end");
    26. }
    27. }

    1.3 runAfterEither

    如果不关心最先到达的结果,只想在有一个异步任务先完成时得到完成的通知,可以使用 runAfterEither() ,以下是它的相关方法:

    1. CompletableFuture<Void> runAfterEither(CompletableFuture<T> other, Runnable action)
    2. CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action)
    3. CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action, Executor executor)
    提示
    异步任务交互的三个方法和之前学习的异步的回调方法 thenApply、thenAccept、thenRun 有异曲同工之妙。

    2、get() 和 join() 区别

    get() 和 join() 都是CompletableFuture提供的以阻塞方式获取结果的方法。

    那么该如何选用呢?请看如下案例:

    1. public class GetOrJoinDemo {
    2. public static void main(String[] args) {
    3. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    4. return "hello";
    5. });
    6. String ret = null;
    7. // 抛出检查时异常,必须处理
    8. try {
    9. ret = future.get();
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. } catch (ExecutionException e) {
    13. e.printStackTrace();
    14. }
    15. System.out.println("ret = " + ret);
    16. // 抛出运行时异常,可以不处理
    17. ret = future.join();
    18. System.out.println("ret = " + ret);
    19. }
    20. }

    使用时,我们发现,get() 抛出检查时异常 ,需要程序必须处理;而join() 方法抛出运行时异常,程序可以不处理。所以,join() 更适合用在流式编程中。

    3、ParallelStream VS CompletableFuture

    CompletableFuture 虽然提高了任务并行处理的能力,如果它和 Stream API 结合使用,能否进一步多个任务的并行处理能力呢?

    同时,对于 Stream API 本身就提供了并行流ParallelStream,它们有什么不同呢?

    我们将通过一个耗时的任务来体现它们的不同,更重要地是,我们能进一步加强 CompletableFuture 和 Stream API 的结合使用,同时搞清楚CompletableFuture 在流式操作的优势

    需求:创建10个MyTask耗时的任务,统计它们执行完的总耗时

    定义一个MyTask类,来模拟耗时的长任务

    1. public class MyTask {
    2. private int duration;
    3. public MyTask(int duration) {
    4. this.duration = duration;
    5. }
    6. // 模拟耗时的长任务
    7. public int doWork() {
    8. CommonUtils.printThreadLog("doWork");
    9. CommonUtils.sleepSecond(duration);
    10. return duration;
    11. }
    12. }

    同时,我们创建10个任务,每个持续1秒。

    1. IntStream intStream = IntStream.range(0, 10);
    2. List<MyTask> tasks = intStream.mapToObj(item -> {
    3. return new MyTask(1);
    4. }).collect(Collectors.toList());

    3.1 并行流的局限

    我们先使用串行执行,让所有的任务都在主线程 main 中执行。

    1. public class SequenceDemo {
    2. public static void main(String[] args) {
    3. // 方案一:在主线程中使用串行执行
    4. // step 1: 创建10个MyTask对象,每个任务持续1s,存入list集合便于启动Stream操作
    5. IntStream intStream = IntStream.range(0, 10);
    6. List<MyTask> tasks = intStream.mapToObj(item -> {
    7. return new MyTask(1);
    8. }).collect(Collectors.toList());
    9. // step 2: 执行tasks集合中的每个任务,统计总耗时
    10. long start = System.currentTimeMillis();
    11. List<Integer> result = tasks.stream().map(myTask -> {
    12. return myTask.doWork();
    13. }).collect(Collectors.toList());
    14. long end = System.currentTimeMillis();
    15. double costTime = (end - start) / 1000.0;
    16. System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
    17. }
    18. }

    它花费了10秒, 因为每个任务在主线程一个接一个的执行。

    因为涉及 Stream API,而且存在耗时的长任务,所以,我们可以使用 parallelStream()

    1. public class ParallelDemo {
    2. public static void main(String[] args) {
    3. // 方案二:使用并行流
    4. // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
    5. IntStream intStream = IntStream.range(0, 10);
    6. List<MyTask> tasks = intStream.mapToObj(item -> {
    7. return new MyTask(1);
    8. }).collect(Collectors.toList());
    9. // step 2: 执行10个MyTask,统计总耗时
    10. long start = System.currentTimeMillis();
    11. List<Integer> results = tasks.parallelStream().map(myTask -> {
    12. return myTask.doWork();
    13. }).collect(Collectors.toList());
    14. long end = System.currentTimeMillis();
    15. double costTime = (end - start) / 1000.0;
    16. System.out.printf("processed %d tasks %.2f second",tasks.size(),costTime);
    17. }
    18. }

    它花费了2秒多,因为此次并行执行使用了8个线程 (7个是ForkJoinPool线程池中的, 一个是 main 线程),需要注意是:运行结果由自己电脑CPU的核数决定。

    3.2 CompletableFuture 在流式操作的优势

    让我们看看使用CompletableFuture是否执行的更有效率

    1. public class CompletableFutureDemo {
    2. public static void main(String[] args) {
    3. // 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时
    4. // 方案三:使用CompletableFuture
    5. // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
    6. IntStream intStream = IntStream.range(0, 10);
    7. List<MyTask> tasks = intStream.mapToObj(item -> {
    8. return new MyTask(1);
    9. }).collect(Collectors.toList());
    10. // step 2: 根据MyTask对象构建10个耗时的异步任务
    11. long start = System.currentTimeMillis();
    12. List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
    13. return CompletableFuture.supplyAsync(() -> {
    14. return myTask.doWork();
    15. });
    16. }).collect(Collectors.toList());
    17. // step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中
    18. List<Integer> results = futures.stream().map(future -> {
    19. return future.join();
    20. }).collect(Collectors.toList());
    21. long end = System.currentTimeMillis();
    22. double costTime = (end - start) / 1000.0;
    23. System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
    24. }
    25. }

    运行发现,两者使用的时间大致一样。能否进一步优化呢?

    CompletableFutures 比 ParallelStream 优点之一是你可以指定Executor去处理任务。你能选择更合适数量的线程。我们可以选择大于Runtime.getRuntime().availableProcessors() 数量的线程,如下所示:

    1. public class CompletableFutureDemo2 {
    2. public static void main(String[] args) {
    3. // 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时
    4. // 方案三:使用CompletableFuture
    5. // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
    6. IntStream intStream = IntStream.range(0, 10);
    7. List<MyTask> tasks = intStream.mapToObj(item -> {
    8. return new MyTask(1);
    9. }).collect(Collectors.toList());
    10. // 准备线程池
    11. final int N_CPU = Runtime.getRuntime().availableProcessors();
    12. // 设置线程池的数量最少是10个,最大是16
    13. ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), N_CPU * 2));
    14. // step 2: 根据MyTask对象构建10个耗时的异步任务
    15. long start = System.currentTimeMillis();
    16. List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
    17. return CompletableFuture.supplyAsync(() -> {
    18. return myTask.doWork();
    19. },executor);
    20. }).collect(Collectors.toList());
    21. // step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中
    22. List<Integer> results = futures.stream().map(future -> {
    23. return future.join();
    24. }).collect(Collectors.toList());
    25. long end = System.currentTimeMillis();
    26. double costTime = (end - start) / 1000.0;
    27. System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
    28. // 关闭线程池
    29. executor.shutdown();
    30. }
    31. }

    测试代码时,电脑配置是4核8线程,而我们创建的线程池中线程数最少也是10个,所以,每个线程负责一个任务( 耗时1s ),总体来说,处理10个任务总共需要约1秒。

    3.3 合理配置线程池中的线程数

    正如我们看到的,CompletableFuture 可以更好地控制线程池中线程的数量,而 ParallelStream 不能

    问题1:如何选用 CompletableFuture 和 ParallelStream ?

    如果你的任务是IO密集型的,你应该使用CompletableFuture;

    如果你的任务是CPU密集型的,使用比处理器更多的线程是没有意义的,所以选择ParallelStream ,因为它不需要创建线程池,更容易使用。

    问题2:IO密集型任务和CPU密集型任务的区别?

    CPU密集型也叫计算密集型,此时,系统运行时大部分的状况是CPU占用率近乎100%,I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU 使用率很高。比如说要计算1+2+3+…+ 10万亿、天文计算、圆周率后几十位等, 都是属于CPU密集型程序。

    CPU密集型任务的特点:大量计算,CPU占用率一般都很高,I/O时间很短

    IO密集型指大部分的状况是CPU在等I/O (硬盘/内存) 的读写操作,但CPU的使用率不高。

    简单的说,就是需要大量的输入输出,例如读写文件、传输文件、网络请求。

    IO密集型任务的特点:大量网络请求,文件操作,CPU运算少,很多时候CPU在等待资源才能进一步操作。

    问题3:既然要控制线程池中线程的数量,多少合适呢?

    如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 Ncpu+1

    如果是IO密集型任务,参考值可以设置为 2 * Ncpu,其中Ncpu 表示 核心数。

    注意的是:以上给的是参考值,详细配置超出本次课程的范围,选不赘述。

  • 相关阅读:
    SpringBoot测试实践
    深度解读:金融企业容器云平台存储如何选型
    Jenkins Pipeline
    数电学习(六、时序逻辑电路)(二)
    基于ssm的学生综合测评管理系统047
    4.3 基于注解的声明式事务和基于XML的声明式事务
    Jupyter Notebook 如何切换虚拟环境
    flink: 从kafka读取数据
    Elastic Observability 8.11:ES|QL、APM 中的通用分析和增强的 SLOs
    rust 多线程
  • 原文地址:https://blog.csdn.net/mxt51220/article/details/132805305