• xxl-job源码解读:触发器Trigger


    本文基于xxl-job的2.3.1版本

    基本说明

    触发器主要用于执行任务,主要包括以下功能:

    1. 处理路由策略,选择执行器,确定执行器路由,包括分片广播
    2. 远程调用执行器,触发任务的执行
    3. 调用日志模块,记录执行日志

    触发器只负责远程调用执行器,执行结果为异步回调,不由触发器直接处理。

    这部分流程逻辑很简单,重点在不同路由策略的执行器路由选取。

    代码功能解读

    xxl-job触发器代码主要在 com.xxl.job.admin.core.trigger.XxlJobTrigger

    代码逻辑流程图

    在这里插入图片描述

    主流程解析

    参数预处理,根据是否分片广播判断是否循环调用执行器。

        /**
         * trigger job
         *
         * @param jobId                 任务ID
         * @param triggerType           触发器类型
         * @param failRetryCount        >=0: use this param
         *                              <0: use param from job info config
         * @param executorShardingParam 分片执行参数-格式为: 分片序号/分片总数 例如两个实例的第一台: 0/2
         * @param executorParam         null: use job param
         *                              not null: cover job param
         * @param addressList           null: use executor addressList
         *                              not null: cover
         */
        public static void trigger(int jobId,
                                   TriggerTypeEnum triggerType,
                                   int failRetryCount,
                                   String executorShardingParam,
                                   String executorParam,
                                   String addressList) {
    
            // 从 xxl_job_info 获取任务信息
            XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
            if (jobInfo == null) {
                logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
                return;
            }
            if (executorParam != null) {
                jobInfo.setExecutorParam(executorParam);
            }
            int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();
    
            // 根据执行器ID,获取执行器信息
            XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
    
            // cover addressList
            if (addressList != null && addressList.trim().length() > 0) {
                group.setAddressType(1);
                group.setAddressList(addressList.trim());
            }
    
            // sharding param
            int[] shardingParam = null;
            if (executorShardingParam != null) {
                // 判断分片参数 为空或者格式不对时,设置默认分片参数: 0/1 (表示总执行器一台,第一台执行)
                String[] shardingArr = executorShardingParam.split("/");
                if (shardingArr.length == 2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                    shardingParam = new int[2];
                    shardingParam[0] = Integer.parseInt(shardingArr[0]);
                    shardingParam[1] = Integer.parseInt(shardingArr[1]);
                }
            }
    
            // 判断路由策略,是否分片广播执行
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                    && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
                    && shardingParam == null) {
                for (int i = 0; i < group.getRegistryList().size(); i++) {
                    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
                }
            } else {
                if (shardingParam == null) {
                    shardingParam = new int[]{0, 1};
                }
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
            }
    
        }
    
        private static boolean isNumeric(String str) {
            try {
                Integer.valueOf(str);
                return true;
            } catch (NumberFormatException e) {
                return false;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    根据路由策略判断,选择调度地址,远程调用执行器触发任务执行。同时记录调度日志。

        /**
         * @param group               job group, registry list may be empty
         * @param jobInfo             任务信息(已填充执行参数)
         * @param finalFailRetryCount 失败重试次数
         * @param triggerType         触发器类型
         * @param index               分片广播-序号
         * @param total               分片广播-总分片数(执行器的注册实例总数)
         */
        private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {
    
            // param
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
            ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
            String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) ? String.valueOf(index).concat("/").concat(String.valueOf(total)) : null;
    
            // 1、save log-id
            XxlJobLog jobLog = new XxlJobLog();
            jobLog.setJobGroup(jobInfo.getJobGroup());
            jobLog.setJobId(jobInfo.getId());
            jobLog.setTriggerTime(new Date());
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
            logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
    
            // 2、init trigger-param 任务参数复制
            TriggerParam triggerParam = new TriggerParam();
            triggerParam.setJobId(jobInfo.getId());
            triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
            triggerParam.setExecutorParams(jobInfo.getExecutorParam());
            triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
            triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
            triggerParam.setLogId(jobLog.getId());
            triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
            triggerParam.setGlueType(jobInfo.getGlueType());
            triggerParam.setGlueSource(jobInfo.getGlueSource());
            triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
            triggerParam.setBroadcastIndex(index);
            triggerParam.setBroadcastTotal(total);
    
            // 3、init address 根据路由策略选择调度地址
            String address = null;
            ReturnT<String> routeAddressResult = null;
            if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
                if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                    if (index < group.getRegistryList().size()) {
                        address = group.getRegistryList().get(index);
                    } else {
                        address = group.getRegistryList().get(0);
                    }
                } else {
                    routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                    if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                        address = routeAddressResult.getContent();
                    }
                }
            } else {
                routeAddressResult = new ReturnT<>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
            }
    
            // 4、trigger remote executor
            ReturnT<String> triggerResult;
            if (address != null) {
                triggerResult = runExecutor(triggerParam, address);
            } else {
                triggerResult = new ReturnT<>(ReturnT.FAIL_CODE, null);
            }
    
            // 5、collection trigger info
            /** 日志信息拼接, 省略 **/
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
    
            logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
        }
    
        /**
         * 触发任务执行
         *
         * @param triggerParam 任务参数
         * @param address      执行器地址
         * @return 执行结果
         */
        public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
        	/** 远程调用执行器, 省略 **/
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    分片广播说明

    分片广播为路由策略的一种,但是较其他的路由策略区别较大。

    该特性适用场景如:

    1. 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;

    2. 广播任务场景:广播执行器机器运行shell脚本、广播集群节点进行缓存更新等。

    以第一种场景为例,假设有10个执行器。触发器会循环10次,分别请求各个执行器,total 为 10,index 分别为0~9。

    执行器需要根据index,将10w数据分为十份,各个执行器各自取一份,不交叉,进行分开处理,提高效率。

  • 相关阅读:
    .net第二章数据类型、变量和常量
    思维导图之规范与重构
    挖掘数据价值,华为云大数据BI解决方案有绝招
    Unity-WebGL基于JS实现网页录音
    API_异常,数组_方法_面向对象,220814,,
    uniapp 自定义tabbar页面不刷新
    PAT A1006 Sign In and Sign Out
    [Machine Learning][Part 2]监督学习的实现
    什么是一致性哈希算法?一致性哈希算法原理刨析
    开发潜能52个方法
  • 原文地址:https://blog.csdn.net/Azhuzhu_chaste/article/details/126855151