• 关于java.util.concurrent.RejectedExecutionException: event executor terminated


    多线程报了个java.util.concurrent.RejectedExecutionException: event executor terminated 

    线程池的拒绝策略

    ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略

    • CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
    • AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
    • DiscardPolicy:直接抛弃任务,不做任何处理
    • DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交

    线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列 。 maximumPoolSize - 最大线程数

    当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。

    总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。

    当你的线程跑满,并且等待队列也满了以后,再执行任务,就会报错,你也可以抛弃掉新增的任务
    你的线程池最大是100,队列是200,所以你任务跑多了,就会报这个错误

    拒绝策略的源码

    CallerRunsPolicy

    1. /**
    2. * A handler for rejected tasks that runs the rejected task
    3. * directly in the calling thread of the {@code execute} method,
    4. * unless the executor has been shut down, in which case the task
    5. * is discarded.
    6. * 用于拒绝任务的处理程序,
    7. * 可以直接在{@code execute}方法的调用线程中运行被拒绝的任务
    8. * 除非执行器已被关闭,否则将丢弃该任务。
    9. */
    10. public static class CallerRunsPolicy implements RejectedExecutionHandler {
    11. /**
    12. * Creates a {@code CallerRunsPolicy}.
    13. * 创建一个{@code CallerRunsPolicy}。
    14. */
    15. public CallerRunsPolicy() { }
    16. /**
    17. * Executes task r in the caller's thread, unless the executor
    18. * has been shut down, in which case the task is discarded.
    19. * 除非执行器已关闭,否则在调用者线程中执行任务,
    20. * r 在这种情况下,该任务将被丢弃。
    21. *
    22. * @param r the runnable task requested to be executed
    23. * r 请求执行的可运行任务
    24. * @param e the executor attempting to execute this task
    25. * e 尝试执行此任务的执行者
    26. */
    27. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    28. if (!e.isShutdown()) {
    29. r.run();
    30. }
    31. }
    32. }

    分析:

    CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

    这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)

    这样生产者虽然没有被阻塞,但提交任务也会被暂停。

    但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。

    AbortPolicy

    1. /**
    2. * A handler for rejected tasks that throws a
    3. * {@code RejectedExecutionException}.
    4. * 抛出{@code RejectedExecutionException}的拒绝任务处理程序。
    5. */
    6. public static class AbortPolicy implements RejectedExecutionHandler {
    7. /**
    8. * Creates an {@code AbortPolicy}.
    9. */
    10. public AbortPolicy() { }
    11. /**
    12. * Always throws RejectedExecutionException.
    13. * 总是抛出RejectedExecutionException
    14. * @param r the runnable task requested to be executed
    15. * @param e the executor attempting to execute this task
    16. * @throws RejectedExecutionException always
    17. */
    18. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    19. throw new RejectedExecutionException("Task " + r.toString() +
    20. " rejected from " +
    21. e.toString());
    22. }
    23. }

    分析:

    该策略是默认饱和策略。

    使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

    DiscardPolicy

    1. /**
    2. * A handler for rejected tasks that silently discards the
    3. * rejected task.
    4. * 拒绝任务的处理程序,默认丢弃拒绝任务。
    5. */
    6. public static class DiscardPolicy implements RejectedExecutionHandler {
    7. /**
    8. * Creates a {@code DiscardPolicy}.
    9. */
    10. public DiscardPolicy() { }
    11. /**
    12. * Does nothing, which has the effect of discarding task r.
    13. * 不执行任何操作,这具有丢弃任务 r 的作用。
    14. * @param r the runnable task requested to be executed
    15. * @param e the executor attempting to execute this task
    16. */
    17. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    18. }
    19. }

    分析:

    如代码所示,不做任何处理直接抛弃任务

    DiscardOldestPolicy

    1. /**
    2. * A handler for rejected tasks that discards the oldest unhandled
    3. * request and then retries {@code execute}, unless the executor
    4. * is shut down, in which case the task is discarded.
    5. * 处理被拒绝任务的处理程序,它丢弃最旧的未处理请求,
    6. * 然后重试{@code execute},
    7. * 除非执行器*被关闭,在这种情况下,该任务将被丢弃。
    8. */
    9. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    10. /**
    11. * Creates a {@code DiscardOldestPolicy} for the given executor.
    12. */
    13. public DiscardOldestPolicy() { }
    14. /**
    15. * Obtains and ignores the next task that the executor
    16. * would otherwise execute, if one is immediately available,
    17. * and then retries execution of task r, unless the executor
    18. * is shut down, in which case task r is instead discarded.
    19. * 获取并忽略执行者*会立即执行的下一个任务(如果一个任务立即可用),
    20. * 然后重试任务r的执行,除非执行者*被关闭,在这种情况下,任务r会被丢弃。
    21. * @param r the runnable task requested to be executed
    22. * @param e the executor attempting to execute this task
    23. */
    24. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    25. if (!e.isShutdown()) {
    26. e.getQueue().poll();
    27. e.execute(r);
    28. }
    29. }
    30. }

    分析:

    如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

    自定义策略

    看完发现默认的几个拒绝策略并不是特别的友好,那么可不可以咱们自己搞个呢?

    可以发现,所有的拒绝策略都是实现了 RejectedExecutionHandler 接口

    1. public interface RejectedExecutionHandler {
    2. /**
    3. * Method that may be invoked by a {@link ThreadPoolExecutor} when
    4. * {@link ThreadPoolExecutor#execute execute} cannot accept a
    5. * task. This may occur when no more threads or queue slots are
    6. * available because their bounds would be exceeded, or upon
    7. * shutdown of the Executor.
    8. *
    9. * <p>In the absence of other alternatives, the method may throw
    10. * an unchecked {@link RejectedExecutionException}, which will be
    11. * propagated to the caller of {@code execute}.
    12. *
    13. * @param r the runnable task requested to be executed
    14. * @param executor the executor attempting to execute this task
    15. * @throws RejectedExecutionException if there is no remedy
    16. */
    17. void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    18. }

    这个接口只有一个 rejectedExecution 方法。

    r 为待执行任务;executor 为线程池;方法可能会抛出拒绝异常。

    那么咱们就可以通过实现 RejectedExecutionHandler 接口扩展

    两个栗子:一

    netty自己实现的线程池里面私有的一个拒绝策略。单独启动一个新的临时线程来执行任务。

    1. private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
    2. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    3. try {
    4. final Thread t = new Thread(r, "Temporary task executor");
    5. t.start();
    6. } catch (Throwable e) {
    7. throw new RejectedExecutionException(
    8. "Failed to start a new thread", e);
    9. }
    10. }
    11. }

    两个栗子:二

    dubbo的一个例子,它直接继承的 AbortPolicy ,加强了日志输出,并且输出dump文件

    1. public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    2. @Override
    3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    4. String msg = String.format("Thread pool is EXHAUSTED!" +
    5. " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
    6. " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
    7. threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
    8. e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
    9. url.getProtocol(), url.getIp(), url.getPort());
    10. logger.warn(msg);
    11. dumpJStack();
    12. throw new RejectedExecutionException(msg);
    13. }
    14. }

    自己玩

    参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:

    1. new RejectedExecutionHandler() {
    2. @Override
    3. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    4. if (!executor.isShutdown()) {
    5. try {
    6. executor.getQueue().put(r);
    7. } catch (InterruptedException e) {
    8. // should not be interrupted
    9. }
    10. }
    11. }
    12. };

    这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

    相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。

    总结

    四种线程池拒绝策略,具体使用哪种策略,还得根据实际业务场景才能做出抉择。

  • 相关阅读:
    vite react react-pdf pdfjs-dist 加载不全的解决方案 cmaps本地路径
    loganalyzer 展示数据库中的日志
    VSCode中打开md文件的智能提示
    P - Balanced Stone Heaps
    React报错之Expected `onClick` listener to be a function
    Pytorch多机多卡的多种打开方式
    QT基础学习
    实现过滤词汇高亮
    Redhat(5)-ansible-loop-handler-errors-tags
    买房怎样申请贷款
  • 原文地址:https://blog.csdn.net/baidu_37366055/article/details/125535868