这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
继之前研究过的RocketMQ发送消息还有这种坑?遇到SYSTEM_BUSY不重试?
今天我们来分析分析RocketMQ
什么情况下会出现system busy
,因为线上的RocketMQ
集群偶尔会出现[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while
。所以打算详细研究下为何会出现这种情况
我们通过查看SYSTEM_BUSY
状态码调用的方式可以看到总共有如下几种system busy
Grpc这边的异常我们暂时不讨论,因为只是简单的超时判断,和其他四种不太一样
下面我们结合源码来详细分析这四种情况是如何出现的
何时会抛出[REJECTREQUEST]system busy
先说结论
[REJECTREQUEST]system busy
下面我们来结合源码具体分析下
我们直接全局搜索找到对应的源码,然后看看方法
这里看不到实际的逻辑,所以我们继续往上看看,NettyRequestProcessor
接口的实现类有很多,我们主要看SendMessageProcessor
这里第一个if else我们不用管,我们现在不是使用的这种模式部署的,主要看下面的代码
这里可以看到是否抛出[REJECTREQUEST]system busy
异常主要有两个条件
@Override
public boolean isOSPageCacheBusy() {
long begin = this.getCommitLog().getBeginTimeInLock();
long diff = this.systemClock.now() - begin;
return diff < 10000000
&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}
begin
: (将消息写入Commitlog文件所持有锁的时间),begin
即beginTimeInLock
,在asyncPutMessages
中赋值也就是消息追加到MappedFile
的开始时间
diff
: (一次消息追加过程中持有锁的总时长,即往内存映射文件或pageCache追加一条消息所耗时间)diff 大于 配置的osPageCacheBusyTimeOutMills
时间,默认1s就表示Os PageCache
Busy
这里的transientStorePool
实际是对外内存,判断条件的代码我们来看看
@Override
public boolean isTransientStorePoolDeficient() {
return remainTransientStoreBufferNumbs() == 0;
}
public int remainTransientStoreBufferNumbs() {
return this.transientStorePool.availableBufferNums();
}
public int availableBufferNums() {
if (messageStore.isTransientStorePoolEnable()) {
return availableBuffers.size();
}
return Integer.MAX_VALUE;
}
首先判断是否开启transientStorePoolEnable
堆外内存存储消息,默认为false
如果没开启返回Integer.MAX_VALUE
一定不为0.
所以如果没有开启transientStorePoolEnable
,则只会判断操作系统页面缓存(Os PageCache)是否繁忙
开启了transientStorePoolEnable
还会判断堆外内存是否够用
堆外内存的相关配置通过transientStorePoolSize
和mappedFileSizeCommitLog
配置,默认为5个ByteBuffer
,每个1G
所以开启堆外内存存储消息,会额外消耗5G内存。
如果对外内存不够则建议调整transientStorePoolSize
的值
这里结合源码分析主要是在线程池提交任务被拒绝的时候会给client
返回这个异常,我们具体来看看这个线程池
我们看看队列的容量是多少
this.sendThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getSendThreadPoolQueueCapacity());
可以看到默认是1w,也就是如果发送消息的请求超过了1w未处理,就会直接给client
抛出这个异常
我们全局搜索[TIMEOUT_CLEAN_QUEUE]broker busy
有两处地方,我们先看看RemotingProtocolServer
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS);
可以看到使用启动器启动扫描去处理过期的请求,如何处理呢
这里就是检验线程池中的任务是否超过执行时间,如果超过则移除,然后给client
返回[TIMEOUT_CLEAN_QUEUE]broker busy,具体的超时时间不同的线程池有不同的配置,
消息发送的3s
我们再看看BrokerFastFailure
这里其实可以看到也是清理线程池的BlockingQueue
,和上面的RemotingProtocolServer
类似,不同的是这里的这些BlockingQueue
主要是给broker
使用的,上面的主要是给远程通信用的。
这里可以看到主要是如果发生了 OS page cache is busy,每隔10毫秒执行一次,直接去将broker
中等待处理的消息queue全部丢弃,返回失败
这里和上面[TIMEOUT_CLEAN_QUEUE]broker busy
不同的是如果发生OS page cache is busy
是直接丢弃,不是等待超时
两者都需要broker开启快速失败即brokerFastFailureEnable
为true,默认为true
本次我们结合源码分析了出现system busy
的四种情况
sendMessageExecutor
)处理任务超时sendMessageExecutor
)中的queue任务sendMessageExecutor
)队列容量满了(默认1w)如何缓解这几种异常呢?本质都是broker
的压力太大了,处理不过来。
最好的解决方式就是提升broker
的处理速度,即RocketMQ
的 QPS。
有以下几种方式
broker
集群broker
qps(实际1中的方法也属于这条)不太可取方案:
sendMessageExecutor
线程池的队列容量osPageCacheBusyTimeOutMills
超时时间