• 【函数式编程实战】(十一) CompletableFuture、反应式编程源码解析与实战


    前言
    📫 作者简介:小明java问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构📫 
    🏆 Java领域优质创作者、阿里云专家博主、华为云专家🏆
    🔥 如果此文还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主哦

    目录

    导读

    一、同步与异步

    1、为什么要有异步

    2、什么是同步?什么是异步?

    二、Future异步编程

    1、Future 接口

    2、Future 接口的使用

    3、Future 接口的缺陷(局限性)

    三、CompletableFuture 接口详解

    1、CompletableFuture 的创建

    2、CompletableFuture.supplyAsync 源码分析

    3、CompletableFuture.runAsync源码分析

    4、CompletableFuture API实战

    四、反应式编程

    1、什么是反应式(resctive)编程

    2、反应式流(Flow API)

    2.1、发布订阅模式

    2.2、Flow API 源码解析

    2.3、Flow API 实战

    总结


    本文导读

    Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

    一、同步与异步

    1、为什么要有异步

    在Java发展的这20年,他只做了一件事不被淘汰,为了不被淘汰不断的更新jdk的版本,以便使用计算机硬件、操作系统以及新的编程概念。

    Java一开始提供了 synchronized 锁、Runable,后面java5有引入了 java.util.concurrent 包,java7中的 forkjoin 框架 java.util.concurrent.RecursiveTask,到java8中Stream流、lambda表达式的支持,这一切都是为了支持高并发。

    即便如此,多线程虽然极大的提升了性能,如果合理的使用线程池的话,好处,第一可以降低资源消耗,重复利用已创建的线程;第二:提高响应速度,任务可以不需要等到线程创建就能立即执行;第三:提高线程的可管理性。统一分配、调优和监控。但是线程池也不是没有缺点,使用k个线程的线程池就只能并发的执行k个任务,其他任务还是回休眠或者阻塞

    这时候如果有线程不和其他任务相关联,又可以不用阻塞,就好了。Java8考虑到了,充分发挥了计算机硬件的处理能力,异步API 应运而生。

    2、什么是同步?什么是异步?

    同步就是 a 程序强依赖 b 程序,我必须等到你的回复或者执行完毕,才能做出下一步响应,类似于编程中程序被解释器(JVM)顺序执行一样(加载 > 验证 > 准备 > 解析 > 初始化);

    异步则相反,a 程序不强依赖 b 程序,响应的时间也无所谓,无论你返回还是不返回,a 程序都能继续运行,也就是说我不存在等待对方的概念,a 程序就是 异步非阻塞的。

    下面举一个例子就说明什么是同步、什么是异步

    异步编程涉及两种风格,Future风格API 和反应式风格API ,Future fun(int a){},fun( a , x-> {}),这两个模式的实战会在后面小结讲解。

    、Future异步编程

    1、Future 接口

    Java5中就引入了Future 接口,他的涉及初衷就是异步计算,例如我们结算一个商户下的所有订单,这个时候并不需要for循环去累加,Future 接口使用的时候只需要封装 Callable中,再提交给ExecutorService。

    2、Future 接口的使用

    看下Java8之前是如何使用异步的

    1. public static void main(String[] args) throws Exception {
    2. List orderInfos = Arrays.asList(new OrderInfo("123", BigDecimal.ONE),
    3. new OrderInfo("456", BigDecimal.TEN), new OrderInfo("789", BigDecimal.TEN));
    4. // 创建 ExecutorService 通过它可以向线程池提交任务
    5. ExecutorService executorService = Executors.newCachedThreadPool();
    6. // 异步操作的同时,可以进行其他操作
    7. Future decimalFuture = executorService.submit(new Callable() {
    8. @Override
    9. public BigDecimal call() throws Exception {
    10. return reduceAmt(orderInfos);
    11. }
    12. });
    13. // Java8 写法
    14. Future decimalFuture = executorService.submit(() -> reduceAmt(orderInfos));
    15. System.out.println(decimalFuture.get());
    16. }
    17. private static BigDecimal reduceAmt(List orderInfos) {
    18. return orderInfos.stream()
    19. .map(OrderInfo::getOrderAmt)
    20. .reduce(BigDecimal.ZERO, BigDecimal::add);
    21. }

    异步编程可以在 ExecutorService 中,以并发的方式调用另一个线程执行操作,后续调用get() 方法获取操作结果,如果操作完成会理科返回,如果操作没有完成则回阻塞线程,直到操作完成返回。

    3、Future 接口的缺陷(局限性)

    Future接口 还提供了方法来检测异步计算是否已经结束(isDone() 方法),等待异步操作结束。

    但是当长时间计算任务完成时,该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并。

    就会发生很多性能问题:1、将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。)2、此时就要等待 Future 集合中的所有任务都完成。3、当Future的完成事件发生时会收到通知,使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果,每一步都需要等待。

    三、CompletableFuture 接口详解

    1、CompletableFuture 的创建

    CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值,runAsync适合创建不需要返回值的计算任务

    通过该supplyAsync 函数创建的CompletableFuture实例会异步执行当前传入的计算任务,在调用端,则可以通过get或join获取最终计算结果。

    同事直接new(构造函数创建)也是可以的,下面通过一个实战小例子看下,高并发高性能的添加购物车结构如何设计

    1. import java.util.concurrent.CompletableFuture;
    2. import org.springframework.core.task.AsyncTaskExecutor;
    3. public class CompletableFutureImpl {
    4. @Autowired
    5. @Qualifier("asyncExecutor")
    6. private AsyncTaskExecutor asyncTaskExecutor;
    7. public DefaultResponseVO addGoodsCart(HttpServletRequest request, @Valid AddCartReqVO reqVo) {
    8. // 添加购物车
    9. GetCartItemEntity respVO = mallCartProcess.addGoodsCart(buildAddGoodsCartReqVO(reqBo));
    10. /**
    11. * 异步刷新到购物车
    12. */
    13. CompletableFuture voidCompletableFuture = CompletableFuture.runAsync(() -> syncAddCacheCheck(respVO), asyncTaskExecutor);
    14. // 通过supplyAsync 函数创建的
    15. CompletableFuture uCompletableFuture = CompletableFuture.supplyAsync(() -> syncAddCacheCheck(respVO));
    16. // 构造函数创建
    17. CompletableFuture completableFuture = new CompletableFuture().runAsync(() -> syncAddCacheCheck(respVO));
    18. return new DefaultResponseVO(code, msg, respVO);
    19. }
    20. /**
    21. * 添加购物车缓存(异步刷新到购物车)
    22. */
    23. public void syncAddCacheCheck(GetCartItemEntity cartItemEntity) {
    24. try {
    25. // 添加购物车缓存(异步刷新到购物车)
    26. mallCartProcess.getCartInfo(cartItemEntity);
    27. } catch (Exception e) {
    28. logger.error("syncAddCache error", e);
    29. }
    30. }
    31. }
    32. 2、CompletableFuture.supplyAsync 源码分析

      本小节讲解CompletableFuture的底层实现

      上面为java 8中 supplyAsync 函数的实现源码。可以看到,当 supplyAsync 入参只有 supplier 时,会默认使用asyncPool作为线程池(一般情况下为ForkJoinPool的commonPool),并调用内部方法asyncSupplyStage执行具体的逻辑。在 asyncSupplyStage 方法中,程序会创建一个空的CompletableFuture 返回给调用方。同时该 CompletableFuture 和传入的 supplier 会被包装在一个AsyncSupply 实例对象中,然后一起提交到线程池中进行处理。

      值得注意的是,当supplyAsync返回时,调用方只会拿到一个空的CompletableFuture实例。看到这里,我们可以猜测,当计算最终完成时,计算结果会被set到对应的CompletableFuture的result字段中。调用方通过join或者get就能取到该CompletableFuture的result字段的值。所以,虽然实际创建CompletableFuture的线程和进行任务计算的线程不同,但是最终会通过result来进行结果的传递。这种方式与传统的Future中结果传递方式类似(计算线程set值,使用线程get值)。

      上面为java 8中 AsyncSupply 的实现源码,AsyncSupply的源码很简单。首先,它实现了Runnable接口,所以被提交到线程池中后,工作线程会执行其run()方法。通过对AsyncSupply中run方法的分析,也基本证实我们之前的猜测。即计算任务由工作线程调用run方法执行,并设置到CompletableFuture的结果中。其他线程中的使用方,则可以调用该CompletableFuture的join或者get方法获取其结果。

      因此,我们只需要搞清楚其run()中的实现即可。在 run() 中,程序首先检查了传入的CompletableFuture 和 Supplier 是否为空,如果均不为空,再检查 CompletableFuture 的 d.result是否为空,如果不为空,则说明 CompletableFuture 的值已经被其他线程主动设置过了(这也是CompletableFuture与Future最大的不同之处),因此这里就不会再被重新设置一次。如果 d.result 为空,则调用Supplier(源码中的 f 变量)的get()方法,执行具体的计算,然后通过 completeValue 方法将结果设置到CompletableFuture中。最后,调用CompletableFuture的postComplete()方法,执行连接到当前CompletableFuture上的后置任务。

      3、CompletableFuture.runAsync 源码分析

      通过上面的源码可以看出,runAsync也会生成一个空的CompletableFuture,并包装在AsyncRun中提交到线程池中执行。这与supplyAsync是完全一致的。由于Runnable没有返回值,这里返回的CompletableFuture的结果值是Void类型的。

      AsyncRun的run中,计算的执行是通过调用传入的Runnable(源码中的 f 变量)的run方法进行的。由于没有返回值,所以这里在设置CompletableFuture的值时,使用其completeNull()方法,设置一个特殊的空值标记。 设计方面和supplyAsync一致

      4、CompletableFuture API 实战

      thenApply、thenAccept、thenRun

      thenApply 提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。thenAccept 提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。thenRun 提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。

      thenCombine、thenCompose

      thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture。嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose的主要目的就是解决这个问题(这里也可以将thenCompose的作用类比于stream接口中的flatMap,因为他们都可以将类型嵌套展开)。

      whenComplete、handle

      whenComplete主要用于注入任务完成时的回调通知逻辑(获得的结果是前置任务的结果,whenComplete中的逻辑不会影响计算结果)。handle与handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果(产生了新的结果)

      四、反应式编程

      1、什么是反应式(resctive)编程

      反应式编程是最近几年才提出的概念,主要有四个特征:响应式,反应式编程的响应速度应该很快,而且是稳定可预测的。韧性,系统出现失败时,任然可以继续响应服务,任何一个组件都能以异步的方式想其他组件分发任务。弹性,影响代码响应的因素的代码(系统)的负载能力,反应式编程可以增加分配的资源,受流量影响后有自动适配的能力,服务更大的负载。消息驱动,各个组件之间松耦合,组件隔离,跨组件通信使用异步消息传递。

      反应式(resctive)编程在应用层的主要特征是任务以异步的方式执行,非阻塞的处理事件流,充分利用多核CPU的特点,大多反应式框架(RxJava等)都可以独立开辟线程池,用于执行阻塞式操作,主线程池中运行都是无阻塞的。

      2、反应式流(Flow API)

      2.1、发布订阅模式

      Future CompletableFuture 的思维模式是计算的执行是独立且并发的。使用 get()方法可以在执行结束后获取 Future 对象的执行结果。因此,Future 是一个一次性对象,它只能从头到尾执行代码一次。

      与此相反,反应式编程的思维模式是类 Future 的对象随着时间的推移可以产生很多的结果。举个例子是 Web 服务器的监听组件对象。该组件监听来自网络的 HTTP请求,并根据请求的内容返回相应的数据。

      2.2、Flow API 源码解析

      Java9 使用 java.util.concurrent.Flow 提供的接口对反应式编程进行建模,实现了名为“发布-订阅”的模型(也叫协议,简写为 pub-sub )

      反应式编程有三个主要的概念,分别是:订阅者可以订阅的发布者;名为订阅的连接;消息(也叫事件),它们通过连接传输。

      反应式流(Flow API)规范可以总结为4个接口:Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅)和Processor(处理者)

      Publisher负责生成数据,并将数据发送给 Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法 subscribe(),Subscriber可以通过该方法向 Publisher发起订阅。 

      一旦Subscriber订阅成功,就可以接收来自Publisher的事件。这些事件是通过Subscriber接口上的方法发送的

      Subscriber的第一个事件是通过对 onSubscribe()方法的调用接收的。Publisher调用 onSubscribe() 方法时,会将Subscription对象传递给 Subscriber。通过Subscription,Subscriber可以管理其订阅情况

      Subscriber 可以通过调用 request() 方法来请求 Publisher 发送数据,或者通过调用 cancel()方法表明它不再对数据感兴趣并且取消订阅。当调用 request() 时,Subscriber 可以传入一个long类型的数值以表明它愿意接受多少数据。这也是回压能够发挥作用的地方,以避免Publisher 发送多于 Subscriber能够处理的数据量。在 Publisher 发送完所请求数量的数据项之后,Subscriber 可以再次调用 request()方法来请求更多的数据。

      Subscriber 请求数据之后,数据就会开始流经反应式流。Publisher 发布的每个数据项都会通过调用 Subscriber 的 onNext()方法递交给 Subscriber。如果有任何错误,就会调用 onError()方法。如果 Publisher 没有更多的数据,也不会继续产生更多的数据,那么将会调用 Subscriber 的onComplete() 方法来告知 Subscriber 它已经结束

      反应式流规范的接口本身并不支持以函数式的方式组成这样的流。Reactor 项目是反应式流规范的一个实现,提供了一组用于组装反应式流的函数式API。有我们自己实现。

      2.3、Flow API 实战

      FlowImpl :创建Publisher并向其订阅TempSubscriber

      1. public class FlowImpl {
      2. public static void main(String[] args) {
      3. // 创建 Publisher 并向其订阅 Subscriber
      4. getOrderAmt("20220727123").subscribe(new SubscriberImpl());
      5. }
      6. // Publisher是个函数式接口
      7. private static Flow.Publisher getOrderAmt(String orderId) {
      8. return subscriber -> subscriber.onSubscribe(new SubscriptionImpl(subscriber, orderId));
      9. }
      10. }

      Subscription接口:实现向 Subscriber 发送 OrderInfo Steam

      1. public class SubscriptionImpl implements Flow.Subscription {
      2. private final Flow.Subscribersuper OrderInfo> subscriber;
      3. private final String orderId;
      4. public SubscriptionImpl(Flow.Subscribersuper OrderInfo> subscriber, String orderId) {
      5. this.subscriber = subscriber;
      6. this.orderId = orderId;
      7. }
      8. @Override
      9. public void request(long n) {
      10. // 另起一个线程向 subscriber 发送下一个元素
      11. Executors.newSingleThreadExecutor().submit(() -> {
      12. for (long i = 0L; i < n; i++) // subscriber 每处理一个请求执行一次循环
      13. try {
      14. // 将当前 订单号 发送给 Subscriber
      15. subscriber.onNext(OrderInfo.reduceAmt(orderId));
      16. } catch (Exception e) {
      17. // 查询报错将这个报错信息传给 Subscriber
      18. subscriber.onError(e);
      19. e.printStackTrace();
      20. break;
      21. }
      22. });
      23. }
      24. @Override
      25. public void cancel() {
      26. // 如果 Subscription 被取消了,向 subscriber 发送一个完成信号
      27. subscriber.onComplete();
      28. }
      29. }

      Subscriber接口:实现打印输出收到的 订单 数据

      1. public class SubscriberImpl implements Flow.Subscriber {
      2. private Flow.Subscription subscription;
      3. @Override
      4. public void onSubscribe(Flow.Subscription subscription) {
      5. this.subscription = subscription;
      6. subscription.request(1);
      7. }
      8. @Override
      9. public void onNext(OrderInfo orderInfo) {
      10. System.out.println(orderInfo);
      11. subscription.request(1);
      12. }
      13. @Override
      14. public void onError(Throwable throwable) {
      15. System.out.println(throwable.getMessage());
      16. }
      17. @Override
      18. public void onComplete() {
      19. System.out.println("Done!");
      20. }
      21. }

      总结

      Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

    33. 相关阅读:
      多链路聚合路由在消防智能指挥系统中的应用
      【NodeJs-5天学习】第二天篇② —— 网络编程(TCP、HTTP、Web应用服务)
      jar 文件携带参数启动
      Qt官方示例:Qt Quick Controls - Gallery
      视频知识点(17)- flv.js 实现播放本地视频文件的技巧
      附录5-本地生活案例(上拉触底,下拉刷新)
      C 语言的标识符,保留标识符,关键字
      【华为上机真题 2022】流水线
      spring注解开发
      前端程序员兼职副业平台推荐
    34. 原文地址:https://blog.csdn.net/FMC_WBL/article/details/125941583