• SpringBoot-线程池ThreadPoolExecutor异步处理(包含拆分集合工具类)


    ThreadPoolExecutor VS ThreadPoolTaskExecutor

    ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。

    在这里插入图片描述

    配置文件application.yml

    # 异步线程配置 自定义使用参数
    async:
    	executor:
    		thread:
    			core_pool_size: 10
    			max_pool_size:  100   # 配置最大线程数
         	    queue_capacity:  99988  # 配置队列大小
          		keep_alive_seconds:  20  #设置线程空闲等待时间秒s
          		name:
            		prefix: async-thread-  # 配置线程池中的线程的名称前缀
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    配置类

    @Configuration
    @EnableAsync
    @Slf4j
    public class ThreadPoolConfig{
    	
    	//自定义使用参数
        @Value("${async.executor.thread.core_pool_size}")
        private int corePoolSize;   //配置核心线程数
        @Value("${async.executor.thread.max_pool_size}")
        private int maxPoolSize;    //配置最大线程数
        @Value("${async.executor.thread.queue_capacity}")
        private int queueCapacity;
        @Value("${async.executor.thread.name.prefix}")
        private String namePrefix;
        @Value("${async.executor.thread.keep_alive_seconds}")
        private int keepAliveSeconds;
    
    	/**
    		1.自定义asyncServieExecutor线程池
    	*/
    	@Bean(name = "asyncServiceExecutor")
    	public ThreadPoolTaskExecutor asyncServiceExecutor(){
    		
    		log.info("start asyncServiceExecutor......");
    
    		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    		//配置核心线程数
            executor.setCorePoolSize(corePoolSize);
            //配置最大线程数
            executor.setMaxPoolSize(maxPoolSize);
            //设置线程空闲等待时间 s
            executor.setKeepAliveSeconds(keepAliveSeconds);
            //配置队列大小 设置任务等待队列的大小
            executor.setQueueCapacity(queueCapacity);
            //配置线程池中的线程的名称前缀
            //设置线程池内线程名称的前缀-------阿里编码规约推荐--方便出错后进行调试
            executor.setThreadNamePrefix(namePrefix);
    
    		/**
    			rejection-policy:当pool已经达到max size的时候,如何处理新任务
    			CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    		*/
    		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    
    		//执行初始化
    		executor.initialize();
    		return executor;
    	}
    
    	/**
    		公共线程池,利用系统availableProcessors线程数量进行计算
    	*/
    	@Bean(name="commonThreadPoolTaskExecutor")
    	public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor(){
    		
    		ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    		
    		// 返回可用处理器的Java虚拟机的数量
    		int processNum = Runtime.getRuntime().availableProcessors();
    		int corePoolSize = (int)(processNum / (1-0.2));
    		int maxPoolSize = (int)(processNum / (1-0.5));
    	
    		pool.setCorePoolSize(corePoolSize); // 核心池大小
            pool.setMaxPoolSize(maxPoolSize); // 最大线程数
            pool.setQueueCapacity(maxPoolSize * 1000); // 队列程度
            pool.setThreadPriority(Thread.MAX_PRIORITY);
            pool.setDaemon(false);
            pool.setKeepAliveSeconds(300);// 线程空闲时间		
    		
    		return pool;
    	}
    
    	/**
    		自定义defaultThreadPoolExecutor线程池
    	*/
    	@Bean(name="defaultThreadPoolExecutor",destroyMethod = "shutdown")
    	public ThreadPoolExecutor systemCheckPoolExecutorService(){
    		
    		int maxNumPool=Runtime.getRuntime().availableProcessors();
    		return new ThreadPoolExecutor(3,
                    maxNumPool,
                    60,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(10000),
                    //置线程名前缀,例如设置前缀为hutool-thread-,则线程名为hutool-thread-1之类。
                    new ThreadFactoryBuilder().setNamePrefix("default-executor-thread-%d").build(),
                    (r, executor) -> log.error("system pool is full! "));
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    异步线程业务类

    //自定义asyncServiceExecutor线程池
    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync(List<Student> students,
                             StudentService studentService,
                             CountDownLatch countDownLatch){
    	
    	try{
    		log.info("start executeAsync");
    		//异步线程要做的事情
    		studentService.saveBatch(students);
    		log.info("end executeAsync");
    	}finally{
    		countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    拆分集合工具类

    public class SplitListUtils {
        /**
         * 功能描述:拆分集合
         * @param  泛型对象
         * @MethodName: split
         * @MethodParam: [resList:需要拆分的集合, subListLength:每个子集合的元素个数]
         * @Return: java.util.List>:返回拆分后的各个集合组成的列表
         * 代码里面用到了guava和common的结合工具类
         * @Author: yyalin
         * @CreateDate: 2022/5/6 14:44
         */
        public static <T> List<List<T>> split(List<T> resList, int subListLength) {
            if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
                return Lists.newArrayList();
            }
            List<List<T>> ret = Lists.newArrayList();
            int size = resList.size();
            if (size <= subListLength) {
                // 数据量不足 subListLength 指定的大小
                ret.add(resList);
            } else {
                int pre = size / subListLength;
                int last = size % subListLength;
                // 前面pre个集合,每个大小都是 subListLength 个元素
                for (int i = 0; i < pre; i++) {
                    List<T> itemList = Lists.newArrayList();
                    for (int j = 0; j < subListLength; j++) {
                        itemList.add(resList.get(i * subListLength + j));
                    }
                    ret.add(itemList);
                }
                // last的进行处理
                if (last > 0) {
                    List<T> itemList = Lists.newArrayList();
                    for (int i = 0; i < last; i++) {
                        itemList.add(resList.get(pre * subListLength + i));
                    }
                    ret.add(itemList);
                }
            }
            return ret;
        }
    
        /**
         * 功能描述:方法二:集合切割类,就是把一个大集合切割成多个指定条数的小集合,方便往数据库插入数据
         * 推荐使用
         * @MethodName: pagingList
         * @MethodParam:[resList:需要拆分的集合, subListLength:每个子集合的元素个数]
         * @Return: java.util.List>:返回拆分后的各个集合组成的列表
         * @Author: yyalin
         * @CreateDate: 2022/5/6 15:15
         */
        public static <T> List<List<T>> pagingList(List<T> resList, int pageSize){
            //判断是否为空
            if (CollectionUtils.isEmpty(resList) || pageSize <= 0) {
                return Lists.newArrayList();
            }
            int length = resList.size();
            int num = (length+pageSize-1)/pageSize;
            List<List<T>> newList =  new ArrayList<>();
            for(int i=0;i<num;i++){
                int fromIndex = i*pageSize;
                int toIndex = (i+1)*pageSize<length?(i+1)*pageSize:length;
                newList.add(resList.subList(fromIndex,toIndex));
            }
            return newList;
        }
    
        // 运行测试代码 可以按顺序拆分为11个集合
        public static void main(String[] args) {
            //初始化数据
            List<String> list = Lists.newArrayList();
            int size = 19;
            for (int i = 0; i < size; i++) {
                list.add("hello-" + i);
            }
            // 大集合里面包含多个小集合
            List<List<String>> temps = pagingList(list, 100);
            int j = 0;
            // 对大集合里面的每一个小集合进行操作
            for (List<String> obj : temps) {
                System.out.println(String.format("row:%s -> size:%s,data:%s", ++j, obj.size(), obj));
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    造数据,进行多线程异步插入

    public int batchInsertWay() throws Exception {
            log.info("开始批量操作.........");
            Random rand = new Random();
            List<Student> list = new ArrayList<>();
            //造100万条数据
            for (int i = 0; i < 1000003; i++) {
                Student student=new Student();
                student.setStudentName("大明:"+i);
                student.setAddr("上海:"+rand.nextInt(9) * 1000);
                student.setAge(rand.nextInt(1000));
                student.setPhone("134"+rand.nextInt(9) * 1000);
                list.add(student);
            }
            //2、开始多线程异步批量导入
            long startTime = System.currentTimeMillis(); // 开始时间
            //boolean a=studentService.batchInsert(list);
            List<List<Student>> list1=SplitListUtils.pagingList(list,100);  //拆分集合
            CountDownLatch countDownLatch = new CountDownLatch(list1.size());
            for (List<Student> list2 : list1) {
                asyncService.executeAsync(list2,studentService,countDownLatch);
            }
            try {
                countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
                long endTime = System.currentTimeMillis(); //结束时间
                log.info("一共耗时time: " + (endTime - startTime) / 1000 + " s");
                // 这样就可以在下面拿到所有线程执行完的集合结果
            } catch (Exception e) {
                log.error("阻塞异常:"+e.getMessage());
            }
            return list.size();
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    Nginx配置ssl证书(https证书)
    阿里云架构实战之ALB(应用型负载均衡)介绍与搭建
    基于PHP的网络教学平台设计与实现
    【SpringMVC】@RequestMapping注解
    QT配置MySQL数据库 && ninja: build stopped: subcommand failed
    从数学到算法
    Worthington经过使用测试的细胞分离系统方案
    7_JS关于数据代理_Object.defineProperty_Vue数据代理_双向绑定
    【老生谈算法】matlab实现语音信号处理与仿真——语音信号处理算法
    model
  • 原文地址:https://blog.csdn.net/usa_washington/article/details/132875191