本篇重点介绍下Reactor
中线程池的共用问题。由于在响应式编程
中线程数很有限,所以共用时一定要注意阻塞问题,而这个在以前命令式编程
中是很少被关注的,需要引起注意。
从上面的线程模型中可以看出,原生Executor数量是很有限的,例如boundedElastic()
获取的是10倍CPU核数的数量。为了描述简化,我们把原生Executor称为“工作线程”,他是Scheduler线程池的工作线程。所以会有大量Worker共用一个工作线程的情况,这就会引发一些意想不到的情况。
例如有两个Flux流,其中一个处理onNext方法很快,而另一个则处理onNext很慢(比如是有阻塞代码的),当这两个流对应的Worker是派生至同一个"工作线程"时,就会导致那个本来执行速度快的Flux流也执行的很慢,相当于这两个流的工作任务task放在了同一个task queue里进行了排队。
因此,当如果流的处理速度不一样时,特别是非阻塞与阻塞两种不同类型的流时,一定要采用独立的Scheduler来分开处理,避免产生Worker的共用问题。
Schedulers的共用比较明显,当使用boundedElastic()
、elastic()
、parallel()
等静态方法获取线程池时,每次调用的时候,其实都是共用的同一个线程池,其源码如下:
- public static Scheduler elastic() {
- return cache(CACHED_ELASTIC, ELASTIC, ELASTIC_SUPPLIER);
- }
- static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) {
- CachedScheduler s = reference.get();
- if (s != null) {
- return s;
- }
- s = new CachedScheduler(key, supplier.get());
- if (reference.compareAndSet(null, s)) {
- return s;
- }
- //the reference was updated in the meantime with a cached scheduler
- //fallback to it and dispose the extraneous one
- s._dispose();
- return reference.get();
- }
-
因此,同样的,在使用这些Schedulers的内置方法获取线程池时也要注意共用问题,例如一个Stream流里存在阻塞,会导致其他流也被阻塞。
同样的共用问题在Spring Cloud Gateway里同样存在,HttpServer与WebClient也存在共用线程池的问题。准确的说,是在Spring5的WebFlux中,再进一步,其实是Reactor-Netty,因为HttpServer与WebClient底层用的就是Reactor-Netty包。
我们知道,Netty的线程模型用的是EventLoopGroup,所以在创建server或client时,就需要创建EventLoopGroup线程池,而在Reactor-Netty中,默认用的是一个由HttpResources
类提供的静态方法来获取:
- public final class HttpResources extends TcpResources {
- public static HttpResources get() {
- return getOrCreate(httpResources, null, null, ON_HTTP_NEW, "http");
- }
- }
-
即都会采用默认的名为"http-reactor-xxx"的loops。
因此,当一个程序作为web server时,就会用到HttpServer,例如WebFlux的netty server或者SpringCloudGateway, 此时,如果在程序中用到WebClient时,如果WebClient请求并处理数据时出现阻塞,则会导致整个server也会阻塞。
特别是,对于netty使用的EventLoopGroup的线程数通常是cpu核数的两倍,所以一旦有阻塞代码,那么很快整个服务器的线程数都会被迅速耗尽。
因此,在写响应式代码时一定要注意避免阻塞代码,如果无法避免,一定要用独立的线程池来处理,避免出现共用的情况。