本文基于xxl-job的2.3.1版本
作为触发器调用的统一入口,为触发器的调用提供线程池异步处理,并根据触发时间进行线程池的区分。
在不进行源码改动的情况下,共有四个地方会调用触发器JobTriggerPoolHelper.trigger
XxlJobCompleter
触发调用子任务的触发器执行JobTriggerPoolHelper
负责分配触发器线程池,并作为触发器调用的统一入口 。
类名全路径:com.xxl.job.admin.core.thread.JobTriggerPoolHelper
调用入口方法:
/**
* 为任务添加一个触发器
* 可用于立即执行一次
*
* @param jobId 触发的任务ID
* @param triggerType 添加的触发器类型
* @param failRetryCount >=0: use this param
* <0: use param from job info config
* @param executorShardingParam 分片执行参数
* @param executorParam null: use job param
* not null: cover job param
*/
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
线程池区分+超时判断 :
快慢线程池定义,区别在队列大小,可以通过配置修改最大线程数
// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;
public void start() {
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2000),
r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()));
}
分配线程池进行执行。一分钟为周期,进行超时任务统计,高频调用的并且触发执行时间长的会被转移到慢线程池。
// job timeout count
private volatile long minTim = System.currentTimeMillis() / 60000; // ms > min
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
/**
* add trigger
*/
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool
ThreadPoolExecutor triggerPool = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
// 同一个任务 一分钟内任务触发超500ms 十次,转入慢线程池处理
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool = slowTriggerPool;
}
// trigger
triggerPool.execute(() -> {
long start = System.currentTimeMillis();
try {
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// 检查时间循环,每整分钟清空一次超时触发集合
long minTimNow = System.currentTimeMillis() / 60000;
if (minTim != minTimNow) {
minTim = minTimNow;
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis() - start;
if (cost > 500) { // ob-timeout threshold 500ms
// 根据JobId进行统计, 任务触发超过500ms认为超时, 统计超时次数
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
});
}