• Batch-processing data with multiple threads in Java


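    1 Requirements
    The project has to process more than one million records. The records are read from a MySQL database and then pushed to another platform by calling its API. The push has to finish within a short time window, and a single thread is too slow, so the work is split across multiple threads to speed it up.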

    2 Configuring the thread pool
    2.1 application.yml

    thread-pool:
      core-pool-size: 4
      max-pool-size: 16
      queue-capacity: 80
      keep-alive-seconds: 120
    

    2.2 Creating ThreadPoolProperties

    import lombok.Data;
    import org.springframework.stereotype.Component;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    @Data
    @Component
    @ConfigurationProperties(prefix = "thread-pool")
    public class ThreadPoolProperties {
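        /**
         * Core pool size: the number of threads the pool keeps alive even when they are idle
         */
        private int corePoolSize;
    
        /**
         * Maximum pool size. Threads beyond the core size are created only once the buffer queue is full
         */
        private int maxPoolSize;
    
        /**
         * Capacity of the queue that buffers tasks waiting to be executed
         */
        private int queueCapacity;
    
        /**
         * Keep-alive time in seconds: threads beyond the core size are destroyed after being idle for this long
         */
        private int keepAliveSeconds;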
    }
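
The rule in the maxPoolSize comment is easy to misread, so here is a minimal sketch of it using a plain java.util.concurrent.ThreadPoolExecutor, the class that ThreadPoolTaskExecutor wraps. The class name PoolGrowthDemo, the helper poolSizesAfterEachSubmit and the tiny sizes (core 1, max 2, queue 1) are made up for illustration and are not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    /** Submits tasks that all block, and records the pool size after each submission. */
    static List<Integer> poolSizesAfterEachSubmit(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> sizes = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the demo is over
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                sizes.add(pool.getPoolSize());
            }
        } finally {
            release.countDown(); // let every blocked task finish
            pool.shutdown();
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Task 1 starts the core thread, task 2 waits in the queue,
        // task 3 finds the queue full and triggers a second thread.
        System.out.println(poolSizesAfterEachSubmit(1, 2, 1, 3)); // prints [1, 1, 2]
    }
}
```

With the values from application.yml above, the pool would stay at 4 threads until 80 tasks are waiting in the queue, and only then grow towards 16.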
    

    2.3 Creating ThreadPoolConfig

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    @EnableAsync
    @Configuration
    public class ThreadPoolConfig {
        private final ThreadPoolProperties threadPoolProperties;
    
        @Autowired
        public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
            this.threadPoolProperties = threadPoolProperties;
        }
    
        @Bean(name = "threadPoolTaskExecutor")
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
            executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
            executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
            executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
            executor.setThreadNamePrefix("thread-pool-");
            return executor;
        }
    }
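
One thing this configuration leaves implicit is what happens when more tasks arrive than the pool can hold. A rough sketch with a plain ThreadPoolExecutor, assuming the same numbers as application.yml (core 4, max 16, queue 80): at most 16 + 80 = 96 tasks can be running or waiting at once, and with the default AbortPolicy anything beyond that is rejected. RejectionDemo and countRejected are hypothetical names used only for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    /** Submits blocking tasks to a pool with the default AbortPolicy and counts the rejections. */
    static int countRejected(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // stay busy so the pool fills up
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all threads are busy and the queue is full
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 100 tasks against a capacity of 16 threads + 80 queue slots
        System.out.println(countRejected(4, 16, 80, 100)); // prints 4
    }
}
```

If the number of submitted tasks could ever exceed that limit, one option is to call executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()) in the bean method, so that the submitting thread runs the overflow task itself instead of failing.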
    

    3 Multi-threaded batch processing

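    // Needs java.util.List, java.util.concurrent.CountDownLatch and
    // org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.
    public RequestResult multiThreadPush() {
        List<HistoryStudent> historyStudentList = historyStudentMapper.getList(0, 65867);
        // Split the list into 4 partitions, one per worker task
        List<List<HistoryStudent>> partitionData = partitionData(historyStudentList, 4);
    
        ThreadPoolTaskExecutor executor = SpringUtil.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
        // Latch that reaches zero once every partition has been processed
        CountDownLatch latch = new CountDownLatch(partitionData.size());
    
        for (List<HistoryStudent> historyStudents : partitionData) {
            executor.execute(() -> {
                try {
                    for (HistoryStudent historyStudent : historyStudents) {
                        // Process a single record
                        //processSingleData(historyStudent);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Always count down, even on failure, so that await() cannot hang
                    latch.countDown();
                }
            });
        }
    
        try {
            // Block until all partitions are done
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    
        return RequestResult.success();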
    }
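
The method above depends on Spring beans and a mapper, so it cannot be run on its own. Here is a self-contained sketch of the same CountDownLatch pattern with a plain ExecutorService. It is a variation rather than the author's code: exceptions are caught per record and counted, so the caller can tell how many pushes failed instead of always reporting success. LatchFanOutDemo, processAll and the handler parameter are hypothetical names, and the handler stands in for the real push call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class LatchFanOutDemo {

    /** Runs one task per partition and returns {succeeded, failed} record counts. */
    static int[] processAll(List<List<String>> partitions, Consumer<String> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(partitions.size());
        AtomicInteger ok = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        try {
            for (List<String> partition : partitions) {
                pool.execute(() -> {
                    try {
                        for (String item : partition) {
                            try {
                                handler.accept(item); // stand-in for the real push
                                ok.incrementAndGet();
                            } catch (RuntimeException e) {
                                failed.incrementAndGet(); // one bad record does not stop the rest
                            }
                        }
                    } finally {
                        latch.countDown(); // always signal completion
                    }
                });
            }
            latch.await(); // wait for every partition
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new int[] {ok.get(), failed.get()};
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "bad"), Arrays.asList("c"), Arrays.asList("d", "e", "f"));
        int[] result = processAll(partitions, item -> {
            if (item.equals("bad")) {
                throw new IllegalStateException("push failed: " + item);
            }
        });
        System.out.println(result[0] + " ok, " + result[1] + " failed"); // prints 5 ok, 1 failed
    }
}
```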
    
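    /**
     * Splits dataList into partitionSize contiguous sublists. Note that partitionSize is the
     * number of partitions, not the size of each one; the last partition absorbs the remainder.
     * Needs java.util.ArrayList and java.util.List.
     */
    private List<List<HistoryStudent>> partitionData(List<HistoryStudent> dataList, int partitionSize) {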
        List<List<HistoryStudent>> partitions = new ArrayList<>();
    
        int size = dataList.size();
        int batchSize = size / partitionSize;
    
        for (int i = 0; i < partitionSize; i++) {
            int fromIndex = i * batchSize;
            int toIndex = (i == partitionSize - 1) ? size : fromIndex + batchSize;
    
            partitions.add(dataList.subList(fromIndex, toIndex));
        }
    
        return partitions;
    }
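
Because the last partition takes the whole remainder, splitting 10 records into 4 partitions with the method above gives sizes 2, 2, 2 and 4, and a list shorter than the partition count yields empty partitions. A possible alternative, sketched here under the assumption that balanced partitions are preferred, spreads the remainder over the first partitions. PartitionDemo and partitionEvenly are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionDemo {

    /** Splits data into at most `parts` contiguous sublists whose sizes differ by at most one. */
    static <T> List<List<T>> partitionEvenly(List<T> data, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        int size = data.size();
        int base = size / parts;      // minimum size of every partition
        int remainder = size % parts; // the first `remainder` partitions get one extra item
        int from = 0;
        for (int i = 0; i < parts && from < size; i++) {
            int to = from + base + (i < remainder ? 1 : 0);
            result.add(data.subList(from, to)); // a view on the original list, not a copy
            from = to;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(partitionEvenly(data, 4)); // prints [[1, 2, 3], [4, 5, 6], [7, 8], [9, 10]]
    }
}
```

Like the original, this returns subList views, so the source list should not be modified while the worker threads are still reading from it.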
    


  • Original article: https://blog.csdn.net/weixin_44917045/article/details/134264942