• Spring Batch批量处理数据


    Spring Batch 是一个由 Pivotal Software(原 SpringSource,现属于 VMware)开发的批处理框架,它是 Spring 框架的一部分,主要用于创建高效、健壮的批量数据处理应用。Spring Batch 设计用于处理大量的记录,例如在夜间处理或定期运行的数据加载、转换和整合操作。

    Spring Batch 的主要特性包括:

    1. 事务管理:支持事务边界内的数据处理,确保数据完整性。
    2. 并发处理:允许并行处理数据,提高处理速度。
    3. 重试机制:当出现故障时,可以配置重试策略以重新处理失败的记录。
    4. 跳过机制:能够跳过某些失败的记录而不中断整个批处理作业。
    5. 持久化状态管理:使用 JobRepository 来跟踪作业的状态,即使在系统重启后也能恢复作业。
    6. 分片/分区:可以将数据集分割成小块,并在多个处理器上并行处理。
    7. 远程执行:支持跨机器的作业执行。
    8. 监控和日志:提供详细的日志记录和作业执行的监控能力。

    Spring Batch 的架构包括以下几个核心组件:

    • Job:这是批处理作业的最高级别抽象,可以包含一个或多个步骤。
    • Step:是批处理作业中的一个逻辑单元,可以是任务步骤(如读取、处理、写入数据)或决策步骤。
    • ItemReader:负责从数据源读取数据项。
    • ItemProcessor:对读取的数据项进行处理。
    • ItemWriter:将处理后的数据写入目标数据源。
    • JobLauncher:负责启动和执行作业。
    • JobRepository:管理作业的元数据和状态,通常与数据库交互。

    Spring Batch 不是一个调度框架,它专注于批处理作业的实现细节,通常需要与其他调度框架(如 Quartz 或 Cron)结合使用,以便控制作业何时启动。由于其高度的可配置性和灵活性,Spring Batch 成为了企业级批处理应用的首选框架之一。

    在Spring Boot项目中集成Spring Batch涉及几个关键步骤,下面举个例子,说明如何设置一个基本的Spring Batch环境:

    1. 添加依赖

    首先,在pom.xml文件中添加Spring Batch和Spring Boot Starter Batch的依赖:

    <dependencies>
        
        <dependency>
            <groupId>org.springframework.batchgroupId>
            <artifactId>spring-batch-coreartifactId>
            <version>4.x.y.RELEASEversion> 
        dependency>
        
        
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-batchartifactId>
        dependency>
        
        
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-jdbcartifactId>
        dependency>
    
        
        <dependency>
            <groupId>com.mysqlgroupId>
            <artifactId>mysql-connector-javaartifactId>
        dependency>
        
        
        <dependency>
            <groupId>com.h2databasegroupId>
            <artifactId>h2artifactId>
        dependency>
    dependencies>
    

    2. 配置数据源和JobRepository

    Spring Batch需要一个数据源来存储作业元数据和状态。这通常通过application.propertiesapplication.yml文件配置:

    spring.datasource.url=jdbc:mysql://localhost:3306/batchdb
    spring.datasource.username=batchuser
    spring.datasource.password=batchpassword
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    
    # Spring Batch配置
    spring.batch.job.enabled=false # 设置为false,避免在启动时自动执行任何job
    

    3. 创建Job和Step

    定义一个Job,并为其创建一个或多个Step。这通常通过一个@Configuration类和@EnableBatchProcessing注解完成:

    @Configuration
    @EnableBatchProcessing
    public class BatchConfig {
    
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        @Bean
        public Job importUserJob() {
            return jobBuilderFactory.get("importUserJob")
                    .incrementer(new RunIdIncrementer())
                    .flow(importUserDataStep())
                    .end()
                    .build();
        }
    
        @Bean
        public Step importUserDataStep() {
            return stepBuilderFactory.get("importUserDataStep")
                    .<User, User>chunk(10)
                    .reader(userItemReader(null))
                    .processor(userItemProcessor())
                    .writer(userItemWriter())
                    .build();
        }
    }
    

    4. 实现ItemReader, ItemProcessor, 和 ItemWriter

    在上面的示例中,importUserDataStep()使用chunk-oriented步骤,这意味着它将数据分批处理。你需要实现ItemReader, ItemProcessor, 和 ItemWriter来分别读取、处理和写入数据:

    @Bean
    public FlatFileItemReader<User> userItemReader(Resource resource) {
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("firstName", "lastName");
        BeanWrapperFieldSetMapper<User> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(User.class);
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
    
        FlatFileItemReader<User> itemReader = new FlatFileItemReader<>();
        itemReader.setResource(resource);
        itemReader.setLinesToSkip(1); // 跳过标题行
        itemReader.setLineMapper(lineMapper);
        return itemReader;
    }
    
    @Bean
    public ItemProcessor<User, User> userItemProcessor() {
        return new ItemProcessor<User, User>() {
            @Override
            public User process(User item) throws Exception {
                item.setFirstName(item.getFirstName().toUpperCase());
                return item;
            }
        };
    }
    
    @Bean
    public JpaPagingItemWriter<User> userItemWriter(JpaItemWriterBuilder<User> builder) {
        return builder
                .entityManagerFactory(entityManagerFactory)
                .build();
    }
    

    5. 启动Job

    在你的主类中,你可以注入JobLauncherJob,然后调用它们来启动作业:

    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private Job importUserJob;
    
    // 在适当的地方调用
    jobLauncher.run(importUserJob, new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters());
    

    以上步骤会帮助你在一个Spring Boot项目中集成Spring Batch。请注意,实际的配置可能需要根据你的具体需求和环境进行调整。

  • 相关阅读:
    java--ArrayList快速入门
    Zotero文献管理软件入门使用方法:软件下载、文献导入、引文插入
    移远通信EM060K系列LTE-A Cat 6模组完成全球认证覆盖
    快速搭建 SpringCloud Alibaba Nacos 配置中心
    【数据结构】交换排序
    ios打包,证书获取
    舞伴问题代码
    多通道振弦数据记录仪应用桥梁安全监测的关键要点
    【软件测试】接手一个测试任务,怎么快速开始测试快速找出测试点?(总结)
    第十五届蓝桥杯省赛第二场C/C++B组A题【进制】题解(AC)
  • 原文地址:https://blog.csdn.net/svygh123/article/details/140054687