• Project Reactor源码分析-subscribeOn


    相关示例源码:github.com/chentianmin…

    功能分析

    1. public final Flux subscribeOn(Scheduler scheduler, boolean requestOnSeparateThread)

    在subscribe的时候进行线程切换,subscribeOn()使得它上游的订阅阶段以及整个消费阶段异步执行

    各参数含义如下:

    1. scheduler:线程切换的调度器,Scheduler用来生成实际执行异步任务的Worker
    2. requestOnSeparateThread:是否需要在Worker上执行request()请求,默认true。在慢Publisher、快Consumer场景中可能导致长时间阻塞,可将requestOnSeparateThread设置成false解决这个问题。

    代码示例

    1. @Test
    2. public void testBlock() {
    3. delayPublishFlux(1000, 1, 10)
    4. .doOnRequest(i -> logLong(i, "request"))
    5. .subscribeOn(Schedulers.newElastic("subscribeOn"))
    6. .publishOn(Schedulers.newElastic("publishOn"), 2)
    7. .subscribe(i -> logInt(i, "消费"));
    8. sleep(10000);
    9. }

    在红框时间段内,消费者被阻塞了!

    requestOnSeparateThread设置成false:恢复正常

    下面我们从源码角度分析subscribeOn()操作符,顺便探究一下上述问题发生的原因。

    源码分析

    首先看一下subscribeOn()操作符在装配阶段做了什么,直接查看Flux#subscribeOn()源码。

    Flux#subscribeOn()

    装配阶段重点是创建了FluxSubscribeOn对象,该类同样实现了OptimizableOperator接口。

    在分析publishOn()的时候,我们已经知道此时在subscribe()阶段会调用FluxSubscribeOn#subscribeOrReturn()方法。

    FluxSubscribeOn#subscribeOrReturn()

    1. 创建了SubscribeOnSubscriber对象,它同时是一个任务。
    2. 在当前线程调用下游onSubscribe()方法。
    3. 使用Woker异步执行SubscribeOnSubscriber任务,实际执行SubscribeOnSubscriber#run()方法。

    SubscribeOnSubscriber#run()

    继续调用了上游的Publisher#subscribe()方法,执行订阅。后续肯定会调用到SubscribeOnSubscriber#onSubscribe()方法。

    SubscribeOnSubscriber#onSubscribe()

    重点是调用了requestUpstream()方法。实际上,在request()中,也可能调用requestUpstream()

    SubscribeOnSubscriber#requestUpstream()

    这里的重点是:由当前线程还是Worker向上游请求数据

    1. requestOnSeparateThread=false:肯定由当前线程执行。
    2. requestOnSeparateThread=true:取决于当前线程是否与执行subscribe()操作是同一线程。

    对于前面的例子,requestOnSeparateThread=true时:

    1. 执行onSubscribe()方法时,当前线程与执行subscribe()操作是同一线程,直接由当前线程继续向上游请求数据。
    2. 执行request()方法时,当前线程变成了publishOn线程,此时由Worker向上游请求数据。而问题正是出现这里,当前的Worker还在生产数据,新提交的任务根本没有线程来执行。request被阻塞住,下游的消费者自然不会消费数据。

    这其实还跟elasticScheduler(弹性线程池)有关,它底层是由单线程池组成的,下一篇文章会深入分析。

    requestOnSeparateThread=false时,由当前线程(publishOn)继续向上游请求数据,因此不会造成阻塞。

    消费阶段

    subscribeOn操作符的消费阶段实现简单,都是直接将元素下发给下游。

  • 相关阅读:
    石油石化物资采购杂志社石油石化物资采购编辑部2023年第18期部分目录
    java高并发系列-第1天:必须知道的几个概念
    工作常用之Spark调优一】
    LeetCode204,计算质数,线性筛
    【51单片机实验笔记】中断篇(一) 外部中断
    解析外贸开发信的结构?营销邮件书写技巧?
    java开源商城免费搭建 VR全景商城 saas商城 b2b2c商城 o2o商城 积分商城 秒杀商城 拼团商城 分销商城 短视频商城
    大数据安全 | 【实验】凯撒加密与解密
    Java【方法】,方法重载,方法递归,你都会了吗?
    【时间序列预测】Informer论文笔记
  • 原文地址:https://blog.csdn.net/LBWNB_Java/article/details/126901243