• SpringBatch(11):ItemProcessor


    前言

    Spring Batch 处理处理数据,分为ItemReader 读 ItemProessor 处理 ItemWriter
    ItemProcessor 是两者之间的一个过程,它可以做很多事情。

    第一节 ItemProcessor 使用

    获取偶数id的User

    package com.it2.springbootspringbatch01.itemprocessor;
    
    import com.alibaba.fastjson.JSONObject;
    import com.it2.springbootspringbatch01.itemreaderdb.User;
    import com.it2.springbootspringbatch01.listener.MyChunkListener;
    import com.it2.springbootspringbatch01.listener.MyJobListener;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.CompositeItemProcessor;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    @Configuration
    @EnableBatchProcessing
    @Slf4j
    public class ItemProcessorDemo {
    
        //注入任务对象工厂
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        //任务的执行由Step决定,注入step对象的factory
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        //创建Job对象
        @Bean
        public Job jobDemo() {
            return jobBuilderFactory.get("jobDemo")
                    .start(chunk_step1())
                    .listener(new MyJobListener()) //给job添加监听器
                    .build();
        }
    
        //创建Step对象
        @Bean
        public Step chunk_step1() {
            return stepBuilderFactory.get("chunk_step1")
                    .<User, User>chunk(2) //表示每一次处理reader两条,reader/process/write
                    .faultTolerant()
                    .reader(read()) //读取
                    .processor(processor()) //处理
                    .writer(write()) //写入
                    .build();
    
        }
    
        public ItemReader<User> read() { //一组字符串,字符内容包含数字和字母
            List<User> users = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                User user = new User();
                user.setId(i);
                user.setName("java-" + i);
                user.setOrderId(10000 + i);
                users.add(user);
            }
            return new ListItemReader<User>(users);
        }
    
        /**
         * 数据处理
         *
         * @return
         */
        public ItemProcessor<? super User, ? extends User> processor() {
            return new ItemProcessor<User, User>() {  //只要偶数id的user
                public User process(User user) throws Exception {
                    if (user.getId() % 2 == 0) {
                        return user;
                    }
                    return null;
                }
            };
        }
    
    
    
        public ItemWriter<User> write() {
            return new ItemWriter<User>() { //打印处理完的内容
                @Override
                public void write(List<? extends User> list) throws Exception {
                    log.info("写入数据:" + list);
                    System.out.println(JSONObject.toJSONString(list));
                }
            };
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    启动服务器运行,ItemProcessor是逐条处理的,与chunk的大小无关。
    在这里插入图片描述

    第二节 CompositeItemProcessor 组合处理器

    package com.it2.springbootspringbatch01.itemprocessor;
    
    import com.alibaba.fastjson.JSONObject;
    import com.it2.springbootspringbatch01.itemreaderdb.User;
    import com.it2.springbootspringbatch01.listener.MyChunkListener;
    import com.it2.springbootspringbatch01.listener.MyJobListener;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.CompositeItemProcessor;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    @Configuration
    @EnableBatchProcessing
    @Slf4j
    public class ItemProcessorDemo {
    
        //注入任务对象工厂
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        //任务的执行由Step决定,注入step对象的factory
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        //创建Job对象
        @Bean
        public Job jobDemo() {
            return jobBuilderFactory.get("jobDemo")
                    .start(chunk_step1())
                    .listener(new MyJobListener()) //给job添加监听器
                    .build();
        }
    
        //创建Step对象
        @Bean
        public Step chunk_step1() {
            return stepBuilderFactory.get("chunk_step1")
                    .<User, User>chunk(2) //表示每一次处理reader两条,reader/process/write
                    .faultTolerant()
                    .reader(read()) //读取
                    .processor(processor()) //处理
                    .processor(myCompositeItemProcessor())
                    .writer(write()) //写入
                    .build();
    
        }
    
        public ItemReader<User> read() { //一组字符串,字符内容包含数字和字母
            List<User> users = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                User user = new User();
                user.setId(i);
                user.setName("java-" + i);
                user.setOrderId(10000 + i);
                users.add(user);
            }
            return new ListItemReader<User>(users);
        }
    
        /**
         * 组合的处理器
         * @return
         */
        public CompositeItemProcessor<? super User, ? extends User> myCompositeItemProcessor() {
            CompositeItemProcessor<User, User> compositeItemProcessor = new CompositeItemProcessor<>();
            compositeItemProcessor.setDelegates(Arrays.asList(processor(),processor2()));//委派模式,设置多种处理方式
            return compositeItemProcessor;
        }
    
        /**
         * 数据处理
         *
         * @return
         */
        public ItemProcessor<? super User, ? extends User> processor() {
            return new ItemProcessor<User, User>() {  //只要偶数id的user
                public User process(User user) throws Exception {
                    if (user.getId() % 2 == 0) {
                        return user;
                    }
                    return null;
                }
            };
        }
        /**
         * 数据处理
         *
         * @return
         */
        public ItemProcessor<? super User, ? extends User> processor2() {
            return new ItemProcessor<User, User>() {  //只要偶数id的user
                public User process(User user) throws Exception {
                    user.setName(user.getName()+"9999"); //修改用户名
                    return user;
                }
            };
        }
    
    
    
        public ItemWriter<User> write() {
            return new ItemWriter<User>() { //打印处理完的内容
                @Override
                public void write(List<? extends User> list) throws Exception {
                    log.info("写入数据:" + list);
                    System.out.println(JSONObject.toJSONString(list));
                }
            };
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124

    我们可以使用CompositeItemProcessor,可以为其指定多个ItemProcessor,形成组合处理。每个ItemProcessor都会被执行。
    启动服务器,可以查看到处理日志,两个ItemProcessor都被调用到。
    在这里插入图片描述

    第三节 ItemProcessor大家族

    我们可以看到ItemProcessor 拥有多个实现,可以用这些子类来实现各种处理逻辑。
    在这里插入图片描述

  • 相关阅读:
    网络流基础概念与算法总结
    2021年NPDP产品经理认证考试报考指南
    PHP 有趣的函数与功能
    loongarch下iso定制
    A. United We Stand
    BP神经网络应用案例
    python3字符串内建方法split()心得
    【批处理DOS-CMD命令-汇总和小结】-cmd扩展命令、扩展功能(cmd /e:on、cmd /e:off)
    《你不可不知的人性》经典语录
    Linux内核之workqueue机制
  • 原文地址:https://blog.csdn.net/u011628753/article/details/126862524