• xxl-job源码解读:触发器线程池TriggerPool


    xxl-job源码解读:触发器线程池TriggerPool

    本文基于xxl-job的2.3.1版本

    基本说明

    作为触发器调用的统一入口,为触发器的调用提供线程池异步处理,并根据触发时间进行线程池的区分。

    在不进行源码改动的情况下,共有四个地方会调用触发器JobTriggerPoolHelper.trigger

    1. 调度器触发执行:由定时任务的触发器正常调度
    2. 页面手动触发执行:从任务信息页面,点击执行一次,手动触发执行
    3. 失败监听器触发执行:如果任务执行失败,并且任务设置了失败重试次数,会根据重试次数再次调用触发器执行
    4. 父任务成功触发执行:设置了父子任务的情况下,父任务成功后,会由XxlJobCompleter 触发调用子任务的触发器执行

    触发器源码解读

    JobTriggerPoolHelper 负责分配触发器线程池,并作为触发器调用的统一入口 。

    类名全路径:com.xxl.job.admin.core.thread.JobTriggerPoolHelper

    代码逻辑流程图

    在这里插入图片描述

    源码解读

    调用入口方法

        /**
         * 为任务添加一个触发器
         * 

    可用于立即执行一次

    * * @param jobId 触发的任务ID * @param triggerType 添加的触发器类型 * @param failRetryCount >=0: use this param * <0: use param from job info config * @param executorShardingParam 分片执行参数 * @param executorParam null: use job param * not null: cover job param */
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    线程池区分+超时判断

    快慢线程池定义,区别在队列大小,可以通过配置修改最大线程数

        // fast/slow thread pool
        private ThreadPoolExecutor fastTriggerPool = null;
        private ThreadPoolExecutor slowTriggerPool = null;
    
        public void start() {
            fastTriggerPool = new ThreadPoolExecutor(
                    10,
                    XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(1000),
                    r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));
    
            slowTriggerPool = new ThreadPoolExecutor(
                    10,
                    XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(2000),
                    r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    分配线程池进行执行。一分钟为周期,进行超时任务统计,高频调用的并且触发执行时间长的会被转移到慢线程池。

        // job timeout count
        private volatile long minTim = System.currentTimeMillis() / 60000;     // ms > min
        private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
    
    
        /**
         * add trigger
         */
        public void addTrigger(final int jobId,
                               final TriggerTypeEnum triggerType,
                               final int failRetryCount,
                               final String executorShardingParam,
                               final String executorParam,
                               final String addressList) {
    
            // choose thread pool
            ThreadPoolExecutor triggerPool = fastTriggerPool;
            AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
            // 同一个任务 一分钟内任务触发超500ms 十次,转入慢线程池处理
            if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
                triggerPool = slowTriggerPool;
            }
    
            // trigger
            triggerPool.execute(() -> {
    
                long start = System.currentTimeMillis();
    
                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
    
                    // 检查时间循环,每整分钟清空一次超时触发集合
                    long minTimNow = System.currentTimeMillis() / 60000;
                    if (minTim != minTimNow) {
                        minTim = minTimNow;
                        jobTimeoutCountMap.clear();
                    }
    
                    // incr timeout-count-map
                    long cost = System.currentTimeMillis() - start;
                    if (cost > 500) {       // ob-timeout threshold 500ms
                        // 根据JobId进行统计, 任务触发超过500ms认为超时, 统计超时次数
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }
    
                }
    
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
  • 相关阅读:
    Swift 创建一个自己的命名空间
    EMC原理 传导(共模 差模) 辐射(近场 远场) 详解
    记一次MySQL5初始化被kill的问题排查
    爬虫 | 正则、Xpath、BeautifulSoup示例学习
    基于nodejs的二手物物交换平台【毕业设计源码】
    [开源项目]可观测、易使用的SpringBoot线程池
    19异常的学习笔记
    wireshark使用host文件做IP域名解析
    如何使用AI图片清晰度增强器软件增强和锐化图片、提高照片清晰度并去除噪点
    【Web安全靶场】sqli-labs-master 38-53 Stacked-Injections
  • 原文地址:https://blog.csdn.net/Azhuzhu_chaste/article/details/126834876