• Project Reactor源码阅读-Scheduler


    Reacor在进行线程切换(subscribeOn/publishOn)以及并行计算(runOn)都会使用Scheduler

    通过前面的源码分析,我们知道对于每次切换,实际上都会先基于选择的Scheduler,创建一个对应的Worker,真正异步执行是通过调用Worker#schedule()实现的

    本文主要分析各种Scheduler实现原理。

    立即调度-ImmediateScheduler

    通过Schedulers.immediate()创建,在调用者线程立即执行任务,底层就是直接调用run()方法执行

    单线程池调度-SingleScheduler

    通过Schedulers.single()创建,底层是一个单线程池,将任务交给改单线程池执行

    通过Schedulers.newSingle("test")形式,会为创建名称为test的单线程池。

    弹性调度-ElasticScheduler

    通过Schedulers.elatic()创建,底层实现比较复杂,我们详细分析一下。

    代码示例

    1. @Test
    2. public void testElastic() {
    3. Scheduler scheduler = Schedulers.elastic();
    4. for (int i = 0; i < 10; i++) {
    5. scheduler.schedule(new Task(i, 1000));
    6. }
    7. sleep(10000);
    8. }

    直接调用scheduler.schedule()提交任务,并发执行。

    1. @Test
    2. public void testElasticWorker() {
    3. Scheduler.Worker worker = Schedulers.elastic().createWorker();
    4. for (int i = 0; i < 10; i++) {
    5. worker.schedule(new Task(i, 1000));
    6. }
    7. sleep(10000);
    8. }

    调用同一个worker提交任务,异步执行,但是实际上只有1个线程在工作

    Schedulers#elatic()

    涉及到到的3个静态常量如下:

    1. static AtomicReference<CachedScheduler> CACHED_ELASTIC = new AtomicReference<>();
    2. static final String ELASTIC = "elastic";
    3. static final Supplier<Scheduler> ELASTIC_SUPPLIER =
    4. () -> newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true);

    Schedulers#cache()

    这里的目的是缓存Scheduler,重点是调用supplier.get(),实际执行 newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true);

    Schedulers#newElastic()

    实际上会调用Factory#newElatic()来创建Scheduler

    Factory#newElastic()

    Factory里面创建了ElasticScheduler对象。后续一般都调用createWorker()方法。

    我们可以自定义实现Factory接口,实现全局覆盖Schedulers行为

    ElasticScheduler#createWorker()

    先调用pick()方法,然后创建ElasticWorker对象。

    ElasticScheduler#pick()

    1. 优先从cache中拿数据,如果存在,直接返回缓存对象。
    2. 否则创建CacheService

    cache实际上就是对CacheService进行缓存,因为该对象底层包含线程池,资源消耗高,应该尽可能复用

    在调用dispose()时,会将当前cacheService加入到cache中。

    CachedService

    重点是调用了ElasticScheduler#get()获取线程池。

    ElasticScheduler#get()

    每次都创建了一个单线程池

    当我们使用Worker异步执行任务时,实际上是将任务交给对应的单线程池执行。接下来看看为什么直接调用ElasticScheduler#schedule()就能多线程并发执行任务?

    ElasticScheduler#schedule()

    每次都会重新选择CachedService来执行任务,因此能多线程并发执行任务

    有点类似线程池缓存,只不过一个是针对线程,另一个是针对单线程池。

    有边界弹性调度-BoundedElasticScheduler

    使用ElaticSchedulerOOM风险:

    1. 可能创建非常多CacheService对象,即可能创建非常多的单线程池。
    2. 可能提交非常多任务。

    针对这两个问题,Reactor推荐使用BoundedElasticScheduler,该Scheduler加上了边界限制。

    具体来说是增加了线程容量限制和任务队列容量限制,默认为10倍核心线程数和100000, 均可通过配置进行修改。

    BoundedElasticScheduler只是在原有ElasticScheduler加上边界限制,底层仍然是单线程池

    并行调度-ParallelScheduler

    在启动时,会按照一定数量(默认cpu数量)创建单线程池,创建Worker时会选择一个单线程池。能实现并行调度,是因为会创建多个Worker

    为何底层都是单线程池

    不管是弹性调度还是并行调度,Reactor底层都是基于单线程池来实现的。原因在于对于一个流水线上面的操作,理应都是串行处理的。如果使用多个线程的线程池,会存在资源浪费。考虑以下这个例子:

    1. @Test
    2. public void test() {
    3. delayPublishFlux(10, 1, 100)
    4. .publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(100)))
    5. .doOnNext(x -> sleep(1000))
    6. .subscribe(i -> logInt(i, "消费"));
    7. sleep(100000);
    8. }

    在分析publishOn()源码中,我们知道此时上游的数据是交由Worker异步添加到队列的,此时实际会有多个线程执行。但是下游消费能力有限,根本没必要用多个线程来做。

    委派调度-DelegateServiceScheduler

    将任务委派给指定的ExecutorService执行。

  • 相关阅读:
    SpringBoot框架实现简单定时任务
    Abbkine TraKine Pro 活细胞微管染色试剂盒重要特色
    #AcWing-从尾到头打印链表
    求免费好用的问卷调查平台!
    Windows 7 & Windows Server 2008 R2 简体中文版下载 (updated Jun 2024)
    Ubuntu系统-FFmpeg安装及环境配置
    【Linux】套接字编程
    LeetCode199. Binary Tree Right Side View
    获取微信openid和基本信息的总结
    接口间参数传递的一种实现方式
  • 原文地址:https://blog.csdn.net/LBWNB_Java/article/details/127047822