• Source code analysis of xxl-job, a distributed scheduled-task framework


    xxl-job architecture

    xxl-job can be divided into three parts: executor management, task management, and the scheduling center.


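    • Executor management: this part manages the task executors, for example which node (machine) a task is handed to for execution. The node that runs the task is called an executor.
    • Task management: this part is mainly the business logic, usually a method annotated with @XxlJob.
    • Scheduling center: routes (assigns) a task to one or more executors for execution.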

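    From this architecture we can see that all tasks are managed centrally by the scheduling center: it decides when a task runs and which node runs it. The executors and the scheduling center therefore need to communicate remotely. xxl-job implements this communication itself: as the code later in this article shows, the executor embeds a Netty-based HTTP server and the scheduling center sends JSON requests to it.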

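    When a job is triggered, the scheduling center acts as the client and the executor acts as the server, so the source code can be read by following that request step by step.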
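
    To make the "route a task to one of several executors" idea concrete, here is a minimal round-robin sketch. It is only an illustration: RoundRobinRouter and the addresses are hypothetical names, and xxl-job offers several routing strategies that are not reproduced here.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of xxl-job: picks one executor address per call.
class RoundRobinRouter {
    private final AtomicInteger counter = new AtomicInteger();

    // Returns the next address in rotation, or null when no executor is registered.
    String route(List<String> addresses) {
        if (addresses == null || addresses.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    public static void main(String[] args) {
        RoundRobinRouter router = new RoundRobinRouter();
        // made-up executor addresses, for illustration only
        List<String> executors = List.of("http://10.0.0.1:9999/", "http://10.0.0.2:9999/");
        for (int i = 0; i < 3; i++) {
            System.out.println(router.route(executors));
        }
    }
}
```

    A null result corresponds to the case where no executor address is available and the trigger has to be reported as failed.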

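    Source code analysis

    Start with the configuration class XxlJobAdminConfig, which serves as the entry point on the client (scheduling center) side. The source is as follows: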

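    @Component
    public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
    
        // static self-reference, set in afterPropertiesSet(), read through getAdminConfig()
        private static XxlJobAdminConfig adminConfig = null;
        public static XxlJobAdminConfig getAdminConfig() {
            return adminConfig;
        }
    
        private XxlJobScheduler xxlJobScheduler;
    
        @Override
        public void afterPropertiesSet() throws Exception {
            adminConfig = this;
    
            xxlJobScheduler = new XxlJobScheduler();
            xxlJobScheduler.init();
        }
    
        @Override
        public void destroy() throws Exception {
            xxlJobScheduler.destroy();
        }
    }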
    

    At startup Spring loads this class and calls back afterPropertiesSet(), which in turn calls the init() method of XxlJobScheduler:

    public class XxlJobScheduler  {
        private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
    
    
        public void init() throws Exception {
            // initialize the i18n resources
            initI18n();
    
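            // only creates two trigger thread pools, fastTriggerPool and slowTriggerPool, which are used later
            // fastTriggerPool handles normal trigger requests (dispatching a job to an executor)
            // slowTriggerPool takes over a job whose triggers timed out more than 10 times in a minute,
            // which relieves the pressure on fastTriggerPool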
            JobTriggerPoolHelper.toStart();
    
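            // heartbeat monitoring: update_time in xxl_job_registry is refreshed whenever an executor reports in,
            // and the addressList field of xxl_job_group is then rebuilt from the live entries
            // an entry whose update_time has not been refreshed for more than 90s is removed,
            // which means that executor is considered down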
            JobRegistryHelper.getInstance().start();
    
            // picks up failed job logs, up to 1000 per pass, and sends notifications such as e-mail alarms
            JobFailMonitorHelper.getInstance().start();
    
            // admin lose-monitor run ( depend on JobTriggerPoolHelper )
            JobCompleteHelper.getInstance().start();
    
            // admin log report start
            JobLogReportHelper.getInstance().start();
    		
            // core logic of xxl-job
            // a dedicated thread loops, picks up the jobs that are due, and routes them to the matching executors
            JobScheduleHelper.getInstance().start();
    
            logger.info(">>>>>>>>> init xxl-job admin success.");
        }
    }
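
    The fast/slow pool idea described in the comments above can be sketched as follows. This is a simplified illustration, not the actual JobTriggerPoolHelper: the class name TriggerPoolSelector, the 500 ms "slow trigger" threshold, and the per-minute reset are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: decides whether a job's triggers should go to the slow pool.
class TriggerPoolSelector {
    static final long SLOW_COST_MS = 500;    // assumed threshold for a "slow" trigger
    static final int SLOW_COUNT_LIMIT = 10;  // more than 10 slow triggers => slow pool

    private final Map<Integer, AtomicInteger> slowCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute = -1;

    // Records how long one trigger of the job took; counters are reset every minute.
    void record(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) {
            currentMinute = minute;
            slowCounts.clear();
        }
        if (costMs > SLOW_COST_MS) {
            slowCounts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
        }
    }

    // True when the job was slow more than SLOW_COUNT_LIMIT times in the current minute.
    boolean useSlowPool(int jobId) {
        AtomicInteger count = slowCounts.get(jobId);
        return count != null && count.get() > SLOW_COUNT_LIMIT;
    }

    public static void main(String[] args) {
        TriggerPoolSelector selector = new TriggerPoolSelector();
        for (int i = 0; i < 11; i++) {
            selector.record(7, 600, 1_000);
        }
        System.out.println("job 7 -> slow pool? " + selector.useSlowPool(7));
        System.out.println("job 8 -> slow pool? " + selector.useSlowPool(8));
    }
}
```

    Isolating repeatedly slow jobs this way keeps one misbehaving job from occupying all threads that the well-behaved jobs depend on.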
    

    Next, step into JobScheduleHelper.getInstance().start():

    public class JobScheduleHelper {
    
        private static JobScheduleHelper instance = new JobScheduleHelper();
        public static JobScheduleHelper getInstance(){
            return instance;
        }
    
        public static final long PRE_READ_MS = 5000;    // pre read
    
        private Thread scheduleThread;
        private Thread ringThread;
        private volatile boolean scheduleThreadToStop = false;
        private volatile boolean ringThreadToStop = false;
        // time ring: maps a second (0-59) to the ids of the jobs that must be triggered at that second
        private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
    
        public void start(){
            // schedule thread
            scheduleThread = new Thread(new Runnable() {
                @Override
                public void run() {
                
                    try {
                        // delay the start of this thread by about 5s, aligned to a whole second
                        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
    
                    // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                    int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
    
                    while (!scheduleThreadToStop) {
    
                        // start scanning for jobs
                        long start = System.currentTimeMillis();
    
                        Connection conn = null;
                        Boolean connAutoCommit = null;
                        PreparedStatement preparedStatement = null;
    
                        boolean preReadSuc = true;
                        try {
    
                            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                            connAutoCommit = conn.getAutoCommit();
                            conn.setAutoCommit(false);
    
                            // taking this lock implies that several scheduling centers can run side by side for high availability; the pessimistic lock keeps the data safe
                            preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                            preparedStatement.execute();
    
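                            // the transaction is open (auto-commit was switched off above)
                            // 1. pre-read xxl_job_info: fetch the jobs whose next trigger time falls within the next 5s, to prepare in advance
                            // 2. the returned jobs fall into three ranges, handled by the three branches below |---5---10---15---|
                            long nowTime = System.currentTimeMillis();
                            // e.g. if nowTime = 10s, this query returns the jobs whose next trigger time is <= 15s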
                            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                            if (scheduleList!=null && scheduleList.size( )>0) {
          
                                // loop over the job_info rows returned by the query
                                for (XxlJobInfo jobInfo: scheduleList) {
    
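                                    // the current time is more than 5s past the next trigger time: this run was missed (misfire),
                                    // so it is not triggered normally and is handled by the misfire strategy instead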
                                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                        // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                        logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
    
                                        // 1、misfire match
                                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                        if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                            // FIRE_ONCE_NOW 》 trigger
                                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                        }
    
                                        // 2. refresh the next trigger time
                                        refreshNextValidTime(jobInfo, new Date());
    
                                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
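                                       // the current time is past the next trigger time, but by less than 5s: the job must be triggered now
    
                                        // 1. trigger: apply the routing rule and dispatch to the chosen executor node
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
    
                                        // 2. after triggering, refresh the next trigger time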
                                        refreshNextValidTime(jobInfo, new Date());
    
                                        // next-trigger-time in 5s, pre-read again
                                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
    
                                            // 1、make ring second
                                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                            // 2、push time ring
                                            pushTimeRing(ringSecond, jobInfo.getId());
    
                                            // 3、fresh next
                                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                        }
    
                                    } else {
                                        // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
    
                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());
    
                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                    }
    
                                }
    
                                // 3、update trigger info
                                for (XxlJobInfo jobInfo: scheduleList) {
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                                }
    
                            } else {
                                preReadSuc = false;
                            }
    
                            // tx stop
    
    
                        } catch (Exception e) {
                            if (!scheduleThreadToStop) {
                                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                            }
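                        } finally {
                            // the null checks avoid a NullPointerException when getConnection() itself failed
                            if (preparedStatement != null) {
                                try {
                                    preparedStatement.close();
                                } catch (SQLException e) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            if (conn != null) {
                                try {
                                    // commit releases the row lock, then auto-commit is restored
                                    conn.commit();
                                    if (connAutoCommit != null) {
                                        conn.setAutoCommit(connAutoCommit);
                                    }
                                } catch (SQLException e) {
                                    logger.error(e.getMessage(), e);
                                } finally {
                                    try {
                                        conn.close();
                                    } catch (SQLException e) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
                        }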
                        long cost = System.currentTimeMillis()-start;
    
                        // Wait seconds, align second
                        if (cost < 1000) {  // scan-overtime, not wait
                            try {
                                // pre-read period: success > scan each second; fail > skip this period;
                                TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                            } catch (InterruptedException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                }
            });
            scheduleThread.setDaemon(true);
            scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
            scheduleThread.start();
    
    
            // ring thread: triggers the jobs that the schedule thread above pushed into the ringData map
            ringThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    while (!ringThreadToStop) {
    
                        // align second
                        try {
                            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                        } catch (InterruptedException e) {
                            if (!ringThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
    
                        try {
                            // second data
                            List<Integer> ringItemData = new ArrayList<>();
                            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // also check the previous tick, in case processing took too long and a tick was skipped
                            for (int i = 0; i < 2; i++) {
                                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                                if (tmpData != null) {
                                    ringItemData.addAll(tmpData);
                                }
                            }
    
                            // ring trigger
                            if (ringItemData.size() > 0) {
                                // do trigger
                                for (int jobId: ringItemData) {
                                    // do trigger
                                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                                }
                                // clear
                                ringItemData.clear();
                            }
                        } catch (Exception e) {
                            if (!ringThreadToStop) {
                                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                            }
                        }
                    }
                }
            });
            ringThread.setDaemon(true);
            ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
            ringThread.start();
        }
    }
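
    The two key ideas of the schedule thread and the ring thread, the three-way split by trigger time and the 60-slot time ring, can be isolated in a small sketch. It follows the arithmetic of the code above, but TimeRingSketch, Action, classify and pollDue are hypothetical names, and the sketch is single-threaded for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, single-threaded sketch of the pre-read classification and the time ring.
class TimeRingSketch {
    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    // Mirrors the three branches of the schedule thread.
    static Action classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Action.MISFIRE;        // missed by more than 5s
        } else if (nowMs > triggerNextTimeMs) {
            return Action.TRIGGER_NOW;    // due, and late by at most 5s
        }
        return Action.PUSH_TO_RING;       // due within the next 5s
    }

    private final Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();

    // Slot of the ring (0-59) that a trigger time falls into.
    static int ringSecond(long triggerNextTimeMs) {
        return (int) ((triggerNextTimeMs / 1000) % 60);
    }

    void push(long triggerNextTimeMs, int jobId) {
        ring.computeIfAbsent(ringSecond(triggerNextTimeMs), k -> new ArrayList<>()).add(jobId);
    }

    // Removes and returns the jobs of the current slot and of the previous one.
    List<Integer> pollDue(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ring.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        TimeRingSketch sketch = new TimeRingSketch();
        sketch.push(125_000, 42);               // 125s % 60 = slot 5
        System.out.println(sketch.pollDue(6));  // slot 6 plus the previous slot 5
    }
}
```

    Checking the previous slot as well means a job is still picked up when the ring thread wakes up one tick late.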
    

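    Step into trigger(). JobTriggerPoolHelper.trigger() hands the request to one of the trigger thread pools, and the worker thread ends up in the method below: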

        public static void trigger(int jobId,
                                  TriggerTypeEnum triggerType,
                                  int failRetryCount,
                                  String executorShardingParam,
                                  String executorParam,
                                  String addressList) {
    
           // load the job from xxl_job_info
           XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
           if (jobInfo == null) {
               logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
               return;
           }
           // if an explicit parameter was passed in, it overrides the ExecutorParam field
           if (executorParam != null) {
               jobInfo.setExecutorParam(executorParam);
           }
           // retry count: the passed-in value if >= 0, otherwise the job's own setting
           int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
           XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
    
           // if an executor address list was passed in explicitly, use it directly
           if (addressList!=null && addressList.trim().length()>0) {
               // addressType = 1 means the addresses were entered manually, 0 means auto-registered; see class XxlJobGroup
               group.setAddressType(1);
               group.setAddressList(addressList.trim());
           }
    
           // sharding parameters
           int[] shardingParam = null;
           if (executorShardingParam!=null){
               // executorShardingParam is passed in from only one place: the retry issued by the fail-monitor thread
               // so the sharding parameters are parsed here only on a retry; every other caller passes null
               String[] shardingArr = executorShardingParam.split("/");
               if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                   shardingParam = new int[2];
                   shardingParam[0] = Integer.valueOf(shardingArr[0]);
                   shardingParam[1] = Integer.valueOf(shardingArr[1]);
               }
           }
           // routing strategy is sharding broadcast:
           //  1. the executor address list must not be empty, because each shard is routed to a specific executor
           if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                   && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                   && shardingParam==null) {
               // iterate over all executor addresses and trigger once per address, passing (shard index, shard total)
               for (int i = 0; i < group.getRegistryList().size(); i++) {
                   processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
               }
           } else {
               if (shardingParam == null) {
                   shardingParam = new int[]{0, 1};
               }
               processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
           }
    
       }
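
    The sharding logic above boils down to two rules: parse an "index/total" string when one is given, otherwise either fan out to every executor (broadcast) or fall back to shard 0 of 1. A minimal sketch under those assumptions follows; ShardingSketch and plan() are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how the (index, total) pairs for processTrigger() are derived.
class ShardingSketch {
    // Parses "index/total"; returns null when the input is absent or malformed.
    static int[] parse(String executorShardingParam) {
        if (executorShardingParam == null) {
            return null;
        }
        String[] parts = executorShardingParam.split("/");
        if (parts.length == 2 && isNumeric(parts[0]) && isNumeric(parts[1])) {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        }
        return null;
    }

    static boolean isNumeric(String value) {
        try {
            Integer.parseInt(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Returns one {index, total} pair per processTrigger() call that would be made.
    static List<int[]> plan(boolean shardingBroadcast, int executorCount, String executorShardingParam) {
        int[] parsed = parse(executorShardingParam);
        List<int[]> calls = new ArrayList<>();
        if (shardingBroadcast && executorCount > 0 && parsed == null) {
            // broadcast: every registered executor gets its own shard index
            for (int i = 0; i < executorCount; i++) {
                calls.add(new int[]{i, executorCount});
            }
        } else {
            // retry of a single shard, or a non-broadcast job: exactly one call
            calls.add(parsed != null ? parsed : new int[]{0, 1});
        }
        return calls;
    }

    public static void main(String[] args) {
        for (int[] call : plan(true, 3, null)) {
            System.out.println(call[0] + "/" + call[1]);
        }
    }
}
```

    On a retry the stored "index/total" value is reused, so only the shard that failed is triggered again instead of broadcasting to every executor a second time.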
    

    Next, step into processTrigger() (excerpt):

            // 4、trigger remote executor
            ReturnT<String> triggerResult = null;
            if (address != null) {
                triggerResult = runExecutor(triggerParam, address);
            } else {
                triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
            }
    
    

    Next, step into runExecutor():

        public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
            ReturnT<String> runResult = null;
            try {
                // obtain a client object that can call the remote executor
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                runResult = executorBiz.run(triggerParam);
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
                runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
            }
    
            StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
            runResultSB.append("<br>address:").append(address);
            runResultSB.append("<br>code:").append(runResult.getCode());
            runResultSB.append("<br>msg:").append(runResult.getMsg());
    
            runResult.setMsg(runResultSB.toString());
            return runResult;
        }
    

    Step into run():

    public class ExecutorBizClient implements ExecutorBiz {
    
        public ExecutorBizClient() {
        }
        public ExecutorBizClient(String addressUrl, String accessToken) {
            this.addressUrl = addressUrl;
            this.accessToken = accessToken;
    
            // valid
            if (!this.addressUrl.endsWith("/")) {
                this.addressUrl = this.addressUrl + "/";
            }
        }
    
        private String addressUrl ;
        private String accessToken;
        private int timeout = 3;
    
    
        @Override
        public ReturnT<String> beat() {
            return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class);
        }
    
        @Override
        public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){
            return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class);
        }
    
        @Override
        public ReturnT<String> run(TriggerParam triggerParam) {
            System.out.println("ExecutorBizClient: sending a 'run' request to the executor...");
            return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
        }
    
        @Override
        public ReturnT<String> kill(KillParam killParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class);
        }
    
        @Override
        public ReturnT<LogResult> log(LogParam logParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);
        }
    }
    

    Step into postBody():

       public static ReturnT postBody(String url, String accessToken, int timeout, Object requestObj, Class returnTargClassOfT) {
            System.out.println("---------->url="+url+",requestObj="+requestObj+",returnTargClassOfT="+returnTargClassOfT);
            LocalDateTime now = LocalDateTime.now();
            System.out.println(Thread.currentThread().getName()+",now="+now+"url="+url+"requestObj = " + requestObj+",returnTargClassOfT="+returnTargClassOfT);
            HttpURLConnection connection = null;
            BufferedReader bufferedReader = null;
            try {
                // connection
                URL realUrl = new URL(url);
                connection = (HttpURLConnection) realUrl.openConnection();
    
                // trust-https
                boolean useHttps = url.startsWith("https");
                if (useHttps) {
                    HttpsURLConnection https = (HttpsURLConnection) connection;
                    trustAllHosts(https);
                }
    
                // connection setting
                connection.setRequestMethod("POST");
                connection.setDoOutput(true);
                connection.setDoInput(true);
                connection.setUseCaches(false);
                connection.setReadTimeout(timeout * 1000);
                connection.setConnectTimeout(3 * 1000);
                connection.setRequestProperty("connection", "Keep-Alive");
                connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
                connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
    
                if(accessToken!=null && accessToken.trim().length()>0){
                    connection.setRequestProperty(XXL_JOB_ACCESS_TOKEN, accessToken);
                }
    
                // do connection
                connection.connect();
    
                // write requestBody
                if (requestObj != null) {
                    // serialize the payload; for a run request, requestObj is the triggerParam
                    String requestBody = GsonTool.toJson(requestObj);
    
                    DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                    dataOutputStream.write(requestBody.getBytes("UTF-8"));
                    dataOutputStream.flush();
                    dataOutputStream.close();
                }
    
                /*byte[] requestBodyBytes = requestBody.getBytes("UTF-8");
                connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length));
                OutputStream outwritestream = connection.getOutputStream();
                outwritestream.write(requestBodyBytes);
                outwritestream.flush();
                outwritestream.close();*/
    
                // valid StatusCode
                int statusCode = connection.getResponseCode();
                if (statusCode != 200) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
                }
    
                // result
                bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
                StringBuilder result = new StringBuilder();
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    result.append(line);
                }
                String resultJson = result.toString();
                System.out.println("result read from the server: result=" + resultJson);
    
                // parse returnT
                try {
                    ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, returnTargClassOfT);
                    return returnT;
                } catch (Exception e) {
                    logger.error("xxl-job remoting (url="+url+") response content invalid("+ resultJson +").", e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting (url="+url+") response content invalid("+ resultJson +").");
                }
    
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting error("+ e.getMessage() +"), for url : " + url);
            } finally {
                try {
                    if (bufferedReader != null) {
                        bufferedReader.close();
                    }
                    if (connection != null) {
                        connection.disconnect();
                    }
                } catch (Exception e2) {
                    logger.error(e2.getMessage(), e2);
                }
            }
        }
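
    The essence of postBody() is a plain HTTP POST with a JSON body and a token header. The sketch below shows that round trip against a throwaway local server so it can be run on its own. It is an illustration only: PostBodySketch is a hypothetical class, the header name XXL-JOB-ACCESS-TOKEN is assumed to be the value behind the XXL_JOB_ACCESS_TOKEN constant, and the JDK's built-in HttpServer stands in for the executor.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the client side of the call, plus a stand-in server.
class PostBodySketch {
    static String post(String url, String accessToken, int timeoutSeconds, String jsonBody) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setDoOutput(true);
            connection.setReadTimeout(timeoutSeconds * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            if (accessToken != null && !accessToken.trim().isEmpty()) {
                connection.setRequestProperty("XXL-JOB-ACCESS-TOKEN", accessToken); // assumed header name
            }
            try (OutputStream out = connection.getOutputStream()) {
                out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
            }
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) {
                throw new IOException("unexpected status code " + statusCode);
            }
            StringBuilder result = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    result.append(line);
                }
            }
            return result.toString();
        } finally {
            connection.disconnect();
        }
    }

    // Starts a local server on a free port, posts one request to /run, returns the reply.
    static String demo() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/run", exchange -> {
                byte[] request = exchange.getRequestBody().readAllBytes();
                byte[] response = ("{\"code\":200,\"bodyLength\":" + request.length + "}")
                        .getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, response.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(response);
                }
            });
            server.start();
            try {
                String url = "http://127.0.0.1:" + server.getAddress().getPort() + "/run";
                return post(url, "demo-token", 3, "{\"jobId\":1}");
            } finally {
                server.stop(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Unlike the original method, the sketch uses try-with-resources for the streams, so they are closed even when reading or writing fails.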
    

    The request body assembled here carries the all-important triggerParam.

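    This is where the client (the scheduling center) sends its request and communication with the server (the executor) begins. The receiving code on the server side is as follows: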

    public class EmbedServer {
        private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
    
            // excerpt: in the full source this method belongs to the nested handler class EmbedHttpServerHandler
            @Override
            protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
                System.out.println("entering channelRead0(), msg=" + msg);
                // request parse
                //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
                String requestData = msg.content().toString(CharsetUtil.UTF_8);
                String uri = msg.uri();
                HttpMethod httpMethod = msg.method();
                boolean keepAlive = HttpUtil.isKeepAlive(msg);
                String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
    
                // invoke
                bizThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        // a request from the scheduling center has arrived; run the matching handler
                        Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
    
                        // to json
                        String responseJson = GsonTool.toJson(responseObj);
    
                        // send the result responseJson back to the client (the scheduling center)
                        writeResponse(ctx, keepAlive, responseJson);
                    }
                });
            }
    }
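
    The handler above does not run the business logic on the thread that received the request; it hands the work to bizThreadPool and writes the response from there. A minimal sketch of that hand-off, using only java.util.concurrent, is shown below. BizPoolHandOff and its method names are hypothetical, and the "I/O thread" is simply the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: the receiving thread returns at once, a pool thread does the work.
class BizPoolHandOff {
    private final ExecutorService bizPool = Executors.newFixedThreadPool(2);

    // Called on the (simulated) I/O thread; processing and response writing run on bizPool.
    void onRequest(String requestData, Function<String, String> process, Consumer<String> writeResponse) {
        bizPool.execute(() -> writeResponse.accept(process.apply(requestData)));
    }

    void shutdown() {
        bizPool.shutdown();
    }

    // Sends one request and waits for the asynchronously written response.
    static String demo() {
        BizPoolHandOff server = new BizPoolHandOff();
        CompletableFuture<String> response = new CompletableFuture<>();
        try {
            server.onRequest("ping",
                    data -> "{\"code\":200,\"msg\":\"" + data + "\"}",
                    response::complete);
            return response.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            server.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Keeping slow work off the receiving thread matters in an event-loop server such as Netty, because a blocked event-loop thread stalls every connection it serves.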
    

    It then calls process():

            private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
                // valid
                if (HttpMethod.POST != httpMethod) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
                }
                if (uri == null || uri.trim().length() == 0) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
                }
                if (accessToken != null
                        && accessToken.trim().length() > 0
                        && !accessToken.equals(accessTokenReq)) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
                }
    
                // services mapping
                try {
                    switch (uri) {
                        case "/beat":
                            return executorBiz.beat();
                        case "/idleBeat":
                            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                            return executorBiz.idleBeat(idleBeatParam);
                        case "/run":
                            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                            System.out.println("run request received from the client, triggerParam=" + triggerParam);
                            return executorBiz.run(triggerParam);
                        case "/kill":
                            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                            return executorBiz.kill(killParam);
                        case "/log":
                            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                            return executorBiz.log(logParam);
                        default:
                            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
                }
            }
    

    Next, step into the server-side run() method. The code is as follows:

        @Override
        public ReturnT<String> run(TriggerParam triggerParam) {
            // load old:jobHandler + jobThread
            JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
            IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
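            // All the validation checks are omitted here (removeOldReason is assigned in them);
            // execution then reaches registJobThread(), which starts a thread
            // to handle the client's request asynchronously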
            if (jobThread == null) {
                jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
            }
    
            // Put the parameters sent by the client into the blocking queue (LinkedBlockingQueue triggerQueue)
            ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
            return pushResult;
        }
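
    The hand-off above is a classic producer/consumer arrangement: the request thread only enqueues the trigger and returns, while a dedicated worker thread drains the queue. The following is a minimal sketch of that idea, not xxl-job's actual code. The class name TriggerQueueSketch, the pendingLogIds set, the 100 ms poll interval and the idle limit of 3 are all assumptions chosen to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a per-job worker thread fed through a blocking queue.
class TriggerQueueSketch extends Thread {
    private final LinkedBlockingQueue<Long> triggerQueue = new LinkedBlockingQueue<>();
    private final Set<Long> pendingLogIds = ConcurrentHashMap.newKeySet();
    private volatile boolean toStop = false;
    final List<Long> executed = new CopyOnWriteArrayList<>();

    // Called by the request thread: enqueue and return immediately.
    boolean pushTrigger(long logId) {
        if (!pendingLogIds.add(logId)) {
            return false; // the same trigger is already waiting in the queue
        }
        triggerQueue.add(logId);
        return true;
    }

    @Override
    public void run() {
        int idleTimes = 0;
        while (!toStop) {
            try {
                // poll with a timeout so the loop can notice toStop and count idle rounds
                Long logId = triggerQueue.poll(100, TimeUnit.MILLISECONDS);
                if (logId != null) {
                    idleTimes = 0;
                    pendingLogIds.remove(logId);
                    executed.add(logId); // stands in for running the business handler
                } else if (++idleTimes > 3) {
                    toStop = true; // idle for too long: let the thread end
                }
            } catch (InterruptedException e) {
                toStop = true;
            }
        }
    }

    // Usage: push two triggers (one of them twice) and run the worker until it idles out.
    static List<Long> demo() {
        TriggerQueueSketch worker = new TriggerQueueSketch();
        worker.pushTrigger(1L);
        worker.pushTrigger(1L); // rejected: logId 1 is still pending
        worker.pushTrigger(2L);
        worker.start();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(worker.executed);
    }
}
```

    Using poll() with a timeout instead of take() is what lets the worker check its stop flag and count idle rounds between triggers.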
    

    Then step into the registJobThread() method. The code is as follows:

        public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
            JobThread newJobThread = new JobThread(jobId, handler);
            // A thread is started here, so the run() method of the JobThread class will be executed
            newJobThread.start();
            logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
    
            JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
            if (oldJobThread != null) {
                oldJobThread.toStop(removeOldReason);
                oldJobThread.interrupt();
            }
    
            return newJobThread;
        }
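
    The inline comment in registJobThread() points at a detail worth spelling out: Map.put() returns the previous mapping, which is how the old JobThread is found and stopped, whereas putIfAbsent() would keep the old entry and discard the new one. Below is a minimal sketch of the replace-and-stop pattern. WorkerRegistrySketch and its nested Worker class are hypothetical names for illustration only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class WorkerRegistrySketch {
    // Hypothetical worker: it only records whether it was asked to stop.
    static class Worker {
        final String name;
        volatile boolean stopped = false;

        Worker(String name) {
            this.name = name;
        }

        void toStop() {
            stopped = true;
        }
    }

    private final ConcurrentMap<Integer, Worker> repository = new ConcurrentHashMap<>();

    // put() returns the previous value (or null), so the replaced worker can be stopped.
    Worker register(int jobId, Worker newWorker) {
        Worker oldWorker = repository.put(jobId, newWorker);
        if (oldWorker != null) {
            oldWorker.toStop();
        }
        return newWorker;
    }

    Worker get(int jobId) {
        return repository.get(jobId);
    }
}
```

    Registering twice under the same jobId leaves only the second worker in the map, and the first one has been told to stop.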
    

    Step into the run() method of JobThread. The code is as follows:

        @Override
        public void run() {

            // init
            try {
                handler.init();
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }

            // The thread loops here so that the same thread is reused for every trigger of this job
            while(!toStop){
                running = false;
                // Count how many rounds this thread has idled;
                // after more than 30 idle rounds (each poll waits up to 3 s) the thread is removed
                idleTimes++;

                TriggerParam triggerParam = null;
                try {
                    // Take the triggerParam from the blocking queue, then run the matching business handler
                    triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                    if (triggerParam!=null) {
                        running = true;
                        idleTimes = 0;
                        triggerLogIdSet.remove(triggerParam.getLogId());

                        // log filename, like "logPath/yyyy-MM-dd/9999.log"
                        String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                        XxlJobContext xxlJobContext = new XxlJobContext(
                                triggerParam.getJobId(),
                                triggerParam.getExecutorParams(),
                                logFileName,
                                triggerParam.getBroadcastIndex(),
                                triggerParam.getBroadcastTotal());

                        // init job context
                        XxlJobContext.setXxlJobContext(xxlJobContext);

                        // execute
                        XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                        // If an executor timeout was configured on the admin page, take this branch
                        if (triggerParam.getExecutorTimeout() > 0) {
                            // limit timeout
                            Thread futureThread = null;
                            try {
                                // Use FutureTask to implement the timeout
                                FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                    @Override
                                    public Boolean call() throws Exception {

                                        // init job context
                                        XxlJobContext.setXxlJobContext(xxlJobContext);

                                        handler.execute();
                                        return true;
                                    }
                                });
                                futureThread = new Thread(futureTask);
                                futureThread.start();

                                // get(timeout) throws TimeoutException if no result arrives within the limit
                                Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                            } catch (TimeoutException e) {
                                // Catch the timeout thrown by get()
                                XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                                XxlJobHelper.log(e);

                                // handle result
                                XxlJobHelper.handleTimeout("job execute timeout ");
                            } finally {
                                // Interrupt the futureThread that runs the Callable
                                futureThread.interrupt();
                            }
                        } else {
                            // Finally invoke the business handler we wrote, i.e. our own scheduled-task method
                            handler.execute();
                        }

                        // valid execute handle data
                        if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                            XxlJobHelper.handleFail("job handle result lost.");
                        } else {
                            String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                            tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                                    ?tempHandleMsg.substring(0, 50000).concat("...")
                                    :tempHandleMsg;
                            XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                        }
                        XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                                + XxlJobContext.getXxlJobContext().getHandleCode()
                                + ", handleMsg = "
                                + XxlJobContext.getXxlJobContext().getHandleMsg()
                        );

                    } else {
                        // After more than 30 idle rounds, remove this thread
                        if (idleTimes > 30) {
                            if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
                                XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                            }
                        }
                    }
                } catch (Throwable e) {
                    if (toStop) {
                        XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                    }

                    // handle result
                    StringWriter stringWriter = new StringWriter();
                    e.printStackTrace(new PrintWriter(stringWriter));
                    String errorMsg = stringWriter.toString();

                    XxlJobHelper.handleFail(errorMsg);

                    XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
                } finally {
                    if(triggerParam != null) {
                        // callback handler info
                        if (!toStop) {
                            // Return the jobHandler's execution result to the client (the scheduling center)
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                    triggerParam.getLogId(),
                                    triggerParam.getLogDateTime(),
                                    XxlJobContext.getXxlJobContext().getHandleCode(),
                                    XxlJobContext.getXxlJobContext().getHandleMsg() )
                            );
                        } else {
                            // is killed
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                                    triggerParam.getLogId(),
                                    triggerParam.getLogDateTime(),
                                    XxlJobContext.HANDLE_CODE_FAIL,
                                    stopReason + " [job running, killed]" )
                            );
                        }
                    }
                }
            }

            // callback trigger request in queue
            while(triggerQueue !=null && triggerQueue.size()>0){
                TriggerParam triggerParam = triggerQueue.poll();
                if (triggerParam!=null) {
                    // is killed
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_CODE_FAIL,
                            stopReason + " [job not executed, in the job queue, killed.]")
                    );
                }
            }

            // destroy
            try {
                handler.destroy();
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }

            logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
        }
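
    The timeout branch in run() relies on a general JDK technique: wrap the work in a FutureTask, run it on its own thread, and wait with get(timeout). A minimal, self-contained sketch of that technique follows. TimeoutSketch, runWithTimeout() and sleepQuietly() are hypothetical names, and the string results merely stand in for the handle codes used by xxl-job.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
    // Runs the task on its own thread and waits at most timeoutMillis for it to finish.
    static String runWithTimeout(Runnable task, long timeoutMillis) {
        FutureTask<Boolean> futureTask = new FutureTask<>(() -> {
            task.run();
            return true;
        });
        Thread futureThread = new Thread(futureTask);
        futureThread.start();
        try {
            futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "success";
        } catch (TimeoutException e) {
            return "timeout"; // no result within the limit
        } catch (ExecutionException e) {
            return "fail"; // the task itself threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "fail";
        } finally {
            // Ask the worker thread to stop; this only helps if the task reacts to interruption.
            futureThread.interrupt();
        }
    }

    // Helper for the usage example: sleep, and end early when interrupted.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

    For example, runWithTimeout(() -> TimeoutSketch.sleepQuietly(2000), 100) gives up after about 100 ms. Note that interrupt() is only a request: a handler that never checks the interrupt flag and never blocks will keep running after the timeout has been reported.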

    Next, step into the TriggerCallbackThread callback thread. The code is as follows:

      public void start() {
    
            // valid
            if (XxlJobExecutor.getAdminBizList() == null) {
                logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
                return;
            }
    
            // callback
            triggerCallbackThread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    // normal callback
                    while(!toStop){
                        try {
                            // Take one element from the blocking queue (blocks until one is available)
                            HandleCallbackParam callback = getInstance().callBackQueue.take();
                            if (callback != null) {
    
                                // callback list param
                                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                                callbackParamList.add(callback);
    
                                // callback, will retry if error
                                if (callbackParamList!=null && callbackParamList.size()>0) {
                                    doCallback(callbackParamList);
                                }
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
    
                    // last callback
                    try {
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            // The actual underlying call
                            doCallback(callbackParamList);
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");
    
                }
            });
            triggerCallbackThread.setDaemon(true);
            triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
            triggerCallbackThread.start();
    
    
            // retry
            triggerRetryCallbackThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while(!toStop){
                        try {
                            retryFailCallbackFile();
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
    
                        }
                        try {
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        } catch (InterruptedException e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
                }
            });
            triggerRetryCallbackThread.setDaemon(true);
            triggerRetryCallbackThread.start();
    
        }
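
    The normal-callback loop above batches results: take() blocks until at least one callback exists, then drainTo() collects whatever else is already queued, so one remote call can carry many results. A minimal sketch of that batching step is shown below. CallbackBatchSketch and nextBatch() are hypothetical names, and unlike the code above this sketch puts the first element at the head of the batch rather than appending it last.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CallbackBatchSketch {
    // Blocks for the first element, then grabs everything else that is already queued.
    static <T> List<T> nextBatch(BlockingQueue<T> queue) throws InterruptedException {
        T first = queue.take();
        List<T> batch = new ArrayList<>();
        batch.add(first);
        queue.drainTo(batch); // non-blocking: moves the currently available elements
        return batch;
    }

    // Usage: three queued results come back as a single batch.
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("log-1");
        queue.add("log-2");
        queue.add("log-3");
        try {
            return nextBatch(queue);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
    }
}
```

    Because take() blocks while the queue is empty, the thread consumes no CPU between callbacks, and bursts of results are naturally merged into fewer calls.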
    

    Step into the doCallback() method:

        private void doCallback(List<HandleCallbackParam> callbackParamList){
            // Report the results of the executed handlers back from the callback thread
            boolean callbackRet = false;
            // callback, will retry if error
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
                    if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                        callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                        callbackRet = true;
                        break;
                    } else {
                        callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
                    }
                } catch (Exception e) {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
                }
            }
            if (!callbackRet) {
                appendFailCallbackFile(callbackParamList);
            }
        }
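
    doCallback() is a simple failover loop: try each admin address in turn, stop at the first success, and park the batch for a later retry if every address fails. The sketch below illustrates only that control flow. CallbackFailoverSketch is a hypothetical class, each admin is modelled as a Predicate that returns true on success, and the in-memory failStore list stands in for the fail-callback file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class CallbackFailoverSketch {
    // Stands in for the fail-callback file that a retry thread re-reads later.
    final List<List<String>> failStore = new ArrayList<>();

    // Returns the index of the admin that accepted the batch, or -1 if none did.
    int callback(List<Predicate<List<String>>> admins, List<String> batch) {
        for (int i = 0; i < admins.size(); i++) {
            try {
                if (admins.get(i).test(batch)) {
                    return i; // first success wins, remaining admins are skipped
                }
            } catch (RuntimeException e) {
                // this admin is unreachable or failed: fall through to the next one
            }
        }
        failStore.add(batch); // nobody accepted the batch: keep it for a later retry
        return -1;
    }
}
```

    Persisting the failed batch instead of dropping it is what allows the separate retry thread (retryFailCallbackFile() in the code above) to resend results once an admin becomes reachable again.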
  • Original article: https://blog.csdn.net/qq_35971258/article/details/126233122