• java 并发执行批量异步任务(Future、 CompletableFuture 实现)



    前言

    当我们需要批量执行一些比较耗时任务时,使用并发的方式减少业务处理的整体时间,防止客户端响应时间过长。


    一、创建线程池

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @ClassName : ThreadPoolConfig
     * @Description : ThreadPoolConfig
     * @Author : zhuguangkui
     * @Date: 2022-08-03
     */
    @Configuration
    @Slf4j
    public class ThreadPoolConfig {
    
        @Autowired
        ThreadPoolProperties threadPoolProperties;
    
        /**
         * 获得Java虚拟机可用的处理器个数 + 1
         */
        private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;
        
        /**
         *   默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
         *  当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
         *  当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
         */
        @Bean(name = "varHandleThreadPool")
        public ThreadPoolTaskExecutor varHandleThreadPool(){
            int corePoolSizeConfig = threadPoolProperties.getCorePoolSizeConfig();
            //核心线程数
            int corePoolSize = corePoolSizeConfig ==0 ? THREADS : corePoolSizeConfig;
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setMaxPoolSize(2 * corePoolSize);
            executor.setCorePoolSize(corePoolSize);
            executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
            executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
            executor.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());
            // 线程池对拒绝任务(无线程可用)的处理策略
            // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // 初始化
            executor.initialize();
            log.info("doc 线程池初始化配置:{},THREADS:{}", threadPoolProperties, THREADS);
            return executor;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    二、Future 类并发实现

    /**
     * 批量并发处理业务
     */
    @Override
    public void generateBatchFile(List<String> fileNameList) {
        List<Future<IdocDoc>> futureList = new ArrayList<>(); // 并发处理结果集
        // 批量处理业务
        for (String fileName : fileNameList) {
            Future<IdocDoc> future = generateFile(fileName);
            futureList.add(future);
        }
    
       // 依次获取异步结果
       while (true) {
    		for (Future<IdocDoc> future : futureList) {
    	       if (future.isDone() && !future.isCancelled()) { // 判断任务执行是否完成
    	           IdocDoc idocDoc = future.get(); // 获取异步结果
    	           idocDocList.add(idocDoc);
    	           futureList.remove(future);
    	       }
    	    }
    	    if (CollectionUtil.isEmpty()) {
    	    	break;
    	    }
       		Thread.sleep(1); // 每次轮询休息1毫秒,避免CPU占用
       }
    }
    
    /**
     * 子业务
     */
    @Async("varHandleThreadPool")
    public Future<IdocDoc> generateFile(String fileName) {
        IdocDoc idoDoc = new IdoDoc();
        idocDoc.setName(fileName);
        ... // 业务操作
    
        // 返回异步结果
        return new AsyncResult<>(idocDoc);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    三、CompletableFuture 类并发实现

    /**
     * 批量并发处理业务
     */
    @Override
    public void generateBatchFile(List<String> fileNameList) {
        List<CompletableFuture<IdocDoc>> futureList = new ArrayList<>(); // 并发处理结果集
        // 批量处理业务
        for (String fileName : fileNameList) {
    	    CompletableFuture<IdocDoc> future = CompletableFuture.supplyAsync(() -> {
    			return generateFile(fileName);
    		}, threadPoolTaskExecutor);
    	    futureList.add(future);
        }
    
       // 依次获取异步结果
       while (true) {
    		for (CompletableFuture<IdocDoc> future : futureList) {
    	       if (future.isDone() && !future.isCancelled()) { // 判断任务执行是否完成
    	           IdocDoc idocDoc = future.get(); // 获取异步结果
    	           idocDocList.add(idocDoc);
    	           futureList.remove(future);
    	       }
    	    }
    	    if (CollectionUtil.isEmpty()) {
    	    	break;
    	    }
       		Thread.sleep(1); // 每次轮询休息1毫秒,避免CPU占用
       }
    }
    
    /**
     * 子业务
     */
    public IdocDoc generateFile(String fileName) {
        IdocDoc idoDoc = new IdoDoc();
        idocDoc.setName(fileName);
        ... // 业务操作
    
        // 返回异步结果
        return idocDoc;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
  • 相关阅读:
    【Autopsy数字取证篇】Autopsy案例更改时区
    html5+css3
    分享125个ASP源码,总有一款适合你
    虹科案例 | EtherCAT运动控制器与IO在半导体封装设备固晶机上的应用
    计算机网络_第五章_运输层
    Vue3配置router路由步骤
    JavaScript 基本语法及概念
    CNN发展的主要tag
    多线程并发编程
    React 模态框的设计(三)拖动组件的完善
  • 原文地址:https://blog.csdn.net/demo_yo/article/details/133859920