定时任务这一组件在工作过程中经常使用到,在单机节点上可以直接选择使用Spring自带的定时任务组件hubble-task,而这种定时任务一旦确定固化了定时触发策略,也无法动态开启关闭,所以后来有了Quartz。
Quartz是定时任务领域的一个开源项目,由JAVA开发,可以通过API调度定时任务的启停及策略,还有对JTA事务跟集群的支持等等强大功能。
但是Quartz又有它的一些缺点:
而elastic-job就是当当在Quartz的基础上进行了二次封装,elastic-job有两种版本:
这两个版本除了部署方式不一样在api上是一样的,elastic-job相对于Quartz增加了很多新特性:
安装elastic-job-lite方式,需要提前安装zookeeper,如果需要安装教程可以看这篇文章:Linux在线安装Zookeeper
elastic-job在apache的地址:elasticjob
然后就需要运行包含Elastic-Job-Lite和业务代码的jar文件。不限于jar或war的启动方式。
源码地址:elastic-job-lite
下载2.1.4版本的源码:
https://codeload.github.com/apache/shardingsphere-elasticjob/zip/refs/tags/2.1.4
下载完成解压后有如下目录:
进入elastic-job-lite文件下的elastic-job-lite-console。在此目录下进行打包,打包命令
mvn clean install -Dmaven.test.skip=true
打包好了启动jar包即可,也可以直接启动源码,找到console模块下的ConsoleBootstrap类进行启动
启动完成后访问ip:8899,账密为:root/root
进入系统后进入注册中心配置,填写需要注册的zookeeper地址进行连接。
下面是在linux中安装配置。也可以直接将打好的包放到linux中执行。
在elastic-job 3.0后没有了console模块,有了更加优美的ui控制台
这里直接下载控制台包:
https://archive.apache.org/dist/shardingsphere/elasticjob-ui-3.0.0-RC1/apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz
下载后上传到服务器进行解压
tar -zxvf apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz
到bin文件里进行启动
./start.sh
启动成功后访问ip地址:8088,默认账号密码为root/root
进入后需要在全局配置-注册中心配置添加注册中心
新增完成后建立连接,elastic-job初步就搭建好了,如果想要引入数据源需要修改conf文件下的application.properties配置文件
我想要修改为mysql作为数据库,需要再lib文件中加入连接包,这个手动上传即可。
vim application.properties
修改为mysql的驱动跟连接方式
保存文件然后重新启动elastic-job,在事件追踪数据源配置中添加数据源,如下图:
点击建立连接,后面定时任务的配置及日志会记录在表里
引入pom依赖
<dependency>
<groupId>com.cxytiandigroupId>
<artifactId>elastic-job-spring-boot-starterartifactId>
<version>1.0.0version>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-frameworkartifactId>
<version>2.10.0version>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-recipesartifactId>
<version>2.10.0version>
dependency>
在配置文件application.properties添加配置:
elasticJob.zk.serverLists=localhost:2181
elasticJob.zk.namespace=user-sync
然后就可以直接定义一个job类来验证了,代码如下:
@Component
@ElasticJobConf(name = "TestJob",cron="0 0 0 * * ?",shardingTotalCount=5) // 每天零点执行
public class TestJob extends SimpleJob{
@Override
public void execute(ShardingContext shardingContext) {
// 要执行的逻辑
}
}
这种方式的定时策略是依赖在ElasticJobConf注解上,调整注解的配置即可。
常规集成有三个类
引入依赖
<dependency>
<groupId>com.dangdanggroupId>
<artifactId>elastic-job-common-coreartifactId>
<version>${elasticjob.version}version>
<exclusions>
<exclusion>
<artifactId>guavaartifactId>
<groupId>com.google.guavagroupId>
exclusion>
<exclusion>
<artifactId>curator-frameworkartifactId>
<groupId>org.apache.curatorgroupId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>com.dangdanggroupId>
<artifactId>elastic-job-lite-coreartifactId>
<version>${elasticjob.version}version>
dependency>
<dependency>
<groupId>com.dangdanggroupId>
<artifactId>elastic-job-lite-springartifactId>
<version>${elasticjob.version}version>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-recipesartifactId>
<version>2.10.0version>
dependency>
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true")
public class ElasticJobConfig {
private final ElasticJobProperties jobProperties;
public ElasticJobConfig(ElasticJobProperties jobProperties) {
this.jobProperties = jobProperties;
}
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter() {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(jobProperties.getServerLists(),
jobProperties.getNamespace()));
}
@Bean
public ElasticJobListener elasticJobListener() {
return new ElasticJobListener(100, 100);
}
}
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@AutoConfigureAfter(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
@ConditionalOnBean(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
public class ElasticJobHandlerConfig {
private final ZookeeperRegistryCenter zookeeperRegistryCenter;
public ElasticJobHandlerConfig(ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
/**
* 配置任务详细信息
*
* @param jobClass 定时任务实现类
* @param cron 表达式
* @param shardingTotalCount 分片数
* @param shardingItemParameters 分片参数
* @return
*/
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters,
final String jobParameters,
final String description) {
// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount).
shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).description(description).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
// 定义Lite作业根配置
return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
}
/**
* 具体任务
*/
@Bean(initMethod = "init")
public JobScheduler pushHrhbJobScheduler(final TestJob testjob,
@Value("${job.test.cron}") final String cron,
@Value("${job.test.shardingTotalCount}") final int shardingTotalCount,
@Value("${job.test.description}") final String description) {
return new SpringJobScheduler(testjob, zookeeperRegistryCenter, getLiteJobConfiguration(testjob.getClass(),
cron, shardingTotalCount, "", "", description));
}
}
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
/**
* 设置间隔时间
*
* @param startedTimeoutMilliseconds
* @param completedTimeoutMilliseconds
*/
public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
}
@Override
public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
log.info("任务名:{}开始", shardingContexts.getJobParameter());
}
@Override
public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
log.info("任务名:{}结束", shardingContexts.getJobParameter());
}
}
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "elasticjob")
public class ElasticJobProperties {
private boolean enabled = true;
private String serverLists;
private String namespace;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getServerLists() {
return serverLists;
}
public void setServerLists(String serverLists) {
this.serverLists = serverLists;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}