• java8 CompletableFuture: 组合式异步编程


    Future 接口

    Future接口在Java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模

    了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在

    Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,

    不再需要呆呆等待耗时的操作完成。

    1. @Test
    2. public void testFuture() {
    3. ExecutorService executor = Executors.newCachedThreadPool();
    4. Future future = executor.submit(new Callable() {
    5. public Double call() {
    6. return doSomeLongComputation();
    7. }});
    8. doSomethingElse();
    9. try {
    10. Double result = future.get(1, TimeUnit.SECONDS);
    11. System.out.println(result);
    12. } catch (ExecutionException ee) {
    13. // 计算抛出一个异常
    14. } catch (InterruptedException ie) {
    15. // 当前线程在等待过程中被中断
    16. } catch (TimeoutException te) {
    17. // 在Future对象完成之前超过已过期
    18. }
    19. }
    20. private Double doSomeLongComputation() {
    21. try {
    22. Thread.sleep(1000);
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. return 0.0;
    27. }
    28. private void doSomethingElse() {
    29. try {
    30. Thread.sleep(500);
    31. } catch (InterruptedException e) {
    32. e.printStackTrace();
    33. }
    34. System.out.println("doSomethingElse");
    35. }

    Future 接口的局限性

    通过例子,我们知道Future接口提供了方法来检测异步计算是否已经结束(使用

    isDone方法),等待异步操作结束,以及获取计算的结果。但是这些特性还不足以让你编写简洁

    的并发代码。比如,我们很难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时

    间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都

    完成后,将计算的结果与另一个查询操作结果合并”。但是,使用Future中提供的方法完成这样

    的操作又是另外一回事。这也是我们需要更具描述能力的特性的原因,比如下面这些。

    1: 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第

    一个的结果。

    2:  等待Future集合中的所有任务都完成。

    3:  仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同

    一个值),并返回它的结果。

    4: 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。

    5: 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future

    计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。

    使用 CompletableFuture 构建异步应用

    1. @Test
    2. public void testGetPriceAsync() {
    3. CompletableFuture futurePrice = new CompletableFuture<>();
    4. new Thread( () -> {
    5. double price = calculatePrice("Apple");
    6. futurePrice.complete(price);
    7. }).start();
    8. doSomethingElse();
    9. try {
    10. Double result = futurePrice.get();
    11. System.out.println(result);
    12. } catch (ExecutionException | InterruptedException ee) {
    13. // 计算抛出一个异常
    14. }
    15. }
    16. private Double calculatePrice(String product) {
    17. Double value = 0.0;
    18. try {
    19. Thread.sleep(1000);
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. switch (product) {
    24. case "Apple":
    25. value = 10.0;
    26. break;
    27. case "Banana":
    28. value = 5.0;
    29. break;
    30. default:
    31. break;
    32. }
    33. return value;
    34. }

    使用 CompletableFuture 发起异步请求

    1. @Test
    2. public void TestFindPrice() {
    3. List<String> list = Arrays.asList("Apple", "Banana");
    4. List<CompletableFuture<String>> priceFuture =
    5. list.stream()
    6. .map(a -> CompletableFuture.supplyAsync(
    7. () -> a + " price is:" + calculatePrice(a)))
    8. .collect(Collectors.toList());
    9. List<String> resultList = priceFuture.stream()
    10. .map(CompletableFuture::join)
    11. .collect(Collectors.toList());
    12. resultList.forEach(System.out::println);
    13. }

    构造同步和异步操作

    1. @Test
    2. public void testFindPrice2() {
    3. List<String> list = Arrays.asList("Apple", "Banana");
    4. ExecutorService executor = Executors.newCachedThreadPool();
    5. List<CompletableFuture<String>> priceFutures =
    6. list.stream()
    7. .map(a -> CompletableFuture.supplyAsync(
    8. () -> calculatePrice(a), executor))
    9. .map(future -> future.thenApply(Quote::parse))
    10. .map(future -> future.thenCompose(quote ->
    11. CompletableFuture.supplyAsync(
    12. () -> Discount.applyDiscount(quote), executor)))
    13. .collect(Collectors.toList());
    14. List<String> resultList = priceFutures.stream()
    15. .map(CompletableFuture::join)
    16. .collect(Collectors.toList());
    17. resultList.forEach(System.out::println);
    18. }
    19. static class Quote {
    20. public static Double parse(Double val) {
    21. return val;
    22. }
    23. }
    24. static class Discount {
    25. public static String applyDiscount(Double val) {
    26. try {
    27. Thread.sleep(200);
    28. } catch (InterruptedException e) {
    29. e.printStackTrace();
    30. }
    31. return "price is:" + val;
    32. }
    33. }

    将两个 CompletableFuture 对象整合起来,无论它们是否存在依赖

    1. @Test
    2. public void testFindPrice3() {
    3. Future<Double> futurePrice =
    4. CompletableFuture.supplyAsync(() -> calculatePrice("Apple"))
    5. .thenCombine(
    6. CompletableFuture.supplyAsync(
    7. () -> getRate()),
    8. (price, rate) -> price * rate
    9. );
    10. try {
    11. Double result = futurePrice.get();
    12. System.out.println(result);
    13. } catch (ExecutionException | InterruptedException ee) {
    14. // 计算抛出一个异常
    15. }
    16. }
    17. private Double getRate() {
    18. return 0.8;
    19. }

    《手掌与疾病》

  • 相关阅读:
    卷积神经网络提取图像特征的操作是怎样完成的
    一个快速切换一个底层实现的思路分享
    python接口自动化测试(单元测试方法)
    java基于springboot+Vue图片分享社区网站
    【一些理解】搜广推:推荐、广告、搜索算法的区别、入坑?
    Redis中的Zset类型
    一个程序员的晋升之路
    https跟http有什么区别?
    博途PLC和MATLAB矩阵运算存储方法对比
    每日一题day7-1652. 拆炸弹
  • 原文地址:https://blog.csdn.net/kan_Feng/article/details/126879280