• 【响应式编程】Schedulers之线程池共用问题


    本篇重点介绍下Reactor中线程池的共用问题。由于在响应式编程中线程数很有限,所以共用时一定要注意阻塞问题,而这个在以前命令式编程中是很少被关注的,需要引起注意。

    Worker共用问题

    从上面的线程模型中可以看出,原生Executor数量是很有限的,例如boundedElastic()获取的是10倍CPU核数的数量。为了描述简化,我们把原生Executor称为“工作线程”,他是Scheduler线程池的工作线程。所以会有大量Worker共用一个工作线程的情况,这就会引发一些意想不到的情况。
    例如有两个Flux流,其中一个处理onNext方法很快,而另一个则处理onNext很慢(比如是有阻塞代码的),当这两个流对应的Worker是派生至同一个"工作线程"时,就会导致那个本来执行速度快的Flux流也执行的很慢,相当于这两个流的工作任务task放在了同一个task queue里进行了排队。
    因此,当如果流的处理速度不一样时,特别是非阻塞与阻塞两种不同类型的流时,一定要采用独立的Scheduler来分开处理,避免产生Worker的共用问题。

    Schedulers的共用问题

    Schedulers的共用比较明显,当使用boundedElastic()elastic()parallel()等静态方法获取线程池时,每次调用的时候,其实都是共用的同一个线程池,其源码如下:

    1. public static Scheduler elastic() {
    2. return cache(CACHED_ELASTIC, ELASTIC, ELASTIC_SUPPLIER);
    3. }
    4. static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) {
    5. CachedScheduler s = reference.get();
    6. if (s != null) {
    7. return s;
    8. }
    9. s = new CachedScheduler(key, supplier.get());
    10. if (reference.compareAndSet(null, s)) {
    11. return s;
    12. }
    13. //the reference was updated in the meantime with a cached scheduler
    14. //fallback to it and dispose the extraneous one
    15. s._dispose();
    16. return reference.get();
    17. }

    因此,同样的,在使用这些Schedulers的内置方法获取线程池时也要注意共用问题,例如一个Stream流里存在阻塞,会导致其他流也被阻塞。

    Spring Cloud Gateway (Reactor-Netty)

    同样的共用问题在Spring Cloud Gateway里同样存在,HttpServer与WebClient也存在共用线程池的问题。准确的说,是在Spring5的WebFlux中,再进一步,其实是Reactor-Netty,因为HttpServer与WebClient底层用的就是Reactor-Netty包。
    我们知道,Netty的线程模型用的是EventLoopGroup,所以在创建server或client时,就需要创建EventLoopGroup线程池,而在Reactor-Netty中,默认用的是一个由HttpResources类提供的静态方法来获取:

    1. public final class HttpResources extends TcpResources {
    2. public static HttpResources get() {
    3. return getOrCreate(httpResources, null, null, ON_HTTP_NEW, "http");
    4. }
    5. }

    即都会采用默认的名为"http-reactor-xxx"的loops。
    因此,当一个程序作为web server时,就会用到HttpServer,例如WebFlux的netty server或者SpringCloudGateway, 此时,如果在程序中用到WebClient时,如果WebClient请求并处理数据时出现阻塞,则会导致整个server也会阻塞。
    特别是,对于netty使用的EventLoopGroup的线程数通常是cpu核数的两倍,所以一旦有阻塞代码,那么很快整个服务器的线程数都会被迅速耗尽。
    因此,在写响应式代码时一定要注意避免阻塞代码,如果无法避免,一定要用独立的线程池来处理,避免出现共用的情况。

     

  • 相关阅读:
    博客园商业化之路-众包平台:偶遇外包项目需求
    mysqlslap压力测试和线程池
    前端-(6)
    Nginx 反向代理 SSL 证书绑定域名
    找不到名称 “$“。是否需要安装 jQuery 的类型定义? 请尝试使用 `npm i --save-dev @types/jquery`。
    maxwell采集mysql binlog 日志数据到kafka topic
    (完美方案)解决mfc140u.dll文件丢失问题,快速且有效的修复
    C++ 虚析构函数的作用?
    MySQL索引与事务
    第八章《Java高级语法》第2节:补码
  • 原文地址:https://blog.csdn.net/m0_57042151/article/details/127439772