• SpringBatch入门


    SpringBatch入门

    案例借鉴的是官方案例,有兴趣的可以自己去看一看。
    springbatch

    需求

    将现有文件中的字母转换大小写,文件格式为csv格式 spring可以自动识别

    xml配置

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.5.3version>
            <relativePath/> 
        parent>
        <groupId>com.llfgroupId>
        <artifactId>banthDemoartifactId>
        <version>0.0.1-SNAPSHOTversion>
        <name>banthDemoname>
        <description>banthDemodescription>
        <properties>
            <java.version>1.8java.version>
        properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-batchartifactId>
            dependency>
    
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <optional>trueoptional>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
            <dependency>
                <groupId>org.springframework.batchgroupId>
                <artifactId>spring-batch-testartifactId>
                <scope>testscope>
            dependency>
            <dependency>
                <groupId>mysqlgroupId>
                <artifactId>mysql-connector-javaartifactId>
            dependency>
        dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-maven-pluginartifactId>
                    <configuration>
                        <excludes>
                            <exclude>
                                <groupId>org.projectlombokgroupId>
                                <artifactId>lombokartifactId>
                            exclude>
                        excludes>
                    configuration>
                plugin>
            plugins>
        build>
    
    project>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    目录结构

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FDJxcDHA-1662866749735)(E:/Typroa图片/image-20220911102557776.png)]

    JobListener

    监听器,监听job运行时的状态信息,只需要继承JobExecutionListenerSupport类 然后重写它的方法即可。

    方法一共有两个 分别是afterJobbeforeJob

    package com.llf.batch;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.core.BatchStatus;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.listener.JobExecutionListenerSupport;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * @author llf
     * @Description
     * @date 2022/9/11 9:11
     */
    @Component
    public class JobListener extends JobExecutionListenerSupport {
        private static final Logger log = LoggerFactory.getLogger(JobListener.class);
    
        private final JdbcTemplate jdbcTemplate;
    
        @Autowired
        public JobListener(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }
    
        @Override
        public void afterJob(JobExecution jobExecution) {
            if (jobExecution.getStatus().equals(BatchStatus.COMPLETED)){
                log.info("JOB 已结束");
    
                jdbcTemplate.query("SELECT first_name, last_name FROM people",
                        (rs, row) -> new Person(
                                rs.getString(1),
                                rs.getString(2))
                ).forEach(person -> log.info("Found <" + person + "> in the database."));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    person

    javaBean对象,用于存储文件信息的对象

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Accessors(chain = true)
    public class Person {
        private String lastName;
        private String firstName;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    personItemProcessor

    job任务的主逻辑,每个xxxxItemPercessor都需要实现ItemProcessor接口 并定义出参、入参对象,实现process方法。

    public class PersonItemProcessor implements ItemProcessor<Person,Person> {
        private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
        @Override
        public Person process(Person person) throws Exception {
            final String firstName = person.getFirstName().toUpperCase();
            final String lastName = person.getLastName().toUpperCase();
            final Person transformedPerson = new Person(firstName, lastName);
            log.info("Converting (" + person + ") into (" + transformedPerson + ")");
            return transformedPerson;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    springBatchConfig

    job任务的配置类,一个job的创建、构造在这里实现。每一个job都需要将JobBuilderFactoryStepBuilderFactory 这两个重要的工厂自动注入,方便后续的job与step的创建。

        @Autowired
        public JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        public StepBuilderFactory stepBuilderFactory;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    通常来讲。一个job分为多个step,一个step对文件的操作又包括read、processor、write三个步骤,每个步骤都需要对应的Bulder构造器来实现

    read
        @Bean
        public FlatFileItemReader<Person> reader(){
            return new FlatFileItemReaderBuilder<Person>()
                    .name("personItemReader")
                    //ClassPathResource 从系统的类路径中加载 
    				//FileSystemResource 从文件系统加载,比如说自己指定配置文件的全路径
    				//FileUrlResource从指定的Url加载
                    .resource(new ClassPathResource("sample-data.csv"))
                    .delimited()
                    .names(new String[]{"firstName", "lastName"})
                    .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{
                        setTargetType(Person.class);
                    }})
                    .build();
        }
        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    由于这是一个@configration的配置类,因此我们需要@bean 将我们的方法构建成bean对象,以供后续框架能够识别,并进行相对应逻辑处理。

    这里的resource 可以是类路径 也可以是URL绝对路径 对应的API分别是ClassPathResource与FileUrlResource

    processor

    详见上面的PersonItemProcessor

     @Bean
        public PersonItemProcessor processor(){
            return new PersonItemProcessor();
        }
    
    • 1
    • 2
    • 3
    • 4
    write

    写分为写文件和写数据库两种,写文件常见的就是写报表文件或excel文件,写报表文件需要提前定义好模板,写数据库相对比较容易,当数据经过processor处理之后,我们直接通过对象对数据库操作即可。本文的例子是对数据库进行操作。

       @Bean
        public JdbcBatchItemWriter<Person> writer(DataSource dataSource){
            return new JdbcBatchItemWriterBuilder<Person>()
                    .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                    .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
                    .dataSource(dataSource)
                    .build();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    step

    将上面的readprocessorwrite进行拼接 ,组装成一个step

        @Bean
        public Step personStep(JdbcBatchItemWriter<Person> writer){
            return stepBuilderFactory.get("personStep1") //给这个step取名字 任意都行
                    .<Person,Person>chunk(10)
                    .reader(reader())
                    .processor(processor())
                    .writer(writer)
                    .build();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    job

    将多个step组装成一个job

        @Bean
        public Job personJob(JobListener listener,Step personStep){
            return jobBuilderFactory.get("personJob")
                    .incrementer(new RunIdIncrementer())
                    .listener(listener)
                    .flow(personStep)//在这里可以通过.next方法 一直传递step
                    .end()
                    .build();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    BanthDemoApplication

    springBatch启动类

    application.yml

    spring:
      datasource:
        username: root
        password: 123456
        url: jdbc:mysql://localhost:3306/spring_batch?serverTimezone=UTC
        driver-class-name: com.mysql.cj.jdbc.Driver      
    =======================================================    上面是数据库配置文件
      batch:
        jdbc:
          initialize-schema: always
    =================================  springbatch配置文件,表示自动创建springBatch相关的系统表数据    
      sql:
        init:
          mode: always
          encoding: utf-8
          username: root
          password: 123456
          schema-locations:
            - classpath*:schema-all.sql
    ======sql脚本自动执行配置 将sql文件放在resouces目录下,服务启动时 会自动执行指定文件下的sql语句
    
    这个是springboot2.5.x之后的版本,由于在springboot2.5.x中重构了 DataSourceProperties ,所以才出现的上面的配置
    2.5.x之前的版本是
    spring:
      datasource:
        schema: classpath*:schema.sql
        initialization-mode: always
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    在这里插入图片描述

    sample-data.csv

    业务处理文件

    Jill,Doe
    Joe,Doe
    Justin,Doe
    Jane,Doe
    John,Doe
    
    • 1
    • 2
    • 3
    • 4
    • 5

    schema-all.sql

    数据库sql脚本语句,用于创建数据库表 ,这里注意mysql5和mysql8的语法区别

    DROP TABLE IF EXISTS people;
    
    CREATE TABLE people  (
        person_id BIGINT NOT NULL AUTO_INCREMENT ,
        PRIMARY KEY (`person_id`) USING BTREE,
        first_name VARCHAR(20),
        last_name VARCHAR(20)
    )ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    查看结果

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JExZ1XdE-1662866749737)(E:/Typroa图片/image-20220911112213200.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5QgK8L7w-1662866749737)(E:/Typroa图片/image-20220911112235079.png)]

  • 相关阅读:
    浅谈AVL树,红黑树,B树,B+树原理及应用
    macOS 安装使用 python 虚拟机
    【无标题】Delayed延迟队列不工作
    字节跳动企业软件:用日活评判企业软件,跟95后打行业
    这种动态规划你见过吗——状态机动态规划之股票问题(上)
    JavaSE学习——网络编程
    移植RTOS的大体思路
    C 标准库 - <string.h>
    解决Spring Cloud Config API 暴露密码的问题
    多态 polymorphism
  • 原文地址:https://blog.csdn.net/Amazing66/article/details/126803703