• xxl-job源码解析(技术分享)


    文章目录

    1. 概念

    1. 定时任务的基本概念

    程序为解决一个信息处理任务而预先编制的工作执行方案,这就是定时任务,核心组成如下:

    • 执行器:负责管理应用运行时环境,用于调度定时任务。
    • 任务:任务执行的流程,是一个类,具体的业务。
    • 触发器:按照某种时间规则,执行具体的调度任务。

    2. 定时任务的使用场景

    日常开发中,定时任务主要分为如下两种使用场景:

    时间驱动:

    • 对账单、日结
    • 营销类短信
    • MQ定时检查生产失败的消息

    数据驱动:

    • 异步数据交换
    • 数据同步

    3. 原生定时任务缺陷有哪些缺陷?

    分布式技术应用的时代,原生定时任务的缺陷显得更为突出。结合传统项目与分布式微服务的架构,思考总结如下,欢迎各位大神给与补充:

    • 不支持集群多节点部署,需要自己实现避免任务重复执行的问题。
    • 不支持分片任务,处理有序数据时,多机器分片执行任务处理不同数据。
    • 不支持动态调整,不重启服务的情况下修改任务的参数。
    • 没有报警机制,当任务失败后没有报警机制通知。
    • 不支持生命周期统一管理,如不重启服务情况下关闭、启动任务。
    • 不支持失败重试,出现异常后任务终结,不能根据状态控制任务重新执行。
    • 无法统计任务数据,当任务数据量大的时候,对于任务执行情况无法高效的统计执行情况。

    4. 基于当前 XXL-JOB 我们能做什么?

    • 执行器 HA(分布式):天生支持任务分布式执行,无需自己实现。任务"执行器"支持集群部署,可保证任务执行 HA;
    • 调度中心 HA(中心式):调度中心相当于传统调度任务的触发器,调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心 HA;
      在这里插入图片描述

    2. 系统架构和整理流程

    https://www.xuxueli.com/xxl-job/

    2.1. 设计思想

    • 将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
    • 将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。
    • 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

    2.2. 架构图

    在这里插入图片描述

    2.3. 执行流程

    在这里插入图片描述

    3. 启动流程

    3.1. 服务器启动

    在这里插入图片描述
    首先找到配置类 XxlJobAdminConfig, 可以发现该类实现 InitializingBean接口,这里直接看 afterPropertiesSet方法即可。

    @Component
    public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
        // ---------------------- XxlJobScheduler ----------------------
    
        private XxlJobScheduler xxlJobScheduler;
    
        @Override
        public void afterPropertiesSet() throws Exception {
            adminConfig = this;
    		 // 初始化xxljob调度器	
            xxlJobScheduler = new XxlJobScheduler();
            xxlJobScheduler.init();
        }
        ...
    }
    
    
    public void init() throws Exception {
            // init i18n
            initI18n();
    
            // admin trigger pool start
            // 初始化触发器线程池
            JobTriggerPoolHelper.toStart();
    
            // admin registry monitor run
            /**
             * 30秒执行一次,维护注册表信息, 判断在线超时时间90s
             * 1. 删除90s未有心跳的执行器节点;jobRegistry
             * 2. 获取所有的注册节点,更新到jobGroup(执行器)
             */
            JobRegistryHelper.getInstance().start();
    
            // admin fail-monitor run 运行事变监视器,主要失败发送邮箱,重试触发器
            JobFailMonitorHelper.getInstance().start();
    
            // admin lose-monitor run ( depend on JobTriggerPoolHelper )
            // 将丢失主机调度日志置为失败
            JobCompleteHelper.getInstance().start();
    
            // admin log report start  统计一些失败成功报表
            JobLogReportHelper.getInstance().start();
    
            // start-schedule  ( depend on JobTriggerPoolHelper )
            /**
             * 调度器执行任务(两个线程 + 线程池执行调度逻辑)
             * 1. 调度线程50s执行一次;查询5s秒内执行的任务,并按照不同逻辑执行
             * 2. 时间轮线程每1秒执行一次;时间轮算法,并向前跨一个时刻;
             */
            JobScheduleHelper.getInstance().start();
    
            logger.info(">>>>>>>>> init xxl-job admin success.");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    3.2. 客户端启动

    在这里插入图片描述
    这里我们看XxlJobSpringExecutor,实现了 SmartInitializingSingleton 接口,实现该接口的当spring容器初始完成,调用afterSingletonsInstantiated()方法。紧接着执行监听器发送监听后,就会遍历所有的Bean然后初始化所有单例非懒加载的bean。实现DisposableBean当实例bean摧毁时调用destroy()方法。

    public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
        private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
    
    
        // start
        @Override
        public void afterSingletonsInstantiated() {
    
            // init JobHandler Repository
            /*initJobHandlerRepository(applicationContext);*/
    
            // init JobHandler Repository (for method) 初始化调度器资源管理器
            /**
             * ConcurrentMap jobHandlerRepository = new ConcurrentHashMap();
             * handle名; Handler->MethodJobHandler(反射 Object、Bean、initMethod、destroyMethod)
             */
            initJobHandlerMethodRepository(applicationContext);
    
            // refresh GlueFactory
            GlueFactory.refreshInstance(1);
    
            // super start 启动
            try {
                super.start();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    再看super.start()

     public void start() throws Exception {
    
            // init logpath 初始化日志目录,用来存储调度日志执行指令到磁盘
            XxlJobFileAppender.initLogPath(logPath);
    
            // init invoker, admin-client 初始化admin链接路径存储集合
            // 在AdminBizClient设置好addressUrl+accessToken
            initAdminBizList(adminAddresses, accessToken);
    
    
            // init JobLogFileCleanThread 清除过期日志(30天)
            // 根据存储路径目录的日志(目录名为时间),根据其目录时间进行删除,1天跑一次,守护线程
            JobLogFileCleanThread.getInstance().start(logRetentionDays);
    
            // init TriggerCallbackThread 回调调度中心任务执行状态
            TriggerCallbackThread.getInstance().start();
    
            // init executor-server  执行内嵌服务
            /**
             * 1. 使用netty开放端口,等待服务端调用
             * 2. 维护心跳时间到服务端(心跳30S)
             * 3. 向服务端申请剔除服务
             */
            initEmbedServer(address, ip, port, appname, accessToken);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    4. 服务注册

    在这里插入图片描述

    1. 任务执行器

    com.xxl.job.core.thread.ExecutorRegistryThread#start

        public void start(final String appname, final String address){
        	...
            registryThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    // registry
                    while (!toStop) {
                        try {
                            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                            // 遍历所有的调度中心
                            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                                try {
                                    ReturnT registryResult = adminBiz.registry(registryParam);
                                    if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                        registryResult = ReturnT.SUCCESS;
                                        logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                        break;
                                    } else {
                                        logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    }
                                } catch (Exception e) {
                                    logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                                }
    
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
    
                        }
    
                        try {
                        	// 休眠30s,每30s执行一次
                            if (!toStop) {
                                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                            }
                        } catch (InterruptedException e) {
                            if (!toStop) {
                                logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                            }
                        }
                    }
    
                    // registry remove
                    // 线程终止后,主动断开连接
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
                                ReturnT registryResult = adminBiz.registryRemove(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    ...
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                if (!toStop) {
                                    logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                                }
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    ...
                }
            });
            // 设置为守护线程
            registryThread.setDaemon(true);
            registryThread.setName("xxl-job, executor ExecutorRegistryThread");
            registryThread.start();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    再来看看其RPC调用,采用的是HTTP传输协议,并采用了JSON作为序列化。

     @Override
     public ReturnT registry(RegistryParam registryParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
     }
     // 可以再细看 com.xxl.job.core.util.XxlJobRemotingUtil,postBody采用就是Http协议,GsonTool将对象转成JSON。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. 调度中心

    再看看调度中心如何接收任务执行器请求的;
    JobApiController就为SpringMVC的Controller,负责接收请求映射

      @RequestMapping("/{uri}")
      @ResponseBody
      @PermissionLimit(limit=false)
      public ReturnT api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
    
          // valid
          if (!"POST".equalsIgnoreCase(request.getMethod())) {
              return new ReturnT(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
          }
          if (uri==null || uri.trim().length()==0) {
              return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
          }
          if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
                  && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
                  && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
              return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong.");
          }
    
          // services mapping
          /**
           * 1. 更新调度日志状态;
           * 2. 当执行器执行成功并且存在有子任务时,触发执行子任务
           */
          if ("callback".equals(uri)) {
              List callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
              return adminBiz.callback(callbackParamList);
          }
          // 服务注册
          else if ("registry".equals(uri)) {
              RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
              return adminBiz.registry(registryParam);
          }
          // 服务下线
          else if ("registryRemove".equals(uri)) {
              RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
              return adminBiz.registryRemove(registryParam);
          } else {
              return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
          }
      }
    
    
    	public ReturnT registry(RegistryParam registryParam) {
    		// valid  校验
    		if (!StringUtils.hasText(registryParam.getRegistryGroup())
    				|| !StringUtils.hasText(registryParam.getRegistryKey())
    				|| !StringUtils.hasText(registryParam.getRegistryValue())) {
    			return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument.");
    		}
    		// async execute 异步注册
    		registryOrRemoveThreadPool.execute(new Runnable() {
    			@Override
    			public void run() { //更新修改时间
    				int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
    				if (ret < 1) {//说明暂未数据,才新增			XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
    					// fresh  空实现
    					freshGroupRegistryInfo(registryParam);
    				}
    			}
    		});
    		return ReturnT.SUCCESS;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    5. 主动触发

    在这里插入图片描述

    1. 调度中心

    触发地址:com.xxl.job.admin.controller.JobInfoController#triggerJob

    	@RequestMapping("/trigger")
    	@ResponseBody
    	//@PermissionLimit(limit = false)
    	public ReturnT triggerJob(int id, String executorParam, String addressList) {
    		// force cover job param 设置默认值
    		if (executorParam == null) {
    			executorParam = "";
    		}
    		// 触发器类型,手动 ,重试次数,'执行器任务分片参数,格式如 1/2',任务参数,机器地址
    		JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
    		return ReturnT.SUCCESS;
    	}
    
    
    
       public void addTrigger(final int jobId,
                               final TriggerTypeEnum triggerType,
                               final int failRetryCount,
                               final String executorShardingParam,
                               final String executorParam,
                               final String addressList) {
    
            // choose thread pool  获取线程池
            ThreadPoolExecutor triggerPool_ = fastTriggerPool;
            // 获取超时次数
            AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
            // 一分钟内超时10次,则采用慢触发器执行
            if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
                triggerPool_ = slowTriggerPool;
            }
    
            // trigger
            triggerPool_.execute(new Runnable() {
                @Override
                public void run() {
    
                    long start = System.currentTimeMillis();
    
                    try {
                        // do trigger // 执行触发器
                        XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    } finally {
    
                        // check timeout-count-map  更新成为下一分钟
                        long minTim_now = System.currentTimeMillis()/60000;
                        if (minTim != minTim_now) {
                            minTim = minTim_now; // 当达到下一分钟则清除超时任务
                            jobTimeoutCountMap.clear();
                        }
    
                        // incr timeout-count-map
                        long cost = System.currentTimeMillis()-start;
                        if (cost > 500) {       // ob-timeout threshold 500ms
                            // 执行时间超过500ms,则记录执行次数
                            AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                            if (timeoutCount != null) {
                                timeoutCount.incrementAndGet();
                            }
                        }
    
                    }
    
                }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    注意当触发器在一分钟内超时10次,则采用慢触发器执行

     private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
    
            // param 获取阻塞处理策略
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
            // 获取路由策略,默认first
            ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
            String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
    
            // 1、save log-id  保存执行日志
            XxlJobLog jobLog = new XxlJobLog();
            jobLog.setJobGroup(jobInfo.getJobGroup());
            jobLog.setJobId(jobInfo.getId());
            jobLog.setTriggerTime(new Date());
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
            logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
    
            // 2、init trigger-param
            TriggerParam triggerParam = new TriggerParam();
            triggerParam.setJobId(jobInfo.getId());
            triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
            triggerParam.setExecutorParams(jobInfo.getExecutorParam());
            triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
            triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
            triggerParam.setLogId(jobLog.getId());
            triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
            triggerParam.setGlueType(jobInfo.getGlueType());
            triggerParam.setGlueSource(jobInfo.getGlueSource());
            triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
            triggerParam.setBroadcastIndex(index);
            triggerParam.setBroadcastTotal(total);
    
            // 3、init address 获取触发器执行地址
            String address = null;
            ReturnT routeAddressResult = null;
            if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
                // 路由策略为分配广播
                if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                    if (index < group.getRegistryList().size()) {
                        address = group.getRegistryList().get(index);
                    } else {
                        address = group.getRegistryList().get(0);
                    }
                } else {
                    // 根据设置的路由策略,执行路由器,获取返回结果,这里用到了策略模式
                    /**
                     *  1. ExecutorRouteFirst (第一个)固定选择第一个机器
                     *  2. ExecutorRouteLast (最后一个)
                     *  3. ExecutorRouteRound (轮询), 通过Map记录任务的执行次数进行取模
                     *  4. ExecutorRouteRandom (随机)
                     *  5. ExecutorRouteConsistentHash (一致性hash),每个jobId都会hash到指定的机器上,每次都会构建虚拟节点
                     *  6. ExecutorRouteLFU (最不频繁使用,1天的使用频繁), 通过Map存储每个jobId在每个地址的使用次数,拿到最少使用地址;
                     *  7. ExecutorRouteLRU (最近最久未使用), 通过LinkedHashMap accessOrder进行实现,其内部通过双向链表实现
                     *  8. ExecutorRouteFailover(故障转移) 通过顺序遍历执行器地址,进行心跳检查
                     *  9. ExecutorRouteBusyover(忙碌转移) 照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
                     */
                    routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                    if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                        address = routeAddressResult.getContent();
                    }
                }
            } else {
                routeAddressResult = new ReturnT(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
            }
    
            // 4、trigger remote executor
            ReturnT triggerResult = null;
            if (address != null) {
            	// 已经获取到任务执行器地址,通过HTTP进行调度
                triggerResult = runExecutor(triggerParam, address);
            } else {
                triggerResult = new ReturnT(ReturnT.FAIL_CODE, null);
            }
    
            // 5、collection trigger info
            StringBuffer triggerMsgSb = new StringBuffer();
            triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
            triggerMsgSb.append("
    ").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp()); triggerMsgSb.append("
    ").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":") .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); triggerMsgSb.append("
    ").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); triggerMsgSb.append("
    ").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()); if (shardingParam != null) { triggerMsgSb.append("("+shardingParam+")"); } triggerMsgSb.append("
    ").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); triggerMsgSb.append("
    ").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); triggerMsgSb.append("
    ").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); triggerMsgSb.append("

    >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<<
    ") .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"

    ":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():""); // 6、save log trigger-info jobLog.setExecutorAddress(address); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorShardingParam(shardingParam); jobLog.setExecutorFailRetryCount(finalFailRetryCount); //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104

    2. 任务执行器

    com.xxl.job.core.server.EmbedServer#process()

     private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
                // valid
                if (HttpMethod.POST != httpMethod) {
                    return new ReturnT(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
                }
                if (uri == null || uri.trim().length() == 0) {
                    return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
                }
                if (accessToken != null
                        && accessToken.trim().length() > 0
                        && !accessToken.equals(accessTokenReq)) {
                    return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong.");
                }
    
                // services mapping
                try {
                    switch (uri) {
                        case "/beat":
                            return executorBiz.beat();
                        case "/idleBeat":
                            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                            return executorBiz.idleBeat(idleBeatParam);
                         // 触发执行器
                        case "/run":
                            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                            return executorBiz.run(triggerParam);
                        case "/kill":
                            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                            return executorBiz.kill(killParam);
                        case "/log":
                            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                            return executorBiz.log(logParam);
                        default:
                            return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    JobThread执行调度逻辑
    com.xxl.job.core.thread.JobThread#start()

     @Override
    	public void run() {
    
        	// init
        	try {
        		// 执行初始化方法(初始化连接池等信息,一个job只能执行一次)
    			handler.init();
    		} catch (Throwable e) {
        		logger.error(e.getMessage(), e);
    		}
    
    		// execute
    		while(!toStop){
    			running = false;
    			idleTimes++;
    
                TriggerParam triggerParam = null;
                try {
    				// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
    				// 从队列中获取调度日志
    				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
    				if (triggerParam!=null) {
    					running = true;
    					idleTimes = 0;
    					triggerLogIdSet.remove(triggerParam.getLogId());
    
    					// log filename, like "logPath/yyyy-MM-dd/9999.log" 写入log文件
    					String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
    					XxlJobContext xxlJobContext = new XxlJobContext(
    							triggerParam.getJobId(),
    							triggerParam.getExecutorParams(),
    							logFileName,
    							triggerParam.getBroadcastIndex(),
    							triggerParam.getBroadcastTotal());
    
    					// init job context
    					XxlJobContext.setXxlJobContext(xxlJobContext);
    
    					// execute
    					XxlJobHelper.log("
    ----------- xxl-job job execute start -----------
    ----------- Param:" + xxlJobContext.getJobParam()); // 设置了超时就异步线程处理(FutureTask设置超时时间) if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; try { FutureTask futureTask = new FutureTask(new Callable() { @Override public Boolean call() throws Exception { // init job context XxlJobContext.setXxlJobContext(xxlJobContext); handler.execute(); return true; } }); futureThread = new Thread(futureTask); futureThread.start(); Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS); } catch (TimeoutException e) { XxlJobHelper.log("
    ----------- xxl-job job execute timeout"); XxlJobHelper.log(e); // handle result XxlJobHelper.handleTimeout("job execute timeout "); } finally { futureThread.interrupt(); } } else { // just execute // 反射,invoke handler; 没设置超时时间,则立刻执行触发器 handler.execute(); } // 记录执行日志 // valid execute handle data if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) { XxlJobHelper.handleFail("job handle result lost."); } else { String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg(); tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000) ?tempHandleMsg.substring(0, 50000).concat("...") :tempHandleMsg; XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg); } XxlJobHelper.log("
    ----------- xxl-job job execute end(finish) -----------
    ----------- Result: handleCode=" + XxlJobContext.getXxlJobContext().getHandleCode() + ", handleMsg = " + XxlJobContext.getXxlJobContext().getHandleMsg() ); } else { // 空闲执行次数超过30次,且队列没任务,则删除并终止线程 if (idleTimes > 30) { if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); } } } } /** * 当任务调度有异常时,捕捉异常,通过XxlJobHelper.handleFail(errorMsg)设置失败; * 所以当JobHandler处理业务逻辑时,记得抛出异常 */ catch (Throwable e) { if (toStop) { XxlJobHelper.log("
    ----------- JobThread toStop, stopReason:" + stopReason); } // handle result StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String errorMsg = stringWriter.toString(); XxlJobHelper.handleFail(errorMsg); XxlJobHelper.log("
    ----------- JobThread Exception:" + errorMsg + "
    ----------- xxl-job job execute end(error) -----------"); } finally { if(triggerParam != null) { // callback handler info // 添加回调队列 if (!toStop) { // commonm TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), XxlJobContext.getXxlJobContext().getHandleCode(), XxlJobContext.getXxlJobContext().getHandleMsg() ) ); } else { // is killed TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), XxlJobContext.HANDLE_CODE_FAIL, stopReason + " [job running, killed]" ) ); } } } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144

    6. 自动触发

    在这里插入图片描述
    在这里插入图片描述

    1. 自动触发逻辑

    com.xxl.job.admin.core.thread.JobScheduleHelper#start()
    scheduleThread定时线程

    scheduleThread = new Thread(new Runnable() {
               @Override
               public void run() {
    
                   try {
                       // 保证5秒执行一次
                       TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                   } catch (InterruptedException e) {
                       if (!scheduleThreadToStop) {
                           logger.error(e.getMessage(), e);
                       }
                   }
                   logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
    
                   // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                   // 每秒处理20个任务,200个线程
                   int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
    
                   while (!scheduleThreadToStop) {
    
                       // Scan Job
                       long start = System.currentTimeMillis();
    
                       Connection conn = null;
                       Boolean connAutoCommit = null;
                       PreparedStatement preparedStatement = null;
    
                       boolean preReadSuc = true;
                       try {
    
                           conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                           connAutoCommit = conn.getAutoCommit();
                           conn.setAutoCommit(false);
    
                           //获取任务调度锁表内数据信息,加写锁(分布式锁)
                           preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                           preparedStatement.execute();
    
                           // tx start
    
                           // 1、pre read
                           long nowTime = System.currentTimeMillis();
                           //获取当前时间后5秒,同时最多负载的分页数
                           List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                           if (scheduleList!=null && scheduleList.size()>0) {
                               // 2、push time-ring
                               for (XxlJobInfo jobInfo: scheduleList) {
    
                                   // time-ring jump(时间轮)
                                   if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                       // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                       logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
    
                                       // 1、misfire match
                                       /**
                                        * 调度过期策略:
                                        *   - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
                                        *   - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
                                        */
                                       MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                       if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                           // FIRE_ONCE_NOW 》 trigger
                                           JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                           logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                       }
    
                                       // 2、fresh next 更新下次执行时间
                                       refreshNextValidTime(jobInfo, new Date());
    
                                   } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                       // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
    
                                       // 1、trigger
                                       // 执行触发器
                                       JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                       logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
    
                                       // 2、fresh next 更新下次执行时间
                                       refreshNextValidTime(jobInfo, new Date());
    
                                       // next-trigger-time in 5s, pre-read again 下次触发时间在当前时间往后5秒范围内
                                       if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
    
                                           // 1、make ring second 获取下次执行秒
                                           int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                           // 2、push time ring 添加到时间轮
                                           pushTimeRing(ringSecond, jobInfo.getId());
    
                                           // 3、fresh next 更新下次执行时间
                                           refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                       }
    
                                   } else {
                                       // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
    
                                       // 未来五秒以内执行的所有任务添加到ringData
                                       // 1、make ring second
                                       int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                       // 2、push time ring 添加到时间轮
                                       pushTimeRing(ringSecond, jobInfo.getId());
    
                                       // 3、fresh next 更新下次执行时间
                                       refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                   }
    
                               }
    
                               // 3、update trigger info 更新执行时间和上次执行时间到数据库
                               for (XxlJobInfo jobInfo: scheduleList) {
                                   XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                               }
    
                           } else {
                               preReadSuc = false;
                           }
    
                           // tx stop
    
    
                       } catch (Exception e) {
                           if (!scheduleThreadToStop) {
                               logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                           }
                       } finally {
    
                           // commit
                           if (conn != null) {
                               try {
                                   conn.commit();
                               } catch (SQLException e) {
                                   if (!scheduleThreadToStop) {
                                       logger.error(e.getMessage(), e);
                                   }
                               }
                               try {
                                   conn.setAutoCommit(connAutoCommit);
                               } catch (SQLException e) {
                                   if (!scheduleThreadToStop) {
                                       logger.error(e.getMessage(), e);
                                   }
                               }
                               try {
                                   conn.close();
                               } catch (SQLException e) {
                                   if (!scheduleThreadToStop) {
                                       logger.error(e.getMessage(), e);
                                   }
                               }
                           }
    
                           // close PreparedStatement
                           if (null != preparedStatement) {
                               try {
                                   preparedStatement.close();
                               } catch (SQLException e) {
                                   if (!scheduleThreadToStop) {
                                       logger.error(e.getMessage(), e);
                                   }
                               }
                           }
                       }
                       long cost = System.currentTimeMillis()-start;
    
    
                       // Wait seconds, align second
                       if (cost < 1000) {  // scan-overtime, not wait
                           try {
                               // pre-read period: success > scan each second; fail > skip this period;
                               TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                           } catch (InterruptedException e) {
                               if (!scheduleThreadToStop) {
                                   logger.error(e.getMessage(), e);
                               }
                           }
                       }
    
                   }
    
                   logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
               }
           });
           scheduleThread.setDaemon(true);
           scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
           scheduleThread.start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188

    2. 时间轮线程

    com.xxl.job.admin.core.thread.JobScheduleHelper#start()
    ringThread时间轮线程

    // ring thread(时间轮线程)
           ringThread = new Thread(new Runnable() {
               @Override
               public void run() {
    
                   while (!ringThreadToStop) {
    
                       // align second
                       try {
                           TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                       } catch (InterruptedException e) {
                           if (!ringThreadToStop) {
                               logger.error(e.getMessage(), e);
                           }
                       }
    
                       try {
                           // second data
                           List ringItemData = new ArrayList<>();
                           int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                           // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                           for (int i = 0; i < 2; i++) {
                               List tmpData = ringData.remove( (nowSecond+60-i)%60 );
                               if (tmpData != null) {
                                   ringItemData.addAll(tmpData);
                               }
                           }
    
                           // ring trigger
                           logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                           if (ringItemData.size() > 0) {
                               // do trigger
                               for (int jobId: ringItemData) {
                                   // do trigger
                                   // 执行触发器;逻辑就跟主动触发是一致的了。
                                   JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                               }
                               // clear
                               ringItemData.clear();
                           }
                       } catch (Exception e) {
                           if (!ringThreadToStop) {
                               logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                           }
                       }
                   }
                   logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
               }
           });
           ringThread.setDaemon(true);
           ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
           ringThread.start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    7. 设计亮点

    1. 路由策略

    1. 路由策略使用了 策略设计模式,根据选择的策略去获取对应的调度中心地址;
    2. 支持了首个、最后、随机、轮询、一致性hash、LRU、LFU、故障转移、忙碌转移、分配广播;

    2. 注册中心

    1. 续期线程每30秒对任务执行器进行续期
    2. 过期线程每30秒把90未续期的任务执行器移除;

    3. 全异步化 & 轻量级

    1. 调度中心

    1. 调度任务:线程定时获取要执行的任务,并交给调度线程池异步调用;
    2. 心跳: 新开线程清理过期的任务执行器;
    3. 失败任务:线程重试并告警;

    2. 任务执行器

    1. 执行任务: 每个job任务都有各自jobThread从队列中获取;
    2. 回调: 有两个线程 回调和重试线程,负责向xxlAdmin回调任务执行状态;
    3. 心跳: 新开线程每隔30s进行续期

    3. 异步化

    1. 异步调用:交给线程池进行异步调用任务给任务执行器
    2. 异步执行:任务执行器每个job都有各自的线程,并异步回调给xxlAdmin;

    4. 轻量级

    1. 架构上非常的轻,基本通过Mysql实现了分布式锁、注册中心、任务调度等功能,只需依赖Mysql + Java;
    2. 在全异步化的基础上,单个JOB一次运行平均耗时基本在 “10ms” 之内(基本为一次请求的网络开销),可以保证使用有限的线程支撑大量的JOB并发运行;
    3. 官方文档表示,在理论的调度中心下,单机能支撑5000任务并发;
    4. 如何提高性能:1. 机器上;2. 不同业务进行区分; 3. 修改源码(不同的xxl-job集群处理不同的job)

    4. 时间轮算法

    1. 是什么

    1. 时间轮方案将现实生活中的时钟概念引入到软件设计中,主要思路是定义一个时钟周期(比如时钟的12小时)和步长(比如时钟的一秒走一次),当指针每走一步的时候,会获取当前时钟刻度上挂载的任务并执行。
    2. 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度
    3. 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)
    4. 每个槽对应一个环形链表存储该时间应该被执行的任务
    5. 需要一个线程去驱动指针运转,获取到期任务

    2. xxl-job实现

    1. xxl-job的时间环只会存储之后5s内执行的任务,使用一个Map进行存储;
    2. Map的key为执行时间的秒数%60,value为这个秒执行的jobIdList;
    3. 时间轮线程每1秒执行一次,从时间轮从获取到jobIdList,最后进行调度任务;
  • 相关阅读:
    美创科技获通信网络安全服务能力评定(应急响应一级)认证!
    Java 方法中循环调用具有事务的方法
    Java后端开发(六)-- 二维码的生成
    [python刷题模板] 数位DP
    mybatis的mapper包导入失败
    C语言中文网 - Shell脚本 - 7
    c语言按位与,按位或,按位异或,按位取反
    全局视角看技术-Java多线程演进史
    NoSQL数据库之Redis2
    软件设计师2011上午题基础知识(易错整理)
  • 原文地址:https://blog.csdn.net/m0_66557301/article/details/126042645