• 分布式任务系统xxl-job


    概述

    XXL-JOB通过一个中心式的调度平台,调度多个执行器执行任务,调度中心通过 DB 锁保证集群分布式调度的一致性,这样扩展执行器会增大 DB 的压力,然而大部分公司的任务数,执行器并不多;XXL-JOB提供非常好用的监控、告警功能。不同于 ElasticJob,XXL-JOB在使用时依赖MySQL,而不需要ZK。

    GitHub官网,文档非常齐全,特性:上手简单、轻量级、易扩展、动态生效、调度中心HA、执行器HA、弹性扩容缩容、路由策略、故障转移、阻塞处理策略、任务超时控制、任务失败重试、任务失败告警、分片广播任务、动态分片、事件触发等。

    注:ElasticJob出自当当,设计初衷是为了面对高并发以及复杂的业务,即使是在业务量大,服务器多时也能做好任务调度,尽可能的利用服务器的资源。ElasticJob是无中心化的,如果主服务器挂掉,会自动通过ZK的选举机制选举出新的主服务器。具有较好的扩展性和可用性。

    架构

    取自官网:
    在这里插入图片描述

    数据表

    tables_xxl_job.sql,执行此SQL文件,生成8张表

    安装

    简单,省略

    实战

    配置文件:

    # 执行器配置
    # 配置调度中心地址
    aaa.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    # 执行器和调度中心之间的通信令牌,如果没有配置,表示关闭通信令牌的校验。在 xxl-job-admin 的配置文件中,有一个一模一样的配置项,两边都配置,就会进行校验。
    aaa.job.accessToken=
    # 配置执行器的名字
    aaa.job.name=xxl-job-demo
    # 执行器端口,默认9999
    aaa.job.port=9999
    
    # 任务配置
    aaa.xxljob.jobGroup=444
    aaa.xxljob.jobAuthor=johnny
    aaa.xxljob.jobAlarmMail=johnny@qq.com
    aaa.xxljob.jobHandler=autoJobHandler
    
    aaa.subscribeJob.jobGroup=555
    aaa.subscribeJob.author=johnny
    aaa.subscribeJob.alarmEmail=johnny@qq.com
    aaa.subscribeJob.executorHandler=subscribeJobHandler
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    配置类:

    @Configuration
    public class XxlJobConfig {
        @Value("${aaa.job.name}")
        private String appName;
        @Value("${aaa.job.admin.addresses}")
        private String addresses;
        @Value("${aaa.job.accessToken}")
        private String accessToken;
        @Value("${aaa.job.port}")
        private Integer port;
    
        @Bean(initMethod = "start", destroyMethod = "destroy")
        public XxlJobExecutor xxlJobExecutor() {
            XxlJobExecutor xxlJobExecutor = new XxlJobExecutor();
            xxlJobExecutor.setAdminAddresses(addresses);
            xxlJobExecutor.setAppName(appName);
            xxlJobExecutor.setPort(port);
            return xxlJobExecutor;
        }
    
        /**
         * token是执行器级别
         */
        @Bean
        public JobRestApi jobRestApi() {
            return new JobRestApi(accessToken);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    工具类

    基于XXL-JOB提供的API封装后的工具类:

    @Service
    public class JobService {
        private static final Integer CODE = 200;
    	/**
         * 一个执行器下面可以有多个任务组
         */
        @Value("${aaa.xxljob.jobGroup}")
        private Integer jobGroup;
        @Value("${aaa.xxljob.jobuthor}")
        private String jobAuthor;
        @Value("${aaa.xxljob.jobalarmemail}")
        private String jobAlarmEmail;
        @Value("${aaa.xxljob.jobHandler}")
        private String jobHandler;
    
        @Value("${aaa.subscribeJob.jobGroup}")
        private Integer subJobGroup;
        @Value("${aaa.subscribeJob.author}")
        private String subJobAuthor;
        @Value("${aaa.subscribeJob.alarmEmail}")
        private String subJobAlarmEmail;
        @Value("${aaa.subscribeJob.executorHandler}")
        private String subJobHandler;
    
        @Resource
        private JobRestApi jobRestApi;
    
        /**
         * xxl创建任务
         */
        public Integer createJob(String cron, String param, String des) throws Exception {
            JobEntity jobEntity = getDefaultJobEntity();
            return createJob(cron, param, des, jobEntity);
        }
    
        public Integer createSubscribeJob(String cron, String param, String des) throws Exception {
            JobEntity jobEntity = getJobEntity(subJobHandler, subJobGroup, subJobAuthor, subJobAlarmEmail);
            return createJob(cron, param, des, jobEntity);
        }
    
        public void updateSubscribeJob(Integer id, String cron, String param, String des) throws Exception {
            if (id == null) {
                return;
            }
            JobEntity jobEntity = getJobEntity(subJobHandler, subJobGroup, subJobAuthor, subJobAlarmEmail);
            jobEntity.setId(id);
            jobEntity.setJobCron(cron);
            jobEntity.setExecutorParam(param);
            jobEntity.setJobDesc(des);
            ReturnT<JobEntity> returnT = jobRestApi.updateJobInfo(jobEntity);
            if (returnT.getCode() != CODE) {
                throw new Exception(returnT.getMsg());
            }
        }
    
        /**
         * 更新Job
         */
        public void updateJob(Integer id, String cron, String params, String describe) throws Exception {
            if (id == null) {
                return;
            }
            JobEntity jobEntity = getDefaultJobEntity();
            jobEntity.setId(id);
            if (StringUtil.isNotNullOrEmpty(cron)) {
                jobEntity.setJobCron(cron);
            }
            jobEntity.setExecutorParam(params);
            jobEntity.setJobDesc(describe);
            ReturnT<JobEntity> returnT = jobRestApi.updateJobInfo(jobEntity);
            if (returnT.getCode() != CODE) {
                throw new Exception(returnT.getMsg());
            }
        }
    
        /**
         * 开启Job的调度
         */
        public void enableJob(Integer id) throws Exception {
            if (id == null) {
                return;
            }
            ReturnT<String> returnT = jobRestApi.enableJob(id);
            if (returnT.getCode() != CODE) {
                throw new Exception(returnT.getMsg());
            }
        }
    
        /**
         * 暂停Job的调度
         */
        public void disableJob(Integer id) throws Exception {
            if (id == null) {
                return;
            }
            ReturnT<String> returnT = jobRestApi.disableJob(id);
            if (returnT.getCode() != CODE) {
                throw new Exception(returnT.getMsg());
            }
        }
    
        /**
         * 删除并下线调度中的Job
         */
        public void removeJob(Integer id) throws Exception {
            if (id == null) {
                return;
            }
            ReturnT<String> returnT = jobRestApi.removeJobInfo(id);
            if (returnT.getCode() != CODE) {
                throw new Exception(returnT.getMsg());
            }
        }
    
        /**
         * 校验xxl-job
         *
         * @param id xxl平台的jobId, 同时也是我们这边的数据表里面的xxl_job_id字段
         * @return true if xxl-job exists
         */
        public Boolean checkJobById(Integer id) {
            if (id == null) {
                return false;
            }
            ReturnT<JobEntity> returnT = jobRestApi.getJobInfo(id);
            return returnT != null && returnT.getCode() == CODE;
        }
    
        private Integer createJob(String cron, String param, String des, JobEntity jobEntity) throws Exception {
            // 幂等键:同一个执行器下,该字段(任务描述)需要唯一,否则无法创建
            jobEntity.setJobDesc(des);
            jobEntity.setJobCron(cron);
            jobEntity.setExecutorParam(param);
            ReturnT<JobEntity> returnT = jobRestApi.addJobInfo(jobEntity);
            if (returnT == null || returnT.getContent() == null || returnT.getCode() != CODE) {
                throw new Exception(returnT != null ? returnT.getMsg() : "");
            } else {
                jobRestApi.enableJob(returnT.getContent().getId());
                return returnT.getContent().getId();
            }
        }
    
        private JobEntity getDefaultJobEntity() {
            JobEntity jobEntity = new JobEntity();
            jobEntity.setGlueType(GlueTypeEnum.BEAN.toString());
            //轮询执行
            jobEntity.setExecutorRouteStrategy("ROUND");
            //失败告警
            jobEntity.setExecutorFailStrategy("FAIL_ALARM");
            //阻塞处理策略 单机串行
            jobEntity.setExecutorBlockStrategy("SERIAL_EXECUTION");
            jobEntity.setExecutorHandler(jobHandler);
            jobEntity.setJobGroup(jobGroup);
            jobEntity.setAuthor(jobAuthor);
            jobEntity.setAlarmEmail(jobAlarmEmail);
            return jobEntity;
        }
    
        private JobEntity getJobEntity(String jobHandler, int jobGroup, String jobAuthor, String jobAlarmEmail) {
            JobEntity jobEntity = new JobEntity();
            jobEntity.setGlueType(GlueTypeEnum.BEAN.toString());
            // 轮询执行
            jobEntity.setExecutorRouteStrategy("ROUND");
            // 失败告警
            jobEntity.setExecutorFailStrategy("FAIL_ALARM");
            // 阻塞处理策略 单机串行
            jobEntity.setExecutorBlockStrategy("SERIAL_EXECUTION");
            jobEntity.setExecutorHandler(jobHandler);
            jobEntity.setJobGroup(jobGroup);
            jobEntity.setAuthor(jobAuthor);
            jobEntity.setAlarmEmail(jobAlarmEmail);
            return jobEntity;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174

    执行器

    项目启动成功后,打开任务调度中心管理平台,找到执行器管理,点击新增。如果是自动注册的话,可以不用填机器地址。自动注册的好处自不必说,client应用发布后,自动注册IP到XXL-JOB,尤其是容器云这种发布方式,每次应用发布后,IP会发生变更,自动将新的IP注册并维护到机器地址中,XXL-JOB调度任务时,检测此IP(会有多个)的健康状态,能联通才能触发任务在这些IP上的执行。
    在这里插入图片描述
    但是不是所有的场景都适用于自动注册。假设这样一款数据产品平台A,支持用户创建各种类型的调度任务,用户可以在此A平台上面创建成千上万个定时任务,任务主要以SQL形式提交。因此,平台A一方面需要满足用户查看他们提交的各种任务信息和执行结果,同时也需要在一天内跑完成千上万个SQL定时任务。假使SQL的查询量和执行时间不可控,且没有教好的校验拦截机制。任务大概率极易把平台A的CPU或内存打满,影响用户使用平台A,即查看任务信息和执行结果等。故而,我们需要把平台A做一下功能拆分,B模块可以满足用户查看他们提交的各种任务信息和执行结果,C模块需要在一天内跑完成千上万个SQL定时任务。B模块和C模块相当于之前的平台A,用户的大批量任务的提交和执行拆开。B模块和C模块,相当于两个Java应用,B模块(应用)需要调用C模块(应用)提供的RPC/RESTful接口,两个应用都需要配置上面提到的执行器和任务配置信息,但是因为B模块只有任务提交的代码逻辑,没有任务执行的代码逻辑,不具备任务执行功能,XXL-JOB不能把任务的调度下发到B应用的多个IP,只能下发到C应用的多个IP。因此,如果每次应用发布后,IP不变更就还好,如果是容器云发布方式并且每次发布后IP会发生变更,则只能使用手动录入方式,在每次应用发布后,手动修改IP,否则XXL-JOB调度失败,找不到下游任务执行的节点IP。

    暂时没有想到更好的解决方案。

    定时任务开发

    GLUE模式(Java)

    任务以源码方式维护在调度中心,支持通过Web IDE(网页)在线更新,实时编译和生效。
    开发流程:

    1. 调度中心,任务管理,新增任务,运行模式选GLUE模式(Java):
      在这里插入图片描述
      如上图,可发现此时JobHandler是置灰状态,填写信息后,保存。任务管理页,点击刚才新增的任务的右侧操作中的GLUE按钮(其余几个按钮的功能见名知义)
      在这里插入图片描述
      前往GLUE任务的Web IDE界面,在该界面支持对任务代码进行开发(也可在IDE中开发完成后,复制粘贴到此页面中)
      在这里插入图片描述
      代码一般是放在Git仓库里面,有版本的概念,可以看到历史版本和两个版本之间的差别。此Web IDE也提供简单的版本回溯功能:
      在这里插入图片描述
      支持30个版本的版本回溯,在GLUE任务的Web IDE界面,选择右上角下拉框版本回溯,会列出该GLUE 任务代码的更新历史,选择相应版本即可显示该版本代码,保存后GLUE代码即回退到对应的历史版本。

    此方式使用较少。

    BEAN模式(类形式)

    支持基于类的开发方式,一个任务对应一个Java类。
    优点:不限制项目环境,兼容性好。即使是无框架项目,如main方法直接启动的项目也可以提供支持。
    缺点:

    1. 一个任务需要占用一个Java类;
    2. 不支持自动扫描任务并注入到执行器容器,需要手动注入。

    开发步骤:

    1. 开发一个继承自com.xxl.job.core.handler.IJobHandler的JobHandler类,实现execute方法:public abstract ReturnT execute(String... var1) throws Exception;
    @Service
    @JobHander("deleteDataJobHandler")
    public class DeleteBoardDataJobHandler extends IJobHandler {
    	@Override
        public ReturnT<String> execute(String... strings) {
        	// biz code
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 在调度中心新建调度任务,参考上面的截图,运行模式选BEAN模式,JobHandler必填,填写上面的@JobHander注解里面写明的handler。

    BEAN模式(方法形式)

    基于方法的开发方式,一个任务对应一个方法。底层会生成JobHandler代理,和基于类的方式一样,任务也会以JobHandler的形式存在于执行器任务容器中。

    开发步骤:

    1. 开发Job方法
      @XxlJob注解还有initdestroy属性,可分别用来配置初始化和销毁的方法,XxlJobHelper.getJobParam()可用来获取任务参数,可通过XxlJobHelper.log打印执行日志。默认任务结果为成功状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 XxlJobHelper.handleFail/handleSuccess自主设置任务结果。
    2. 任务调度中心,新建调度任务
    @Component
    public class MyJob {
        @XxlJob("demoJobHandler")
        public ReturnT<String> demoJobHandler() throws Exception {
            String param = XxlJobHelper.getJobParam();
            XxlJobHelper.log("XXL-JOB, Hello World:{}",param);
            return ReturnT.SUCCESS;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    BEAN模式(API形式)

    在实际的企业级业务平台开发中,会有这样一种场景:提供一个平台,用户可以填写需要执行的SQL,设置cron定时调度表达式,然后保存。即,任务是由用户提交的,可能多达几百上千个。此时,总不能一个个方法去开发。基于API,可以实现如下的批量任务开发。

    表设计:

    create table auto_job (
        job_id          bigint auto_increment comment '主键' primary key,
        job_name        varchar(100)         not null comment '任务名',
        exec_status     tinyint(1) default 0 not null comment '0-未执行,1-成功,2-失败,3-执行中',
        last_exec_time  timestamp            null comment '最后执行时间',
        is_active       tinyint(1) default 1 not null comment '是否有效',
        cron_exp        varchar(500)         not null comment '定时任务表达式',
        cron_exp_status tinyint(1) default 0 not null comment '是否开启(有效),0-无效,1-有效',
        xxl_job_id      varchar(50)          not null comment 'xxl-job的id'
    ) comment '数据推送';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    保存用户提交的任务:

    // 省略用户提交参数校验:如cron表达式是否合法,jobName是否重复等
    // 省略autoJobMapper.insert(job);
    // 调用上面提到的工具类在xxl-job里新增任务并得到xxlJobId
    Integer id = jobService.createJob(job.getCronExp(), job.getId().toString(), "数据推送-" + job.getId());
    // 回写xxl-job-id
    job.setXxlJobId(id);
    // 更新
    autoJobMapper.updateByPrimaryKeySelective(job);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    最后开发实际的job处理逻辑:

    @Slf4j
    @Service
    @JobHander(value = "autoJobHandler")
    public class AutoJobHandler extends IJobHandler {
        @Resource
        private AutoJobMapper autoJobMapper;
    
        @Override
        public ReturnT<String> execute(String... params) {
            long id = 0L;
            try {
                id = Long.parseLong(params[0]);
                AutoJob job = autoJobMapper.selectByPrimaryKey(id);
                if (job == null) {
                    logger.warn("autoJobHandler execute job id:{}, {}", id, "无效ID");
                    return ReturnT.SUCCESS;
                }
                if (job.getCronExpStatus() == 0) {
                    // 两边状态不一致,xxl-job触发调度,但任务不执行
                    logger.warn("autoJobHandler execute job id:{}, {}", id, "任务暂停");
                    return ReturnT.SUCCESS;
                }
                // 省略若干 biz code
            } catch (Exception e) {
                logger.error("iviewAutoJobHandler execute error id:{}, error info:{}", id, e);
                return ReturnT.FAIL;
            }
            return ReturnT.SUCCESS;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    平台侧用户创建的任务
    在这里插入图片描述
    管理员可见的xxl-job管理平台,其中执行器根据登录用户的角色和权限,会有多个。任务的JobKey = jobGroup + jobId,xxl-job的jobId就是前面数据表里面定义的xxl-job-id,xxl-job有任务描述的幂等性校验,同一个执行器下,任务描述需要唯一,否则无法创建。根据任务描述可以搜索任务。考虑到,业务表的自增主键肯定也是唯一的,也为了可以方便快速搜索定位到出错的任务,任务描述,可定义为任务类型 + 任务主键(auto_job表的主键job_id)。auto_job表的主键job_id和xxl-job那边的jobId,即auto_job表的字段xxl-job-id一一对应,绑定在一起。两个jobId,不要搞混淆。另外,提到任务类型,是因为业务平台不止一种任务类型。
    在这里插入图片描述

    批量开启暂停

    xxl-job核心功能在于调度,管理界面上并未提供批量开启、暂停、乃至删除任务的功能。需基于xxl-job提供的API进行二次开发。

    @RequestMapping("batchOperateJob")
    public String batchOperateJob(@RequestBody JSONObject req) {
        return demoService.batchOperateJob(req);
    }
    
    • 1
    • 2
    • 3
    • 4
    private static final String START = "start";
    private static final String STOP = "stop";
    
    @Resource
    private JobService jobService;
    
    public String batchOperateJob(JSONObject req) {
        String ids = req.getString("ids");
        if (StringUtils.isBlank(ids)) {
            return JSONObject.toJSONString(ServiceUtil.returnSuccess("请指定ids"));
        }
        String ope = req.getString("ope");
        if (StringUtils.isBlank(ope) || (!ope.equals(START) && !ope.equals(STOP))) {
            return JSONObject.toJSONString(ServiceUtil.returnSuccess("请指定正确的ope"));
        }
        try {
            List<Long> idList = Arrays.stream(ids.split(",")).map(Long::parseLong).collect(Collectors.toList());
            List<AutoJob> autoList = autoJobMapper.getXxlJobList(idList);
            if (CollectionUtils.isEmpty(autoList)) {
                return JSONObject.toJSONString(ServiceUtil.returnSuccess(MSG));
            }
            for (AutoJob item : autoList) {
                if (ope.equals(START)) {
                    jobService.enableJob(item.getXxlJobId());
                } else if (ope.equals(STOP)) {
                    jobService.disableJob(item.getXxlJobId());
                }
            }
        } catch (Exception e) {
            logger.error("batchOperateJob failed: ", e);
            return JSONObject.toJSONString(ServiceUtil.returnError(e.getMessage()));
        }
        return JSONObject.toJSONString(ServiceUtil.returnSuccess());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    批量修改负责人和告警人

    
    
    • 1

      报错

      请通知管理员对执行器ID进行sdk相关配置

      通过BEAN模式+API形式,即代码在pre预发环境创建任务报错。需要XXL-JOB负责人操作一下。

      token:[75AC738098833A382A735670EBF89158],没有groupId:[4]的权限

      需要XXL-JOB负责人操作一下。

      RuntimeException: Client-error:Read timed out

      在这里插入图片描述
      在这里插入图片描述

      参考

    • 相关阅读:
      Apache ShardingSphere(二) 基本使用
      怎样理解z-index层叠等级属性?
      Java笔记七(封装,继承与多态)
      重制版 day 18 CSV和excel文件操作
      thymeleaf
      mac本地搭建ollama
      RK3568平台开发系列讲解(图像篇)JPEG图像处理
      电脑键盘功能基础知识汇总
      实战从零开始实现Raft|得物技术
      vue3+TS实现简易组件库
    • 原文地址:https://blog.csdn.net/lonelymanontheway/article/details/126550027