本文基于xxl-job的2.3.1版本
触发器主要用于执行任务,主要包括以下功能:
触发器只负责远程调用执行器,执行结果为异步回调,不由触发器直接处理。
这部分流程逻辑很简单,重点在不同路由策略的执行器路由选取。
xxl-job触发器代码主要在 com.xxl.job.admin.core.trigger.XxlJobTrigger 中

参数预处理,根据是否分片广播判断是否循环调用执行器。
/**
* trigger job
*
* @param jobId 任务ID
* @param triggerType 触发器类型
* @param failRetryCount >=0: use this param
* <0: use param from job info config
* @param executorShardingParam 分片执行参数-格式为: 分片序号/分片总数 例如两个实例的第一台: 0/2
* @param executorParam null: use job param
* not null: cover job param
* @param addressList null: use executor addressList
* not null: cover
*/
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {
// 从 xxl_job_info 获取任务信息
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();
// 根据执行器ID,获取执行器信息
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// cover addressList
if (addressList != null && addressList.trim().length() > 0) {
group.setAddressType(1);
group.setAddressList(addressList.trim());
}
// sharding param
int[] shardingParam = null;
if (executorShardingParam != null) {
// 判断分片参数 为空或者格式不对时,设置默认分片参数: 0/1 (表示总执行器一台,第一台执行)
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length == 2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.parseInt(shardingArr[0]);
shardingParam[1] = Integer.parseInt(shardingArr[1]);
}
}
// 判断路由策略,是否分片广播执行
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList() != null && !group.getRegistryList().isEmpty()
&& shardingParam == null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else {
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
private static boolean isNumeric(String str) {
try {
Integer.valueOf(str);
return true;
} catch (NumberFormatException e) {
return false;
}
}
根据路由策略判断,选择调度地址,远程调用执行器触发任务执行。同时记录调度日志。
/**
* @param group job group, registry list may be empty
* @param jobInfo 任务信息(已填充执行参数)
* @param finalFailRetryCount 失败重试次数
* @param triggerType 触发器类型
* @param index 分片广播-序号
* @param total 分片广播-总分片数(执行器的注册实例总数)
*/
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) ? String.valueOf(index).concat("/").concat(String.valueOf(total)) : null;
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、init trigger-param 任务参数复制
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 3、init address 根据路由策略选择调度地址
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、trigger remote executor
ReturnT<String> triggerResult;
if (address != null) {
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<>(ReturnT.FAIL_CODE, null);
}
// 5、collection trigger info
/** 日志信息拼接, 省略 **/
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
/**
* 触发任务执行
*
* @param triggerParam 任务参数
* @param address 执行器地址
* @return 执行结果
*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
/** 远程调用执行器, 省略 **/
}
分片广播为路由策略的一种,但是较其他的路由策略区别较大。
该特性适用场景如:
分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
广播任务场景:广播执行器机器运行shell脚本、广播集群节点进行缓存更新等。
以第一种场景为例,假设有10个执行器。触发器会循环10次,分别请求各个执行器,total 为 10,index 分别为0~9。
执行器需要根据index,将10w数据分为十份,各个执行器各自取一份,不交叉,进行分开处理,提高效率。