• 线程池拒绝策略最佳实践


    之前线上项目偶发出现线程池耗尽的问题,最近终于有空能好好研究一把,问题实际并不复杂,也得益于Dubbo线程池的拒绝策略才能很快找到大致的原因。

    通过这个问题,也有些好奇各家使用的线程池拒绝策略是怎样的,刨刨坑、挖挖土,一起来看看吧~

    问题背景

    之前线上偶发出现线程池耗尽问题,现象如下:

     

    在调用下游Dubbo接口时,提示Server端的线程池耗尽。

    最开始以为是有突发流量,但是监控显示流量稳定,并且扩容后发现问题依然存在,渐渐意识到问题并不简单。

    问题分析

    既然有异常日志和堆栈,先看看到底什么场景下会出现这个异常。在Dubbo源码中,我们可以找到这一段提示出现在AbortPolicyWithReport中。

    public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy
    

    AbortPolicyWithReport继承自 java.util.concurrent.ThreadPoolExecutor.AbortPolicy,是一种线程池拒绝策略,当线程池中的缓冲任务队列满,且线程数量达到最大时,就会触发拒绝策略,调用拒绝策略的rejectedExecution()方法进行处理。

    那么,有哪些不同的拒绝策略呢?

    JDK线程池拒绝策略

    java.util.concurrent.ThreadPoolExecutor,我们可以找到JDK预设置的四种拒绝策略:

    • CallerRunsPolicy - 调用者线程处理

    该策略下,如果线程池未关闭,则交由当前调用者线程进行处理,否则直接丢弃任务。

    1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    2. if (!e.isShutdown()) {
    3. r.run();
    4. }
    5. }
    • AbortPolicy - 抛出异常

    如果不配置拒绝策略的话,线程池会默认使用该策略,直接抛出rejectedExecution,交由上层业务处理。

    1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    2. throw new RejectedExecutionException("...");
    3. }
    • DiscardPolicy - 丢弃当前任务

    最简单的处理方法,直接丢弃。

    1. //实际方法体就是空的,即该场景下不处理,直接丢弃
    2. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    3. }
    • DiscardOldestPolicy - 丢弃下一个要执行的任务

    该策略是丢弃队列中最老的任务(其实就是下一个要执行的任务),并尝试执行当前任务。

    1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    2. if (!e.isShutdown()) {
    3. e.getQueue().poll();
    4. e.execute(r);
    5. }
    6. }

    Dubbo线程池拒绝策略

    那么Dubbo的拒绝策略是怎样的呢?

    其实从名字就能看出来,AbortPolicyWithReport

    1. public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    2. ...
    3. @Override
    4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    5. String msg = String.format("Thread pool is EXHAUSTED!" + ...);
    6. logger.warn(msg);
    7. dumpJStack();
    8. dispatchThreadPoolExhaustedEvent(msg);
    9. throw new RejectedExecutionException(msg);
    10. }
    11. ...
    12. }

    Dubbo的拒绝策略是抛出异常RejectedExecutionException,同时还会做一件事情 - dumpJStack(),记录下当时的JVM线程堆栈。

    dumpJStack

    先来看看源码。

    1. private void dumpJStack() {
    2. //一些dump时间间隔和并发控制
    3. ...
    4. //新建单线程池用于dump堆栈
    5. ExecutorService pool = Executors.newSingleThreadExecutor();
    6. pool.execute(() -> {
    7. ...
    8. try (FileOutputStream jStackStream = new FileOutputStream(
    9. new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
    10. JVMUtil.jstack(jStackStream);
    11. } catch (Throwable t) {
    12. logger.error("dump jStack error", t);
    13. }
    14. ...
    15. });
    16. ...
    17. }

    做法其实很简单,最终调用JVMUtil.jstack把当前JVM的线程堆栈dump下来,而这样做有一个很大的好处,就是能知道当时其他线程到底在做什么,帮助分析线程池溢出的原因。

    原因分析

    有线程堆栈就好办了,看看当时线程都在做什么。

    Dubbo底层使用Netty实现网络通信,涉及的线程池包括IO线程池(boss、worker)和业务线程池(处理业务事件)。通过之前的日志可以看到是Server端的业务线程池,即DubboServerHandler耗尽,那么统计一把,看看线程都在做什么。

    很明显,大量的线程都阻塞在获取DB连接上。那接下来就好办了,可以看看同时间段是不是有慢查询长时间占住了连接,或者是真的连接池小了,线程池和连接池配比不对,分析至此就不继续展开(并不是讨论重点哈哈)。

    不同的拒绝策略

    可以看到,Dubbo通过重写了拒绝了策略,来帮助异常场景下进行问题定位,带来了很大的帮助。

    那么其他主流组件是怎么做的呢?

    RocketMQ

    Broker为例,其中包含了非常多的线程池用于处理不同的消息处理场景,包含send、put、pull、query等等。

    在线程池的使用上,RocketMQ通过BrokerFixedThreadPoolExecutor继承封装了一层ThreadPoolExecutor,上层可以自行传入参数,其中也包含了可配置的RejectedExecutionHandler

    实际在Broker创建消息处理的不同线程池时,并没有指定特殊的拒绝策略,所以使用的是默认的AbortPolicy,即抛出异常。

    1. this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
    2. this.brokerConfig.getSendMessageThreadPoolNums(),
    3. this.brokerConfig.getSendMessageThreadPoolNums(),
    4. 1000 * 60,
    5. TimeUnit.MILLISECONDS,
    6. this.sendThreadPoolQueue,
    7. new ThreadFactoryImpl("SendMessageThread_")
    8. //并没有设置拒绝策略
    9. );

    同时为了避免任务溢出,为每个线程池默认设置了较大的任务队列大小。

    1. private int sendThreadPoolQueueCapacity = 10000;
    2. private int putThreadPoolQueueCapacity = 10000;
    3. private int pullThreadPoolQueueCapacity = 100000;
    4. private int replyThreadPoolQueueCapacity = 10000;
    5. private int queryThreadPoolQueueCapacity = 20000;
    6. private int clientManagerThreadPoolQueueCapacity = 1000000;
    7. private int consumerManagerThreadPoolQueueCapacity = 1000000;
    8. private int heartbeatThreadPoolQueueCapacity = 50000;
    9. private int endTransactionPoolQueueCapacity = 100000;

    综上,RocketMQ的拒绝策略使用了AbortPolicy,即抛出异常,同时为了避免任务队列溢出,设置了较大的任务队列。

    Netty

    EventLoopGroup为例,线程池的拒绝策略默认使用RejectedExecutionHandlers,通过单例模式提供Handler进行处理。

    1. public final class RejectedExecutionHandlers {
    2. private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
    3. @Override
    4. public void rejected(Runnable task, SingleThreadEventExecutor executor) {
    5. throw new RejectedExecutionException();
    6. }
    7. };
    8. private RejectedExecutionHandlers() { }
    9. public static RejectedExecutionHandler reject() {
    10. return REJECT;
    11. }
    12. ...
    13. }

    可以看出,Netty的拒绝策略默认也是抛出异常,与RocketMQ对比的不同的点在于,任务队列的大小会取max(16, maxPendingTasks)io.netty.eventLoop.maxPengdingTasks可通过环境变量进行配置。

    Doris

    团队内一直在用Doris,属于计算存储分离、MPP架构的分析型存储组件,看了一眼FE中的拒绝策略,官方实现了两种:

    LogDiscardPolicy

    1. static class LogDiscardPolicy implements RejectedExecutionHandler {
    2. private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);
    3. private String threadPoolName;
    4. public LogDiscardPolicy(String threadPoolName) {
    5. this.threadPoolName = threadPoolName;
    6. }
    7. @Override
    8. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    9. LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
    10. }
    11. }

    可以理解就是DiscardPolicy,丢弃任务,同时记录warn日志。

    BlockedPolicy

    1. static class BlockedPolicy implements RejectedExecutionHandler {
    2. private String threadPoolName;
    3. private int timeoutSeconds;
    4. public BlockedPolicy(String threadPoolName, int timeoutSeconds) {
    5. this.threadPoolName = threadPoolName;
    6. this.timeoutSeconds = timeoutSeconds;
    7. }
    8. @Override
    9. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    10. try {
    11. executor.getQueue().offer(r, timeoutSeconds, TimeUnit.SECONDS);
    12. } catch (InterruptedException e) {
    13. LOG.warn("Task " + r.toString() + " wait to enqueue in " + threadPoolName + " " + executor.toString() + " failed");
    14. }
    15. }
    16. }

    这种策略会特殊一些,它会阻塞住当前线程,尽最大努力尝试将任务放入队列中。如果超过指定的阻塞时间timeoutSeconds(默认60s),仍然无法将任务放入队列中,则记录warn日志,并丢弃任务。

    这两种策略在Doris中都有实际使用到,同时线程池的任务队列大小默认设置为10。

    ElasticSearch

    ES的拒绝策略相对复杂一些,其自定义实现了两种拒绝策略。

    • EsAbortPolicy
    1. public class EsAbortPolicy extends EsRejectedExecutionHandler {
    2. @Override
    3. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    4. if (r instanceof AbstractRunnable) {
    5. if (((AbstractRunnable) r).isForceExecution()) {
    6. BlockingQueue<Runnable> queue = executor.getQueue();
    7. if ((queue instanceof SizeBlockingQueue) == false) {
    8. throw new IllegalStateException("forced execution, but expected a size queue");
    9. }
    10. try {
    11. ((SizeBlockingQueue<Runnable>) queue).forcePut(r);
    12. } catch (InterruptedException e) {
    13. Thread.currentThread().interrupt();
    14. throw new IllegalStateException("forced execution, but got interrupted", e);
    15. }
    16. return;
    17. }
    18. }
    19. incrementRejections();
    20. throw newRejectedException(r, executor, executor.isShutdown());
    21. }
    22. }

    其实本质上就是AbortPolicy,但是会进行一些特殊处理,包括forceExecution强制执行的判断、任务拒绝次数统计,最终抛出异常。

    ES中,线程池的forceExecution是指什么?

    在满足条件的情况下,即使用了ES自定义的AbstractRunnable进行任务封装、SizeBlockingQueue作为任务队列时,可以根据任务配置判断是否强制放入任务队列。对于一些比较重要的任务,不能丢弃时,可以将forceExecution 设置为true。

    强制放入任务队列带来的效果取决于SizeBlockingQueue中封装的队列类型,如果封装的是ArrayBlockingQueue,则会阻塞等待队列有空余;如果封装的是LinkedTransferQueue,由于队列大小无限,且put使用的是ASYNC模式,所以会立刻放入队列并返回。

    • ForceQueuePolicy
    1. static class ForceQueuePolicy extends EsRejectedExecutionHandler {
    2. ...
    3. @Override
    4. public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
    5. if (rejectAfterShutdown) {
    6. if (executor.isShutdown()) {
    7. reject(executor, task);
    8. } else {
    9. put(executor, task);
    10. if (executor.isShutdown() && executor.remove(task)) {
    11. reject(executor, task);
    12. }
    13. }
    14. } else {
    15. put(executor, task);
    16. }
    17. }
    18. private static void put(ThreadPoolExecutor executor, Runnable task) {
    19. final BlockingQueue<Runnable> queue = executor.getQueue();
    20. // force queue policy should only be used with a scaling queue
    21. assert queue instanceof ExecutorScalingQueue;
    22. try {
    23. queue.put(task);
    24. } catch (final InterruptedException e) {
    25. assert false : "a scaling queue never blocks so a put to it can never be interrupted";
    26. throw new AssertionError(e);
    27. }
    28. }
    29. private void reject(ThreadPoolExecutor executor, Runnable task) {
    30. incrementRejections();
    31. throw newRejectedException(task, executor, true);
    32. }
    33. }
    34. }

    该策略在线程池未关闭,且使用了ES自定义的ExecutorScalingQueue的任务队列时,会强制将任务放入线程池队列中。其中,ExecutorScalingQueue也是继承自LinkedTransferQueue,最终调用put方法以ASYNC模式放入任务队列中。

    看上去也是forceExecution,而且最终都是使用LinkedTransferQueue的put方法以ASYNC模式非阻塞入队列。那么EsAbortPolicyForceQueuePolicy有什么不同呢?

    两者有很多相似点,都有forceExecution的判断,而且拒绝时都是抛出RejectedExecutionException

    不同点在于,ForceQueuePolicy默认采用强制执行模式,且在线程池关闭时依然可能往队列放入任务。

    其他

    在GitHub上随意翻了一下,也有看到用策略链的方式,实现也很简单,可以随意组合配置不同的策略。

    1. public class RejectedExecutionChainPolicy implements RejectedExecutionHandler {
    2. private final RejectedExecutionHandler[] handlerChain;
    3. @Override
    4. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    5. for (RejectedExecutionHandler handler : handlerChain) {
    6. handler.rejectedExecution(r, executor);
    7. }
    8. }
    9. }

    总结

    拒绝策略主要应用在线程池出现资源溢出的情况下,除了常见的由JDK提供的四种拒绝策略外,不同的组件也会尝试使用不同的拒绝策略来应用。

    JDK提供的拒绝策略

    类型说明
    CallerRunsPolicyJDK提供,调用者线程处理
    AbortPolicyJDK线程池默认使用,抛出RejectedExecutionException异常
    DiscardPolicyJDK提供,丢弃当前任务
    DiscardOldestPolicyJDK提供,丢弃下一个要执行的任务

    自定义拒绝策略

    组件类型说明
    RocketMQAbortPolicy使用的线程池默认拒绝的略,即AbortPolicy
    DubboAbortPolicyWithReport抛出RejectedExecutionException异常,并报告溢出,记录下JVM线程堆栈
    NettyRejectedExecutionHandlers逻辑与AbortPolicy一致,抛出异常,封装为单例Handler使用
    DorisLogDiscardPolicy逻辑与DiscardPolicy一致,丢弃任务,并记录warn日志
    DorisBlockedPolicy尽最大努力尝试将任务放入队列执行,最多等待60s,超时后记录warn日志,并丢弃任务
    ElasticEsAbortPolicy正常情况下与AbortPolicy一致,如果线程标记强制执行,则强制执行或放入任务队列,实际入队列的表现取决于队列类型,可能阻塞或立即返回
    ElasticForceQueuePolicy默认强制执行任务或放入任务队列,异步非阻塞
    其他PolicyChain策略链,包含多种拒绝策略,根据条件与节点处理结果决定最终表现
  • 相关阅读:
    2019年互联网高频Java面试题指南!互联网升职加薪方案!
    机器学习基本模型与算法在线实验闯关
    iNeuOS工业互联网操作系统,设备运维业务和“低代码”表单开发工具
    非常好用的组件库【semi.design】
    初识设计模式 - 中介模式
    双键网络对讲求助模块
    [公派访问学者]申请条件及选拔方法
    【WSN定位】基于matlab灰狼算法优化无线传感器非测距定位【含Matlab源码 2008期】
    揭秘 .NET 中的 TimerQueue(下)
    [Mac软件]Goldie App v2.2 Mac黄金比例设计工具
  • 原文地址:https://blog.csdn.net/WXF_Sir/article/details/125625748