• 工具篇--分布式定时任务springBoot--elasticjob简单使用(1)



    前言

    本文对 elasticjob 的简单使用进行介绍。


    一、elasticjob 介绍:

    ElasticJob 是一个分布式任务调度框架,由当当网开发并开源。它基于 Zookeeper 实现分布式协调,采用经典的分片算法,能够实现弹性扩容和缩容的分布式任务调度。ElasticJob 能够灵活地应用于各种环境中,如易用性、稳定性、弹性可伸缩等方面表现优异。

    以下是 ElasticJob 的一些主要特性:

    1. 分布式任务调度:ElasticJob 基于 Zookeeper 实现分布式协调,支持分布式自动负载均衡调度。

    2. 弹性扩缩容:ElasticJob 支持弹性扩容和缩容,在任务节点伸缩时能够自动调整任务分片。

    3. 丰富的定时调度策略:ElasticJob 提供了各种灵活的调度策略,如简单的固定频率、CRON 等。

    4. 支持多种任务处理逻辑:ElasticJob 支持处理数据流、打印日志、脚本处理等多种任务处理逻辑。

    5. 统计和监控:ElasticJob 提供了完善的统计和监控功能,可以监控任务执行状态和运行情况。

    总体来说,ElasticJob 是一个功能丰富、可靠且易于集成的分布式任务调度框架,广泛应用于企业系统中的定时任务调度、数据处理等场景。

    二、elasticjob 使用:

    2.1 部署zookeeper:

    因为job需要注册到zk 上,依赖于zk 的leader选举,所以需要先进行zk 的安装;
    阿里云轻量服务器–Docker–Zookeeper&Kafka安装;
    window Zookeeper zk 启动;

    2.2 引入库

    
    
    
    <dependency>
       <groupId>org.apache.shardingsphere.elasticjobgroupId>
       <artifactId>elasticjob-lite-coreartifactId>
       <version>3.0.4version>
       
    dependency>
    
    
    <dependency>
       <groupId>org.yamlgroupId>
       <artifactId>snakeyamlartifactId>
       
       <version>2.2version>
    dependency>
    
    
    <dependency>
       <groupId>org.codehaus.groovygroupId>
       <artifactId>groovy-allartifactId>
       <version>2.4.15version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2.2 定义任务:

    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
    import org.springframework.stereotype.Component;
    
    
    @Slf4j
    @Component
    public class MyJob implements SimpleJob {
        @Override
        public void execute(ShardingContext shardingContext) {
    
            // 分片参数 0=text,1=image,2=radio,3=vedio
            String  shardingParameter= shardingContext.getShardingParameter();
            String  jobParameter= shardingContext.getJobParameter();
    
            log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{},jobParamer:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
                    shardingContext.getShardingItem(), shardingParameter,jobParameter);
            if ("text".equals(jobParameter)) {
                // do something by sharding
            }
            switch (shardingContext.getShardingItem()) {
                case 0:
                    // do something by sharding item 0
                    break;
                case 1:
                    // do something by sharding item 1
                    break;
                case 2:
                    // do something by sharding item 2
                    break;
                // case n: ...
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    2.3 任务执行:

    
    import groovy.lang.GroovyShell;
    import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
    import org.apache.shardingsphere.elasticjob.api.JobExtraConfiguration;
    import org.apache.shardingsphere.elasticjob.infra.env.HostException;
    import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
    import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
    import org.apache.shardingsphere.elasticjob.lite.internal.snapshot.SnapshotService;
    import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
    import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
    import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
    import org.apache.shardingsphere.elasticjob.tracing.event.JobEvent;
    import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
    import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
    import org.springframework.util.StringUtils;
    
    
    public class MyJobDemo {
        public static void main(String[] args) {
        	// 电脑连接无线网时 连接zk 可能出现ip is null 错误,此时设置一个ip给到zk
            shieldElasticjobIpIsNull();
            // 运行任务
            new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration()).schedule();
        }
    	// 配置zookeeper 连接
        private static CoordinatorRegistryCenter createRegistryCenter() {
            ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "my-job");
            zookeeperConfiguration.setConnectionTimeoutMilliseconds(10000);
            zookeeperConfiguration.setSessionTimeoutMilliseconds(10000);
            zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000);
    
            CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
            regCenter.init();
            return regCenter;
            // ... 分片参数 分片从0开始到分片总数-1
        }
    	// 配置job 运行时机
        private static JobConfiguration createJobConfiguration() {
    
            // 创建作业配置 
            /**
            * myjob-param job 的名称 同一个zk命名空间下 需要唯一
            * 1 分片个数
            * cron 任务运行的cron 表达式
            * overwrite 运行job 的配置被覆盖写入,默认为false
            * shardingItemParameters  分片参数(随后同 分片个数一同介绍)
            * jobParameter job的参数(job 业务端在执行任务的时候可以接收到该参数)
            **/
            JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-param", 1).cron("0/5 * * * * ?")
                    .overwrite(true)
                  //  .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
                    .jobParameter("jobparamer")
                    .build();
            return jobConfiguration;
    
        }
    
        /**
         * 屏蔽org.apache.shardingsphere.elasticjob.infra.env.IpUtils.getIp()抛出
         * HostException(ip is null) 的异常导致windows本地程序无法启动
         */
        private static void shieldElasticjobIpIsNull(){
            try {
                IpUtils.getIp();
            } catch (HostException e) {
                //抛出HostException 且 异常信息为 "ip is null" 时,设置ip地址为 0.0.0.0
                if("ip is null".equals(e.getMessage())){
                    String code = "org.apache.shardingsphere.elasticjob.infra.env.IpUtils.cachedIpAddress=\"0.0.0.0\";";
                    GroovyShell groovy = new GroovyShell();
                    groovy.evaluate(code);
                }
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    2.4 任务执行控制台输出:

    在这里插入图片描述

    三、elasticjob 启动错误:

    3.1 KeeperErrorCode = OperationTimeout:

    在这里插入图片描述
    报错位置在 ZookeeperRegistryCenter 的 init() 方法中:
    在这里插入图片描述
    这里等待一段时间后如果还没有连接到zk 就会报错,默认的等待时间是 3000ms * 3 = 9s ,此时可以考虑增加 maxSleepTimeMilliseconds 的时间:

     private static CoordinatorRegistryCenter createRegistryCenter() {
            ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("139.196.92.249:2181", "my-job");
            zookeeperConfiguration.setConnectionTimeoutMilliseconds(10000);
            zookeeperConfiguration.setSessionTimeoutMilliseconds(10000);
            // 增加 maxSleepTimeMilliseconds  时间
            zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000);
    
            CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
            regCenter.init();
            return regCenter;
            // ... 分片参数 分片从0开始到分片总数-1
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.2 .HostException: ip is null:

    在这里插入图片描述

    错误代码:
    在这里插入图片描述
    这里会会获取到本机的ip 进行遍历找到一个符合要求的ip 然后进行返回,如果所有的ip 都不通过,则抛出ip is null 的问题;
    处理:引入: groovy-all

    <dependency>
        <groupId>org.codehaus.groovygroupId>
        <artifactId>groovy-allartifactId>
        <version>2.4.15version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    编写如下方法,目的只是为了在特殊情况下改变cachedIpAddress的值:

    /**
     * 屏蔽org.apache.shardingsphere.elasticjob.infra.env.IpUtils.getIp()抛出
     * HostException(ip is null) 的异常导致windows本地程序无法启动
     */
    private static void shieldElasticjobIpIsNull(){
        try {
            IpUtils.getIp();
        } catch (HostException e) {
            //抛出HostException 且 异常信息为 "ip is null" 时,设置ip地址为 0.0.0.0
            if("ip is null".equals(e.getMessage())){
                String code = "org.apache.shardingsphere.elasticjob.infra.env.IpUtils.cachedIpAddress=\"0.0.0.0\";";
                GroovyShell groovy = new GroovyShell();
                groovy.evaluate(code);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    启动类的main方法内部一开始就调用上面这个shieldElasticjobIpIsNull初始化cachedIpAddress:

    public static void main(String[] args) {
     //屏蔽org.apache.shardingsphere.elasticjob.infra.env.IpUtils.getIp()抛出
        //HostException(ip is null) 的异常导致windowes本地程序无法启动的问题
        shieldElasticjobIpIsNull();
        new ScheduleJobBootstrap(createRegistryCenter(), new MyJob(), createJobConfiguration()).schedule();
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    总结

    本文对 elasticjob 的简单使用进行介绍。

  • 相关阅读:
    llama factory 大数据量下训练失败
    人与人之间的差异在于外界
    查找算法思想及代码——C语言
    C#中使用CAS实现无锁算法
    [python opencv video抠图并更换背景]
    13.Spring security权限管理
    Postman 全局配置接口路径变量等
    定制ASP.NET Core的身份认证
    dashboard报错 错误:无法获取网络列表、dashboard报错 错误:无法获取云主机列表 解决流程
    C++ 类和对象(上)------超详细解析,小白必看系列
  • 原文地址:https://blog.csdn.net/l123lgx/article/details/136596530