• java 并行处理任务


    前要

          通常,我们 大的数据量,要对这些数据进行批量处理,采用 多线程(线程池) 的方式 处理任务;

          可以通过如下方式实现:

            1. parallelStream

            2. CompleteFuture

            3. Callable

            4. CompletionService

    实现

    1.         设置线程池配置
    1. /**
    2. * 异步线程配置
    3. *
    4. * @author peng18.wu
    5. * @date 2022-05-09 14:00:26
    6. */
    7. @Getter
    8. @Setter
    9. @Component
    10. public class AsyncProperties {
    11. /**
    12. * 异步核心线程数,默认:5
    13. */
    14. @Value("${poros-async-executor.corePoolSize:5}")
    15. private int corePoolSize = 5;
    16. /**
    17. * 异步最大线程数,默认:50
    18. */
    19. @Value("${poros-async-executor.maxPoolSize:50}")
    20. private int maxPoolSize = 50;
    21. /**
    22. * 队列容量,默认:10000
    23. */
    24. @Value("${poros-async-executor.queueCapacity:10000}")
    25. private int queueCapacity = 10000;
    26. /**
    27. * 线程存活时间,默认:300
    28. */
    29. @Value("${poros-async-executor.keepAliveSeconds:300}")
    30. private int keepAliveSeconds = 300;
    31. /** 线程名前缀 */
    32. private String threadNamePrefix = "poros-async-executor";
    33. }
    34. /**
    35. * 异步任务配置: 线程池配置
    36. *
    37. * @author peng18.wu
    38. * @date 2022-05-09 14:00:26
    39. */
    40. @Setter
    41. @Getter
    42. @AllArgsConstructor
    43. @Configuration
    44. @EnableAsync(proxyTargetClass = true)
    45. public class DefaultAsyncTaskConfig extends AsyncConfigurerSupport {
    46. private final AsyncProperties asyncProperties;
    47. @Override
    48. @Bean("defaultAsyncPool")
    49. public Executor getAsyncExecutor() {
    50. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    51. executor.setCorePoolSize(asyncProperties.getCorePoolSize());
    52. executor.setMaxPoolSize(asyncProperties.getMaxPoolSize());
    53. executor.setQueueCapacity(asyncProperties.getQueueCapacity());
    54. executor.setKeepAliveSeconds(asyncProperties.getKeepAliveSeconds());
    55. executor.setThreadNamePrefix(asyncProperties.getThreadNamePrefix());
    56. /*
    57. rejection-policy:当pool已经达到max size的时候,如何处理新任务
    58. CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    59. */
    60. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    61. return executor;
    62. }
    63. @Override
    64. public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    65. return new SimpleAsyncUncaughtExceptionHandler();
    66. }
    67. }

    2. 四种并行方式,工具类实现

    1. /**
    2. * @author peng18.wu
    3. * @desc 并行任务执行工具汇总:利用多线程 处理
    4. * @date 2022/8/10 14:50
    5. */
    6. @Component
    7. @Slf4j
    8. public class ParallelTaskUtil {
    9. @Autowired
    10. @Lazy
    11. @Qualifier("defaultAsyncPool")
    12. private AsyncTaskExecutor taskExecutor;
    13. /**
    14. * 使用parallel 进行 执行 有参多个任务
    15. *

    16. * 优点:简洁
    17. * 缺点:不能显示的设置 线程池的 配置
    18. *

    19. * (注意事项:parallelStream默认使用了fork-join框架,其默认线程数 = CPU核心数 )
    20. * 修改线程数得方式:https://blog.csdn.net/w1014074794/article/details/119947671
    21. *
    22. * @param params
    23. * @param task
    24. * @return
    25. */
    26. public List<Object> parallelStreamTask(List<Object> params, Function<Object, Object> task) {
    27. return params.parallelStream()
    28. .map(item -> task.apply(item))
    29. .collect(Collectors.toList());
    30. }
    31. /**
    32. * callableTask
    33. * 优点: 比较常规,没什么优点,就是能返回线程池执行的结果
    34. * 缺点: 一个个get 获取结果 会阻塞
    35. * 比较常规的方法:有返回的线程池任务执行的结果怕:
    36. *
    37. * @param params
    38. * @param task
    39. * @return
    40. */
    41. public List<Object> callableTask(List<Object> params, Function<Object, Object> task) {
    42. List<Future<Object>> futureTasks = params.stream()
    43. .map(s -> taskExecutor.submit(() -> task.apply(s)))
    44. .collect(Collectors.toList());
    45. List<Object> results = futureTasks.stream().map(item -> {
    46. try {
    47. return item.get();
    48. } catch (InterruptedException | ExecutionException e) {
    49. log.error("执行 callableTask 任务失败:{}", e);
    50. }
    51. return null;
    52. }).collect(Collectors.toList());
    53. return results;
    54. }
    55. /**
    56. * completeFutureTask :有多种使用api
    57. * 优点:等待所有的任务执行完毕后,获取结果
    58. *
    59. * @param params
    60. * @param task
    61. * @return
    62. */
    63. public List<Object> completeFutureTask(List<Object> params, Function<Object, Object> task) {
    64. List<CompletableFuture<Object>> taskFutures = new ArrayList<>();
    65. CompletableFuture[] futures = params.stream()
    66. .map(param -> CompletableFuture.supplyAsync(() -> task.apply(param), taskExecutor))
    67. .toArray(CompletableFuture[]::new);
    68. CompletableFuture.allOf(futures).join();
    69. List<Object> results = Arrays.stream(futures).map(s -> {
    70. try {
    71. return s.get();
    72. } catch (Exception e) {
    73. log.error("执行completeFutureTask任务失败:{}", e);
    74. }
    75. return null;
    76. }).collect(Collectors.toList());
    77. return results;
    78. }
    79. /**
    80. * CompletionService
    81. * 的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开来进行处理
    82. *

    83. * take()方法取得最先完成任务的Future对象,谁执行时间最短谁最先返回。
    84. * 有点: 如上,且解决了callableTask 的缺点
    85. *
    86. * @param params
    87. * @param task
    88. * @return
    89. */
    90. public List<Object> completionServiceTask(List<Object> params, Function<Object, Object> task) {
    91. ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<>(taskExecutor);
    92. params.stream()
    93. .map(s -> completionService.submit(() -> task.apply(s)))
    94. .collect(Collectors.toList());
    95. List<Object> results = new ArrayList<>();
    96. IntStream.range(1, params.size()).forEach(s -> {
    97. try {
    98. Object o = completionService.take().get();
    99. results.add(o);
    100. } catch (Exception e) {
    101. log.error("执行 callableTask 任务失败:{}", e);
    102. }
    103. });
    104. return results;
    105. }
    106. public void calConsumeTime(String methodName, long startTime) {
    107. long endTime = System.currentTimeMillis();
    108. long consumeReqTime = endTime - startTime;
    109. log.error("执行 {} 方法 耗时:{}", methodName, consumeReqTime);
    110. }
    111. public Integer doTask(String taskParam) {
    112. //log.info("正在处理 参数为 {} 的任务", taskParam);
    113. ThreadUtil.sleep(1);
    114. return Integer.parseInt(taskParam);
    115. }

    3. 测试

    1. @Slf4j
    2. @ActiveProfiles("local-sit")
    3. @RunWith(SpringRunner.class)
    4. @SpringBootTest(classes = {BootApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
    5. public class ConsumableSpecBizImplTest {
    6. @Autowired
    7. private ParallelTaskUtil parallelTaskUtil;
    8. @Test
    9. public void testParallelTaskUtil() {
    10. List params = new ArrayList<>();
    11. for (int i = 0; i < 100000; i++) {
    12. params.add(i + "");
    13. }
    14. long startTime1 = System.currentTimeMillis();
    15. parallelTaskUtil.callableTask(params, s -> parallelTaskUtil.doTask((String) s));
    16. parallelTaskUtil.calConsumeTime("callableTask", startTime1);
    17. long startTime2 = System.currentTimeMillis();
    18. parallelTaskUtil.parallelStreamTask(params, s -> parallelTaskUtil.doTask((String) s));
    19. parallelTaskUtil.calConsumeTime("parallelStreamTask", startTime2);
    20. long startTime3 = System.currentTimeMillis();
    21. parallelTaskUtil.completeFutureTask(params, s -> parallelTaskUtil.doTask((String) s));
    22. parallelTaskUtil.calConsumeTime("completeFutureTask", startTime3);
    23. long startTime4 = System.currentTimeMillis();
    24. parallelTaskUtil.completionServiceTask(params, s -> parallelTaskUtil.doTask((String) s));
    25. parallelTaskUtil.calConsumeTime("completionServiceTask", startTime4);
    26. }
    27. 4. 测试结果: 

              

       * 执行效率对比 : 10000的数据量
        callableTask 方法 耗时:2828
        parallelStreamTask 方法 耗时:1245
        completeFutureTask 方法 耗时:2860
        completionServiceTask 方法 耗时:2691
        
       * 执行效率对比 : 100000的数据量
        callableTask 方法 耗时:3821
        parallelStreamTask 方法 耗时:11621
        completeFutureTask 方法 耗时:3073
        completionServiceTask 方法 耗时:3127
    28. 相关阅读:
      怎样选择青少年护眼灯?推荐最好的青少年护眼灯品牌
      【MySQL 数据库 基础 Ⅱ】基础sql语句 Ⅱ
      Docker----harbor服务
      【FAQ】统一扫码服务常见问题
      github常用搜索指令
      美国FBA海运专线双清包税到门流程是怎样的?
      实践 | 大型基金管理公司数据脱敏体系建设
      5种基本类型之外的数据类型是object——对象、添加、读取
      linux系统安全及应用【上】
      38、RabbitMQ hello world
    29. 原文地址:https://blog.csdn.net/flymoringbird/article/details/126272566