监听器,用于监听作业的执行过程
JobExecutionListener(before,after)
StepExecutionListener(before,after)
ChunkListener(before,after)
ItemReadListener/ItemProcessListener/ItemWriteListener(before,after,error)
package com.it2.springbootspringbatch01.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
/**
 * Job-level listener that logs the job name in both {@link JobExecutionListener} callbacks.
 */
@Slf4j
public class MyJobListener implements JobExecutionListener {

    /**
     * Logs the job name followed by {@code " beforeJob"}.
     *
     * @param jobExecution execution whose job instance supplies the logged name
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        // Parameterized message: the text is only assembled when INFO is enabled.
        log.info("{} beforeJob", jobExecution.getJobInstance().getJobName());
    }

    /**
     * Logs the job name followed by {@code " afterJob"}.
     *
     * @param jobExecution execution whose job instance supplies the logged name
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("{} afterJob", jobExecution.getJobInstance().getJobName());
    }
}
package com.it2.springbootspringbatch01.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.annotation.AfterChunk;
import org.springframework.batch.core.annotation.BeforeChunk;
import org.springframework.batch.core.scope.context.ChunkContext;
/**
 * Annotation-driven chunk listener: the methods are bound through {@link BeforeChunk} and
 * {@link AfterChunk} instead of implementing a listener interface. Each callback logs the
 * name of the step the chunk belongs to.
 */
@Slf4j
public class MyChunkListener {

    /**
     * Logs the step name followed by {@code " BeforeChunk"}.
     *
     * @param context chunk context whose step context supplies the logged step name
     */
    @BeforeChunk
    public void before(ChunkContext context) {
        // Parameterized message: the text is only assembled when INFO is enabled.
        log.info("{} BeforeChunk", context.getStepContext().getStepName());
    }

    /**
     * Logs the step name followed by {@code " AfterChunk"}.
     *
     * @param context chunk context whose step context supplies the logged step name
     */
    @AfterChunk
    public void after(ChunkContext context) {
        log.info("{} AfterChunk", context.getStepContext().getStepName());
    }
}
package com.it2.springbootspringbatch01.config;
import com.it2.springbootspringbatch01.listener.MyChunkListener;
import com.it2.springbootspringbatch01.listener.MyJobListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.adapter.ItemProcessorAdapter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
 * Batch configuration that wires a single chunk-oriented step into a job and attaches
 * a job listener ({@link MyJobListener}) and a chunk listener ({@link MyChunkListener}).
 */
@Configuration
@EnableBatchProcessing
@Slf4j
public class ListenerJobDemo {

    // Factory used to build Job instances.
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // A job's work is carried out by its steps; this factory builds Step instances.
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Creates the job named {@code "jobDemo"}, which starts with {@link #chunk_step1()}.
     *
     * @return the configured job with a {@link MyJobListener} attached
     */
    @Bean
    public Job jobDemo() {
        return jobBuilderFactory.get("jobDemo")
                .start(chunk_step1())
                .listener(new MyJobListener()) // attach the job-level listener
                .build();
    }

    /**
     * Creates the chunk-oriented step named {@code "chunk_step1"}.
     *
     * @return the configured step with a {@link MyChunkListener} attached
     */
    @Bean
    public Step chunk_step1() {
        return stepBuilderFactory.get("chunk_step1")
                .<String, Integer>chunk(2) // two items per chunk: read / process / write
                .faultTolerant()
                .listener(new MyChunkListener())
                .reader(read())
                .processor(processor())
                .writer(write())
                .build();
    }

    /**
     * Supplies the input items: strings that mix letters and digits.
     *
     * @return a reader over the fixed list {@code a3, b5, cbd7, f9, x10}
     */
    public ItemReader<String> read() {
        return new ListItemReader<String>(Arrays.asList("a3", "b5", "cbd7", "f9", "x10"));
    }

    /**
     * Builds the processor that strips ASCII letters from an item and parses what remains
     * as an {@code int}.
     *
     * @return a processor mapping e.g. {@code "cbd7"} to {@code 7}; {@code Integer.parseInt}
     *         throws {@link NumberFormatException} when the remainder is not a valid integer
     */
    public ItemProcessor<? super String, ? extends Integer> processor() {
        return new ItemProcessor<String, Integer>() {
            @Override
            public Integer process(String s) throws Exception {
                Integer i = Integer.parseInt(s.replaceAll("[a-zA-Z]", ""));
                log.info("字符串{} 转换{}", s, i);
                return i;
            }
        };
    }

    /**
     * Builds the writer that logs the list of processed items it receives.
     *
     * @return a writer that only logs its input list
     */
    public ItemWriter<Integer> write() {
        return new ItemWriter<Integer>() {
            @Override
            public void write(List<? extends Integer> list) throws Exception {
                // Parameterized message instead of string concatenation.
                log.info("写入数据:{}", list);
            }
        };
    }
}

chunk表示块的意思,比如
stepBuilderFactory.get("chunk_step1")
        .<String, Integer>chunk(2) // two items per chunk: read / process / write
        .faultTolerant()
        .listener(new MyChunkListener())
        .reader(read())           // read
        .processor(processor())   // process
        .writer(write())          // write
        .build();
chunk(2) 表示每2条数据为一个块。read 会从 ItemReader 的开头依次读取,每读满两条记录组成一个块:a3和b5为一个块,cbd7和f9为一个块,x10为一个块(最后不足两条的也按一个块算)。
/**
 * Supplies the demo input: a fixed list of strings that mix letters and digits.
 *
 * @return a reader over {@code a3, b5, cbd7, f9, x10}
 */
public ItemReader<String> read() {
    List<String> items = Arrays.asList("a3", "b5", "cbd7", "f9", "x10");
    return new ListItemReader<>(items);
}
而processor则是逐条处理数据的(每次调用只接收并处理一条记录),和chunk的大小无关:
/**
 * Builds the processor that strips ASCII letters from an item and parses what remains
 * as an {@code int}.
 *
 * @return a processor mapping e.g. {@code "cbd7"} to {@code 7}; {@code Integer.parseInt}
 *         throws {@link NumberFormatException} when the remainder is not a valid integer
 */
public ItemProcessor<? super String, ? extends Integer> processor() {
    return new ItemProcessor<String, Integer>() {
        @Override
        public Integer process(String s) throws Exception {
            Integer i = Integer.parseInt(s.replaceAll("[a-zA-Z]", ""));
            log.info("字符串{} 转换{}", s, i);
            return i;
        }
    };
}

writer方法:read读取数据并经过processor逐条处理之后,结果会按块发送给write方法进行写操作,即同样按照chunk的块大小分批写入。
/**
 * Builds the writer that logs the list of processed items it receives.
 *
 * @return a writer that only logs its input list
 */
public ItemWriter<Integer> write() {
    return new ItemWriter<Integer>() {
        @Override
        public void write(List<? extends Integer> list) throws Exception {
            // Parameterized message instead of string concatenation.
            log.info("写入数据:{}", list);
        }
    };
}
