Spring Batch 处理处理数据,分为ItemReader 读 ItemProessor 处理 ItemWriter
ItemProcessor 是两者之间的一个过程,它可以做很多事情。
获取偶数id的User
package com.it2.springbootspringbatch01.itemprocessor;
import com.alibaba.fastjson.JSONObject;
import com.it2.springbootspringbatch01.itemreaderdb.User;
import com.it2.springbootspringbatch01.listener.MyChunkListener;
import com.it2.springbootspringbatch01.listener.MyJobListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Configuration
@EnableBatchProcessing
@Slf4j
public class ItemProcessorDemo {
//注入任务对象工厂
@Autowired
private JobBuilderFactory jobBuilderFactory;
//任务的执行由Step决定,注入step对象的factory
@Autowired
private StepBuilderFactory stepBuilderFactory;
//创建Job对象
@Bean
public Job jobDemo() {
return jobBuilderFactory.get("jobDemo")
.start(chunk_step1())
.listener(new MyJobListener()) //给job添加监听器
.build();
}
//创建Step对象
@Bean
public Step chunk_step1() {
return stepBuilderFactory.get("chunk_step1")
.<User, User>chunk(2) //表示每一次处理reader两条,reader/process/write
.faultTolerant()
.reader(read()) //读取
.processor(processor()) //处理
.writer(write()) //写入
.build();
}
public ItemReader<User> read() { //一组字符串,字符内容包含数字和字母
List<User> users = new ArrayList<>();
for (int i = 0; i < 10; i++) {
User user = new User();
user.setId(i);
user.setName("java-" + i);
user.setOrderId(10000 + i);
users.add(user);
}
return new ListItemReader<User>(users);
}
/**
* 数据处理
*
* @return
*/
public ItemProcessor<? super User, ? extends User> processor() {
return new ItemProcessor<User, User>() { //只要偶数id的user
public User process(User user) throws Exception {
if (user.getId() % 2 == 0) {
return user;
}
return null;
}
};
}
public ItemWriter<User> write() {
return new ItemWriter<User>() { //打印处理完的内容
@Override
public void write(List<? extends User> list) throws Exception {
log.info("写入数据:" + list);
System.out.println(JSONObject.toJSONString(list));
}
};
}
}
启动服务器运行,ItemProcessor是逐条处理的,与chunk的大小无关。

package com.it2.springbootspringbatch01.itemprocessor;
import com.alibaba.fastjson.JSONObject;
import com.it2.springbootspringbatch01.itemreaderdb.User;
import com.it2.springbootspringbatch01.listener.MyChunkListener;
import com.it2.springbootspringbatch01.listener.MyJobListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Configuration
@EnableBatchProcessing
@Slf4j
public class ItemProcessorDemo {
//注入任务对象工厂
@Autowired
private JobBuilderFactory jobBuilderFactory;
//任务的执行由Step决定,注入step对象的factory
@Autowired
private StepBuilderFactory stepBuilderFactory;
//创建Job对象
@Bean
public Job jobDemo() {
return jobBuilderFactory.get("jobDemo")
.start(chunk_step1())
.listener(new MyJobListener()) //给job添加监听器
.build();
}
//创建Step对象
@Bean
public Step chunk_step1() {
return stepBuilderFactory.get("chunk_step1")
.<User, User>chunk(2) //表示每一次处理reader两条,reader/process/write
.faultTolerant()
.reader(read()) //读取
.processor(processor()) //处理
.processor(myCompositeItemProcessor())
.writer(write()) //写入
.build();
}
public ItemReader<User> read() { //一组字符串,字符内容包含数字和字母
List<User> users = new ArrayList<>();
for (int i = 0; i < 10; i++) {
User user = new User();
user.setId(i);
user.setName("java-" + i);
user.setOrderId(10000 + i);
users.add(user);
}
return new ListItemReader<User>(users);
}
/**
* 组合的处理器
* @return
*/
public CompositeItemProcessor<? super User, ? extends User> myCompositeItemProcessor() {
CompositeItemProcessor<User, User> compositeItemProcessor = new CompositeItemProcessor<>();
compositeItemProcessor.setDelegates(Arrays.asList(processor(),processor2()));//委派模式,设置多种处理方式
return compositeItemProcessor;
}
/**
* 数据处理
*
* @return
*/
public ItemProcessor<? super User, ? extends User> processor() {
return new ItemProcessor<User, User>() { //只要偶数id的user
public User process(User user) throws Exception {
if (user.getId() % 2 == 0) {
return user;
}
return null;
}
};
}
/**
* 数据处理
*
* @return
*/
public ItemProcessor<? super User, ? extends User> processor2() {
return new ItemProcessor<User, User>() { //只要偶数id的user
public User process(User user) throws Exception {
user.setName(user.getName()+"9999"); //修改用户名
return user;
}
};
}
public ItemWriter<User> write() {
return new ItemWriter<User>() { //打印处理完的内容
@Override
public void write(List<? extends User> list) throws Exception {
log.info("写入数据:" + list);
System.out.println(JSONObject.toJSONString(list));
}
};
}
}
我们可以使用CompositeItemProcessor,可以为其指定多个ItemProcessor,形成组合处理。每个ItemProcessor都会被执行。
启动服务器,可以查看到处理日志,两个ItemProcessor都被调用到。

我们可以看到ItemProcessor 拥有多个实现,可以用这些子类来实现各种处理逻辑。
