• 如何优雅的整合定时批量任务(荣耀典藏版)


     

    目录

    前言

    一、代码具体实现

    1、pom文件配置 

    2、application.yaml文件

    3、Service实现类

    4、SpringBatch配置类

    5、Processor设置

    6、封装实体Bean

    7、启动类上要加上注解

    总结


    前言

    最近赶一个紧急需求,需求内容如下:

    PC网页触发一条设备升级记录(下图),后台要定时批量设备更新。这里定时要用到Quartz,批量数据处理要用到SpringBatch,二者结合,可以完成该需求。

    由于之前,没有用过SpringBatch,于是上网查了下资料,发现可参考的不是很多,于是只能去慢慢的翻看官方文档。 

    https://docs.spring.io/spring-batch/4.1.x/reference/html

    遇到不少问题,就记录一下吧。

    一、代码具体实现

    1、pom文件配置 

    1.  
    2.    org.springframework.boot
    3.    spring-boot-starter-web
    4.  
    5.  
    6.    org.postgresql
    7.    postgresql
    8.  
    9.  
    10.    org.springframework.boot
    11.    spring-boot-starter-jdbc
    12.  
    13.  
    14.    org.springframework.boot
    15.    spring-boot-starter-batch
    16.  
    17.  
    18.    org.projectlombok
    19.    lombok
    20.  
    21.  
    22.    org.springframework.boot
    23.    spring-boot-starter-batch
    24.  

    2、application.yaml文件

    1. spring:
    2.   datasource:
    3.     username: thinklink
    4.     password: thinklink
    5.     url: jdbc:postgresql://172.16.205.54:5432/thinklink
    6.     driver-class-name: org.postgresql.Driver
    7.   batch:
    8.     job:
    9.       enabled: false
    10. server:
    11.   port: 8073
    12. #upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
    13. upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/
    14. # 每次批量处理的数据量,默认为5000
    15. batch-size: 5000

    3、Service实现类

    触发批处理任务的入口,执行一个job。

    1. @Service("batchService")
    2. public class BatchServiceImpl implements BatchService {
    3. // 框架自动注入
    4.     @Autowired
    5.     private JobLauncher jobLauncher;
    6.     @Autowired
    7.     private Job updateDeviceJob;
    8.     /**
    9.      * 根据 taskId 创建一个Job
    10.      * @param taskId
    11.      * @throws Exception
    12.      */
    13.     @Override
    14.     public void createBatchJob(String taskId) throws Exception {
    15.         JobParameters jobParameters = new JobParametersBuilder()
    16.                 .addString("taskId", taskId)
    17.                 .addString("uuid", UUID.randomUUID().toString().replace("-",""))
    18.                 .toJobParameters();
    19.         // 传入一个Job任务和任务需要的参数
    20.         jobLauncher.run(updateDeviceJob, jobParameters);
    21.     }
    22. }

    4、SpringBatch配置类

    注意:此部分最重要

    1. @Configuration
    2. public class BatchConfiguration {
    3.     private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
    4.     @Value("${batch-size:5000}")
    5.     private int batchSize;
    6. // 框架自动注入
    7.     @Autowired
    8.     public JobBuilderFactory jobBuilderFactory;
    9. // 框架自动注入
    10.     @Autowired
    11.     public StepBuilderFactory stepBuilderFactory;
    12. // 数据过滤器,对从数据库读出来的数据,注意进行操作
    13.     @Autowired
    14.     public TaskItemProcessor taskItemProcessor;
    15.     // 接收job参数
    16.     public Map parameters;
    17.     public Object taskId;
    18.     @Autowired
    19.     private JdbcTemplate jdbcTemplate;
    20. // 读取数据库操作
    21.     @Bean
    22.     @StepScope
    23.     public JdbcCursorItemReader itemReader(DataSource dataSource) {
    24.         String querySql = " SELECT " +
    25.                 " e. ID AS taskId, " +
    26.                 " e.user_id AS userId, " +
    27.                 " e.timing_startup AS startTime, " +
    28.                 " u.device_id AS deviceId, " +
    29.                 " d.app_name AS appName, " +
    30.                 " d.compose_file AS composeFile, " +
    31.                 " e.failure_retry AS failureRetry, " +
    32.                 " e.tetry_times AS retryTimes, " +
    33.                 " e.device_managered AS deviceManagered " +
    34.                 " FROM " +
    35.                 " eiot_upgrade_task e " +
    36.                 " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +
    37.                 " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +
    38.                 " WHERE " +
    39.                 " ( " +
    40.                 " u.device_upgrade_status = 0 " +
    41.                 " OR u.device_upgrade_status = 2" +
    42.                 " )" +
    43.                 " AND e.tetry_times > u.retry_times " +
    44.                 " AND e. ID = ?";
    45.         return new JdbcCursorItemReaderBuilder()
    46.                 .name("itemReader")
    47.                 .sql(querySql)
    48.                 .dataSource(dataSource)
    49.                 .queryArguments(new Object[]{parameters.get("taskId").getValue()})
    50.                 .rowMapper(new DispatchRequest.DispatchRequestRowMapper())
    51.                 .build();
    52.     }
    53. // 将结果写回数据库
    54.     @Bean
    55.     @StepScope
    56.     public ItemWriter itemWriter() {
    57.         return new ItemWriter() {
    58.             private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
    59.                 log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);
    60.                 Integer retryTimes = jdbcTemplate.queryForObject(
    61.                         "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
    62.                         new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class
    63.                 );
    64.                 retryTimes += 1;
    65.                 int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +
    66.                         "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
    67.                 if (updateCount <= 0) {
    68.                     log.warn("no task updated");
    69.                 } else {
    70.                     log.info("count of {} task updated", updateCount);
    71.                 }
    72.                 // 最后一次重试
    73.                 if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {
    74.                     log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
    75.                     return 1;
    76.                 } else {
    77.                     return 0;
    78.                 }
    79.             }
    80.             @Override
    81.             @Transactional
    82.             public void write(List list) throws Exception {
    83.                 Map taskMap = jdbcTemplate.queryForMap(
    84.                         "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
    85.                         list.get(0).getDispatchRequest().getTaskId() // 我们认定一个批量里面,taskId都是一样的
    86.                         );
    87.                 int deviceManagered = (int)taskMap.get("device_managered");
    88.                 Integer deviceCount = (Integer) taskMap.get("device_count");
    89.                 if (deviceCount == null) {
    90.                     log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
    91.                 }
    92.                 int taskStatus = (int)taskMap.get("task_status");
    93.                 for (ProcessResult result: list) {
    94.                     deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
    95.                 }
    96.                 if (deviceCount != null && deviceManagered == deviceCount) {
    97.                     taskStatus = 2//任务状态 0:待升级,1:升级中,2:已完成
    98.                 }
    99.                 jdbcTemplate.update("update eiot_upgrade_task set device_managered = ?, task_status = ? " +
    100.                         "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
    101.             }
    102.         };
    103.     }
    104.     /**
    105.      * 定义一个下发更新的 job
    106.      * @return
    107.      */
    108.     @Bean
    109.     public Job updateDeviceJob(Step updateDeviceStep) {
    110.         return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-"""))
    111.                 .listener(new JobListener()) // 设置Job的监听器
    112.                 .flow(updateDeviceStep)// 执行下发更新的Step
    113.                 .end()
    114.                 .build();
    115.     }
    116.     /**
    117.      * 定义一个下发更新的 step
    118.      * @return
    119.      */
    120.     @Bean
    121.     public Step updateDeviceStep(JdbcCursorItemReader itemReader,ItemWriter itemWriter) {
    122.         return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-"""))
    123.                 . chunk(batchSize)
    124.                 .reader(itemReader) //根据taskId从数据库读取更新设备信息
    125.                 .processor(taskItemProcessor) // 每条更新信息,执行下发更新接口
    126.                 .writer(itemWriter)
    127.                 .build();
    128.     }
    129.     // job 监听器
    130.     public class JobListener implements JobExecutionListener {
    131.         @Override
    132.         public void beforeJob(JobExecution jobExecution) {
    133.             log.info(jobExecution.getJobInstance().getJobName() + " before... ");
    134.             parameters = jobExecution.getJobParameters().getParameters();
    135.             taskId = parameters.get("taskId").getValue();
    136.             log.info("job param taskId : " + parameters.get("taskId"));
    137.         }
    138.         @Override
    139.         public void afterJob(JobExecution jobExecution) {
    140.             log.info(jobExecution.getJobInstance().getJobName() + " after... ");
    141.             // 当所有job执行完之后,查询设备更新状态,如果有失败,则要定时重新执行job
    142.             String sql = " SELECT " +
    143.                     " count(*) " +
    144.                     " FROM " +
    145.                     " eiot_upgrade_device d " +
    146.                     " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +
    147.                     " WHERE " +
    148.                     " u. ID = ? " +
    149.                     " AND d.retry_times < u.tetry_times " +
    150.                     " AND ( " +
    151.                     " d.device_upgrade_status = 0 " +
    152.                     " OR d.device_upgrade_status = 2 " +
    153.                     " ) ";
    154.             // 获取更新失败的设备个数
    155.             Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
    156.             log.info("update device failure count : " + count);
    157.             // 下面是使用Quartz触发定时任务
    158.             // 获取任务时间,单位秒
    159. // String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
    160.             // 此处方便测试,应该从数据库中取taskId对应的重试间隔,单位秒
    161.             Integer millSecond = 10;
    162.             if(count != null && count > 0){
    163.                 String jobName = "UpgradeTask_" + taskId;
    164.                 String reTaskId = taskId.toString();
    165.                 Map params = new HashMap<>();
    166.                 params.put("jobName",jobName);
    167.                 params.put("taskId",reTaskId);
    168.                 if (QuartzManager.checkNameNotExist(jobName))
    169.                 {
    170.                     QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);
    171.                 }
    172.             }
    173.         }
    174.     }
    175. }

    5、Processor设置

    处理每条数据,可以在此对数据进行过滤操作。

    1. @Component("taskItemProcessor")
    2. public class TaskItemProcessor implements ItemProcessor {
    3.     public static final int STATUS_DISPATCH_FAILED = 2;
    4.     public static final int STATUS_DISPATCH_SUCC = 1;
    5.     private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);
    6.     @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
    7.     private String dispatchUrl;
    8.     @Autowired
    9.     JdbcTemplate jdbcTemplate;
    10.     /**
    11.      * 在这里,执行 下发更新指令 的操作
    12.      * @param dispatchRequest
    13.      * @return
    14.      * @throws Exception
    15.      */
    16.     @Override
    17.     public ProcessResult process(final DispatchRequest dispatchRequest) {
    18.         // 调用接口,下发指令
    19.         String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();
    20.         log.info("request url:" + url);
    21.         RestTemplate restTemplate = new RestTemplate();
    22.         HttpHeaders headers = new HttpHeaders();
    23.         headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
    24.         MultiValueMap params = new LinkedMultiValueMap();
    25.         JSONObject jsonOuter = new JSONObject();
    26.         JSONObject jsonInner = new JSONObject();
    27.         try {
    28.             jsonInner.put("jobId",dispatchRequest.getTaskId());
    29.             jsonInner.put("name",dispatchRequest.getName());
    30.             jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
    31.             jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));
    32.             jsonInner.put("timestamp",dispatchRequest.getTimestamp());
    33.             jsonOuter.put("method","updateApp");
    34.             jsonOuter.put("params",jsonInner);
    35.         } catch (JSONException e) {
    36.             log.info("JSON convert Exception :" + e);
    37.         }catch (IOException e) {
    38.             log.info("Base64Util bytesToBase64Str :" + e);
    39.         }
    40.         log.info("request body json :" + jsonOuter);
    41.         HttpEntity requestEntity = new HttpEntity(jsonOuter.toString(),headers);
    42.         int status;
    43.         try {
    44.             ResponseEntity response = restTemplate.postForEntity(url,requestEntity,String.class);
    45.             log.info("response :" + response);
    46.             if (response.getStatusCode() == HttpStatus.OK) {
    47.                 status = STATUS_DISPATCH_SUCC;
    48.             } else {
    49.                 status = STATUS_DISPATCH_FAILED;
    50.             }
    51.         }catch (Exception e){
    52.             status = STATUS_DISPATCH_FAILED;
    53.         }
    54.         return new ProcessResult(dispatchRequest, status);
    55.     }
    56. }

    6、封装实体Bean

    封装数据库返回数据的实体Bean,注意静态内部类

    1. public class DispatchRequest {
    2.     private String taskId;
    3.     private String deviceId;
    4.     private String userId;
    5.     private String name;
    6.     private byte[] composeFile;
    7.     private String policy;
    8.     private String timestamp;
    9.     private String md5;
    10.     private int failureRetry;
    11.     private int retryTimes;
    12.     private int deviceManagered;
    13.    // 省略构造函数,setter/getter/tostring方法
    14.    //......
    15.    
    16.     public static class DispatchRequestRowMapper implements RowMapper {
    17.         @Override
    18.         public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
    19.             DispatchRequest dispatchRequest = new DispatchRequest();
    20.             dispatchRequest.setTaskId(resultSet.getString("taskId"));
    21.             dispatchRequest.setUserId(resultSet.getString("userId"));
    22.             dispatchRequest.setPolicy(resultSet.getString("startTime"));
    23.             dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
    24.             dispatchRequest.setName(resultSet.getString("appName"));
    25.             dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
    26.             dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
    27.             dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
    28.             dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
    29.             dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
    30.             return dispatchRequest;
    31.         }
    32.     }
    33. }

    7、启动类上要加上注解

    1. @SpringBootApplication
    2. @EnableBatchProcessing
    3. public class Application {
    4.     public static void main(String[] args) {
    5.         SpringApplication.run(Application.class, args);
    6.     }
    7. }

    总结

    其实SpringBatch并没有想象中那么好用,当从数据库中每次取5000条数据后,进入processor中是逐条处理的,这个时候不能不行操作,等5000条数据处理完之后,再一次性执行ItemWriter方法。

    在使用的过程中,最坑的地方是ItemReader和ItemWriter这两个地方,如何执行自定义的Sql,参考文中代码就行。

    至于Quartz定时功能,很简单,只要定时创建SpringBatch里面的Job,让这个job启动就好了。

    只有当你开始,你才会到达你的理想和目的地,只有当你努力,
    你才会获得辉煌的成功,只有当你播种,你才会有所收获。只有追求,
    才能尝到成功的味道,坚持在昨天叫立足,坚持在今天叫进取,坚持在明天叫成功。欢迎所有小伙伴们点赞+收藏!!! 

     

  • 相关阅读:
    C++编译期循环获取变量类型
    C语言的静态库和的动态库
    Java 面试八股文有必要背吗?要背多久
    React进阶
    Java线程面试题
    Linux ARM平台开发系列讲解(platform平台子系统) 2.10.1 platform平台子系统介绍
    [vue3.x]实战问题--vue2.x升级vue3.x遇到的问题
    Vue.js+SpringBoot开发生活废品回收系统
    记录一次我虚拟机好不容易连上后的配置
    一文详解视觉Transformer模型压缩和加速策略(量化/低秩近似/蒸馏/剪枝)
  • 原文地址:https://blog.csdn.net/weixin_48321993/article/details/125907098