• SpringBatch(7):监听器


    前言

    监听器,用于监听作业的执行过程

    监听器有哪些

    JobExecutionListener(before,after)
    StepExecutionListener(before,after)
    ChunkListener(before,after)
    ItemReadListener/ItemProcessListener/ItemWriteListener(before,after,error)

    监听器示例

    1. 定义用于监听Job的监听器
    package com.it2.springbootspringbatch01.listener;
    
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    
    @Slf4j
    public class MyJobListener implements JobExecutionListener {
        @Override
        public void beforeJob(JobExecution jobExecution) {
          log.info(jobExecution.getJobInstance().getJobName()+"  beforeJob");
        }
    
        @Override
        public void afterJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName()+"  afterJob");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    1. 定义用于监听chunk的监听器
    package com.it2.springbootspringbatch01.listener;
    
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.annotation.AfterChunk;
    import org.springframework.batch.core.annotation.BeforeChunk;
    import org.springframework.batch.core.scope.context.ChunkContext;
    
    @Slf4j
    public class MyChunkListener {
    
        @BeforeChunk
        public void before(ChunkContext context){
            log.info(context.getStepContext().getStepName()+"  BeforeChunk");
        }
    
        @AfterChunk
        public void after(ChunkContext context){
            log.info(context.getStepContext().getStepName()+"  AfterChunk");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 编写demo,使用监听器
    package com.it2.springbootspringbatch01.config;
    
    import com.it2.springbootspringbatch01.listener.MyChunkListener;
    import com.it2.springbootspringbatch01.listener.MyJobListener;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.adapter.ItemProcessorAdapter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Arrays;
    import java.util.List;
    
    @Configuration
    @EnableBatchProcessing
    @Slf4j
    public class ListenerJobDemo {
    
        //注入任务对象工厂
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        //任务的执行由Step决定,注入step对象的factory
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        //创建Job对象
        @Bean
        public Job jobDemo(){
            return jobBuilderFactory.get("jobDemo")
                    .start(chunk_step1())
                    .listener(new MyJobListener()) //给job添加监听器
                    .build();
        }
    
        //创建Step对象
        @Bean
        public Step chunk_step1() {
            return  stepBuilderFactory.get("chunk_step1")
                    .<String,Integer> chunk(2) //表示每一次处理reader两条,reader/process/write
                    .faultTolerant()
                    .listener(new MyChunkListener())
                    .reader(read()) //读取
                    .processor(processor()) //处理
                    .writer(write()) //写入
                    .build();
    
        }
    
        public ItemReader<String> read() { //一组字符串,字符内容包含数字和字母
            return new ListItemReader<String>(Arrays.asList("a3","b5","cbd7","f9","x10"));
        }
    
        public ItemProcessor<? super String,? extends Integer> processor() {
            return new ItemProcessor<String, Integer>() {  //将字符串中的字母去掉,返回int数值
                public Integer process(String s) throws Exception {
                    Integer i= Integer.parseInt(s.replaceAll("[a-zA-Z]", ""));
                    log.info("字符串{} 转换{}",s,i);
                    return i;
                }
            };
        }
    
        public ItemWriter<Integer> write() {
            return new ItemWriter<Integer>() { //打印处理完的内容
                @Override
                public void write(List<? extends Integer> list) throws Exception {
                    log.info("写入数据:"+list);
                }
            };
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    1. 运行启动服务器,观察日志我们发现,JobExecutionListener在任务的前和后被执行了,而MyChunkListener则是按照chunk的批次来执行。
      在这里插入图片描述

    chunk

    chunk表示块的意思,比如

    tepBuilderFactory.get("chunk_step1")
                    .<String,Integer> chunk(2) //表示每一次处理reader两条,reader/process/write
                    .faultTolerant()
                    .listener(new MyChunkListener())
                    .reader(read()) //读取
                    .processor(processor()) //处理
                    .writer(write()) //写入
                    .build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    chunk(2) 表示2个数据为一个块。那么 read就会读取到ItemReader两条记录(从最开始执行),a3和b5为一个块,cbd7和f9为一个块,x10为一个块(最后不足数量也按照一个块算)

        public ItemReader<String> read() {
            return new ListItemReader<String>(Arrays.asList("a3","b5","cbd7","f9","x10"));
        }
    
    • 1
    • 2
    • 3

    而processor则是一条一条处理的,和chunk无关

    public ItemProcessor<? super String,? extends Integer> processor() {
            return new ItemProcessor<String, Integer>() {
                public Integer process(String s) throws Exception {
                    Integer i= Integer.parseInt(s.replaceAll("[a-zA-Z]", ""));
                    log.info("字符串{} 转换{}",s,i);
                    return i;
                }
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    writer方法,read读取数据之后,经过processor逐条处理之后,再按照块发送给write方法进行写操作,同样也按照chunk的块大小进行分批操作。

    public ItemWriter<Integer> write() {
            return new ItemWriter<Integer>() {
                @Override
                public void write(List<? extends Integer> list) throws Exception {
                    log.info("写入数据:"+list);
                }
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述

  • 相关阅读:
    Go语言开发实战课后编程题
    2023年7月工作经历三
    使用mybatisplus进行分页查询total总为0的原因而且分页不起任何作用 是因为mybatis-plus插件没有起作用
    PLC远程调试
    为什么要在时钟输出上预留电容的工位?
    在OCP集群中安装NSX ALB AKO
    1204. 最后一个能进入电梯的人
    基于Java毕业设计影院网上售票系统源码+系统+mysql+lw文档+部署软件
    支付宝推出AI毛发自测工具,上传照片即可自测脱发等级
    部署Docker私有镜像仓库Harbor
  • 原文地址:https://blog.csdn.net/u011628753/article/details/126836045