分为调度中心 + 执行器
调度中心:提供可视化界面,配置定时任务,定时去调用执行器
调度中心执行器管理:每个springboot作为执行器, 也就是执行器的标识
任务管理:选中执行器,创建改该执行器下的任务
分片广播:集群情况,并行处理任务
场景:大数据表并行处理任务,分散服务器压力
!!现在一张数据表里有大量数据需要某个服务端应用来处理,要求:
2能够并行处理;
3能够较灵活地控制并行任务数量。
4压力较均衡地分散到不同的服务器节点;
因为需要并行处理同一张数据表里的数据,所以比较自然地想到了分片查询数据,可以利用对 id 取模的方法进行分片,避免同一条数据被重复处理。
根据第 1、2 点要求,本来想通过对线程池的动态配置来实现,但结合第 3 点来考虑,服务器节点数量有可能会变化,节点之间相互无感知无通信,自己在应用内实现一套调度机制可能会很复杂。
如果有现成的独立于这些服务器节点之外的调度器就好了——顺着这个思路,就想到了已经接入的分布式任务调度平台 XXL-JOB,而在阅读其 官方文档 后发现「分片广播 & 动态分片」很贴合这种场景。
/**
* 2、分片广播任务
*/
@XxlJob(“shardingJobHandler”)
public void shardingJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log(“分片参数:当前分片序号 = {}, 总分片数 = {}”, shardIndex, shardTotal);
// 业务逻辑
for (int i = 0; i < shardTotal; i++) {
if (i == shardIndex) {
XxlJobHelper.log(“第 {} 片, 命中分片开始处理”, i);
} else {
XxlJobHelper.log(“第 {} 片, 忽略”, i);
}
}
}
防止重复执行: 路由策略选择一致性hash,只会固定走1台机器
任务调度中心,去官网clone,跑起来即可
springboot整合 执行器
pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!-- xxl-job-core -->
<!--定时器xxljob-->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
yml
#执行器配置
xxl:
job:
accessToken: default_token #token
admin:
# xxl-job后台管理界面的地址 调度中心地址
addresses: http://127.0.0.1:8080/xxl-job-admin
executor:
# 此执行器的名称
appname: exectorNameHaHa
#执行器地址 http://本机ip:port/
address: http://127.0.0.1:9997/
# 此执行器的ip 执行器IP默认为空,表示自动获取IP 多网卡时可手动设置指定IP,手动设置IP时将会绑定Host。
ip: 127.0.0.1
# 此执行器的端口
port: 9997
# 此执行器的日志存放路径
logpath: /data/applogs/xxl-job/jobhandler
# 此执行器的日志保存时间
logretentiondays: 7
server:
port: 8082
配置文件
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
*
* org.springframework.cloud
* spring-cloud-commons
* ${version}
*
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
执行定时业务逻辑
@Component
public class JobToPrint extends IJobHandler {
@Override
@XxlJob("JobToPrint")
public ReturnT<String> execute(String param) throws Exception {
try {
System.out.println("测试job");
return SUCCESS;
} catch (Exception e){
e.printStackTrace();
return FAIL;
}
}
}