• 集成elastic-job分布式调度定时任务


    前言

    定时任务这一组件在工作过程中经常使用到,在单机节点上可以直接选择使用Spring自带的定时任务组件hubble-task,而这种定时任务一旦确定固化了定时触发策略,也无法动态开启关闭,所以后来有了Quartz

    Quartz是定时任务领域的一个开源项目,由JAVA开发,可以通过API调度定时任务的启停及策略,还有对JTA事务跟集群的支持等等强大功能。

    但是Quartz又有它的一些缺点:

    • Quartz调整定时任务需要通过API的方式进行调度,本质上还是没有脱离业务系统。
    • Quartz需要持久化数据到底层数据表,对业务系统的数据侵入较高。
    • Quartz也并没有支持分布式的调度执行,只能做到单个任务单个执行

    elastic-job就是当当在Quartz的基础上进行了二次封装,elastic-job有两种版本:

    • Elastic-Job-Cloud:针对微服务的部署方式
    • Elastic-Job-Lite:基于zookeeper作为注册中心的部署方式

    这两个版本除了部署方式不一样在api上是一样的,elastic-job相对于Quartz增加了很多新特性:

    1. 支持UI页面,可以在web页面上动态调整定时策略跟启停
    2. 基于Zookeeper作为分布式的调度,调度跟任务解耦
    3. 支持了分布式调度分片,同一个任务可以分成多片执行
    4. 作业类型多种,支持Simple、DataFLow数据流、Script脚本
    5. 失效转移,下线的机器的任务会重新分片执行
    6. 作业分片的一致性,任务分片后不会重复执行
    7. 错过执行的作业补偿

    安装

    安装elastic-job-lite方式,需要提前安装zookeeper,如果需要安装教程可以看这篇文章:Linux在线安装Zookeeper

    elastic-job在apache的地址:elasticjob

    然后就需要运行包含Elastic-Job-Lite和业务代码的jar文件。不限于jar或war的启动方式。

    源码地址:elastic-job-lite

    启动elastic-job-lite-console

    下载2.1.4版本的源码:

    https://codeload.github.com/apache/shardingsphere-elasticjob/zip/refs/tags/2.1.4
    
    • 1

    下载完成解压后有如下目录:

    在这里插入图片描述

    进入elastic-job-lite文件下的elastic-job-lite-console。在此目录下进行打包,打包命令

    mvn clean install -Dmaven.test.skip=true  
    
    • 1

    打包好了启动jar包即可,也可以直接启动源码,找到console模块下的ConsoleBootstrap类进行启动

    在这里插入图片描述

    启动完成后访问ip:8899,账密为:root/root

    进入系统后进入注册中心配置,填写需要注册的zookeeper地址进行连接。

    在这里插入图片描述

    下面是在linux中安装配置。也可以直接将打好的包放到linux中执行。

    安装控制台

    elastic-job 3.0后没有了console模块,有了更加优美的ui控制台

    这里直接下载控制台包:

    https://archive.apache.org/dist/shardingsphere/elasticjob-ui-3.0.0-RC1/apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz
    
    • 1

    下载后上传到服务器进行解压

     tar -zxvf apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz
    
    • 1

    到bin文件里进行启动

    ./start.sh
    
    • 1

    启动成功后访问ip地址:8088,默认账号密码为root/root

    在这里插入图片描述

    进入后需要在全局配置-注册中心配置添加注册中心

    在这里插入图片描述

    新增完成后建立连接,elastic-job初步就搭建好了,如果想要引入数据源需要修改conf文件下的application.properties配置文件

    我想要修改为mysql作为数据库,需要再lib文件中加入连接包,这个手动上传即可。

    vim application.properties
    
    • 1

    修改为mysql的驱动跟连接方式

    在这里插入图片描述

    保存文件然后重新启动elastic-job,在事件追踪数据源配置中添加数据源,如下图:

    在这里插入图片描述

    点击建立连接,后面定时任务的配置及日志会记录在表里

    在这里插入图片描述

    集成

    简单集成

    引入pom依赖

    <dependency>
    	<groupId>com.cxytiandigroupId>
    	<artifactId>elastic-job-spring-boot-starterartifactId>
    	<version>1.0.0version>
    dependency>
    <dependency>
    	<groupId>org.apache.curatorgroupId>
    	<artifactId>curator-frameworkartifactId>
    	<version>2.10.0version>
    dependency>
    <dependency>
    	<groupId>org.apache.curatorgroupId>
    	<artifactId>curator-recipesartifactId>
    	<version>2.10.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在配置文件application.properties添加配置:

    elasticJob.zk.serverLists=localhost:2181
    elasticJob.zk.namespace=user-sync
    
    • 1
    • 2

    然后就可以直接定义一个job类来验证了,代码如下:

    @Component
    @ElasticJobConf(name = "TestJob",cron="0 0 0 * * ?",shardingTotalCount=5) // 每天零点执行
    public class TestJob extends SimpleJob{
      @Override
      public void execute(ShardingContext shardingContext) {
      	// 要执行的逻辑
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这种方式的定时策略是依赖在ElasticJobConf注解上,调整注解的配置即可。

    常规集成

    常规集成有三个类

    • ElasticJobConfig:elastic-job组件的配置,举例zookeeper配置中心
    • ElasticJobHandler:job任务的具体执行类可以配置在这里
    • ElasticJobListener:job任务的监听,开始跟结束
    • ElasticJobProperties:从配置文件读取zookeeper配置

    引入依赖

    <dependency>
        <groupId>com.dangdanggroupId>
        <artifactId>elastic-job-common-coreartifactId>
        <version>${elasticjob.version}version>
        <exclusions>
            <exclusion>
                <artifactId>guavaartifactId>
                <groupId>com.google.guavagroupId>
            exclusion>
            <exclusion>
                <artifactId>curator-frameworkartifactId>
                <groupId>org.apache.curatorgroupId>
            exclusion>
        exclusions>
    dependency>
    <dependency>
        <groupId>com.dangdanggroupId>
        <artifactId>elastic-job-lite-coreartifactId>
        <version>${elasticjob.version}version>
    dependency>
    
    <dependency>
        <groupId>com.dangdanggroupId>
        <artifactId>elastic-job-lite-springartifactId>
        <version>${elasticjob.version}version>
    dependency>
    
    <dependency>
        <groupId>org.apache.curatorgroupId>
        <artifactId>curator-recipesartifactId>
        <version>2.10.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    ElasticJobConfig

    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    @Component
    @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true")
    public class ElasticJobConfig {
        private final ElasticJobProperties jobProperties;
    
        public ElasticJobConfig(ElasticJobProperties jobProperties) {
            this.jobProperties = jobProperties;
        }
    
        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter regCenter() {
            return new ZookeeperRegistryCenter(new ZookeeperConfiguration(jobProperties.getServerLists(),
                    jobProperties.getNamespace()));
        }
    
        @Bean
        public ElasticJobListener elasticJobListener() {
            return new ElasticJobListener(100, 100);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    ElasticJobHandler

    import com.dangdang.ddframe.job.api.simple.SimpleJob;
    import com.dangdang.ddframe.job.config.JobCoreConfiguration;
    import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
    import com.dangdang.ddframe.job.lite.api.JobScheduler;
    import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
    import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Slf4j
    @Configuration
    @AutoConfigureAfter(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
    @ConditionalOnBean(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
    public class ElasticJobHandlerConfig {
        private final ZookeeperRegistryCenter zookeeperRegistryCenter;
    
        public ElasticJobHandlerConfig(ZookeeperRegistryCenter zookeeperRegistryCenter) {
            this.zookeeperRegistryCenter = zookeeperRegistryCenter;
        }
    
        /**
         * 配置任务详细信息
         *
         * @param jobClass               定时任务实现类
         * @param cron                   表达式
         * @param shardingTotalCount     分片数
         * @param shardingItemParameters 分片参数
         * @return
         */
        private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                             final String cron,
                                                             final int shardingTotalCount,
                                                             final String shardingItemParameters,
                                                             final String jobParameters,
                                                             final String description) {
            // 定义作业核心配置
            JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount).
                    shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).description(description).build();
            // 定义SIMPLE类型配置
            SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
            // 定义Lite作业根配置
            return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        }
    
    
        /**
         * 具体任务
         */
        @Bean(initMethod = "init")
        public JobScheduler pushHrhbJobScheduler(final TestJob testjob,
                                                 @Value("${job.test.cron}") final String cron,
                                                 @Value("${job.test.shardingTotalCount}") final int shardingTotalCount,
                                                 @Value("${job.test.description}") final String description) {
    
            return new SpringJobScheduler(testjob, zookeeperRegistryCenter, getLiteJobConfiguration(testjob.getClass(),
                    cron, shardingTotalCount, "", "", description));
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    ElasticJobListener

    import com.dangdang.ddframe.job.executor.ShardingContexts;
    import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
    
        /**
         * 设置间隔时间
         *
         * @param startedTimeoutMilliseconds
         * @param completedTimeoutMilliseconds
         */
        public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
            super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
        }
    
        @Override
        public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
            log.info("任务名:{}开始", shardingContexts.getJobParameter());
        }
    
        @Override
        public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
            log.info("任务名:{}结束", shardingContexts.getJobParameter());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    ElasticJobProperties

    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @ConfigurationProperties(prefix = "elasticjob")
    public class ElasticJobProperties {
        private boolean enabled = true;
    
        private String serverLists;
    
        private String namespace;
    
        public boolean isEnabled() {
            return enabled;
        }
    
        public void setEnabled(boolean enabled) {
            this.enabled = enabled;
        }
    
        public String getServerLists() {
            return serverLists;
        }
    
        public void setServerLists(String serverLists) {
            this.serverLists = serverLists;
        }
    
        public String getNamespace() {
            return namespace;
        }
    
        public void setNamespace(String namespace) {
            this.namespace = namespace;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
  • 相关阅读:
    【【萌新的FPGA学习之快速回顾 水 水 】】
    当初吃土建起来的“中台”,现在为啥不香了?
    es6新增-Generator(异步编程的解决方案2)
    CMake入门(一)Ubuntu下使用和Window下使用
    博客摘录「 vue中调接口的方式:this.$api、直接调用、axios」2023年11月14日
    C语言经典题目之青蛙跳台阶问题
    2022-04-25-ElasticSearch
    SQL 注入笔记
    redis的java客户端之jedis
    面包店收银系统怎么选
  • 原文地址:https://blog.csdn.net/AnNanDu/article/details/127451645