前言
通常,当我们面对较大的数据量、需要对这些数据进行批量处理时,会采用多线程(线程池)的方式来处理任务;
可以通过如下方式实现:
1. parallelStream
2. CompletableFuture
3. Callable
4. CompletionService
- /**
- * 异步线程配置
- *
- * @author peng18.wu
- * @date 2022-05-09 14:00:26
- */
- @Getter
- @Setter
- @Component
- public class AsyncProperties {
-
- /**
- * 异步核心线程数,默认:5
- */
- @Value("${poros-async-executor.corePoolSize:5}")
- private int corePoolSize = 5;
- /**
- * 异步最大线程数,默认:50
- */
- @Value("${poros-async-executor.maxPoolSize:50}")
- private int maxPoolSize = 50;
- /**
- * 队列容量,默认:10000
- */
- @Value("${poros-async-executor.queueCapacity:10000}")
- private int queueCapacity = 10000;
- /**
- * 线程存活时间,默认:300
- */
- @Value("${poros-async-executor.keepAliveSeconds:300}")
- private int keepAliveSeconds = 300;
-
- /** 线程名前缀 */
- private String threadNamePrefix = "poros-async-executor";
- }
-
- /**
- * 异步任务配置: 线程池配置
- *
- * @author peng18.wu
- * @date 2022-05-09 14:00:26
- */
- @Setter
- @Getter
- @AllArgsConstructor
- @Configuration
- @EnableAsync(proxyTargetClass = true)
- public class DefaultAsyncTaskConfig extends AsyncConfigurerSupport {
- private final AsyncProperties asyncProperties;
-
- @Override
- @Bean("defaultAsyncPool")
- public Executor getAsyncExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(asyncProperties.getCorePoolSize());
- executor.setMaxPoolSize(asyncProperties.getMaxPoolSize());
- executor.setQueueCapacity(asyncProperties.getQueueCapacity());
- executor.setKeepAliveSeconds(asyncProperties.getKeepAliveSeconds());
- executor.setThreadNamePrefix(asyncProperties.getThreadNamePrefix());
- /*
- rejection-policy:当pool已经达到max size的时候,如何处理新任务
- CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
- */
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- return executor;
- }
-
- @Override
- public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
- return new SimpleAsyncUncaughtExceptionHandler();
- }
- }
2. 四种并行方式的工具类实现
- /**
- * @author peng18.wu
- * @desc 并行任务执行工具汇总:利用多线程 处理
- * @date 2022/8/10 14:50
- */
- @Component
- @Slf4j
- public class ParallelTaskUtil {
-
- @Autowired
- @Lazy
- @Qualifier("defaultAsyncPool")
- private AsyncTaskExecutor taskExecutor;
-
-
- /**
- * 使用parallel 进行 执行 有参多个任务
- *
- * 优点:简洁
- * 缺点:不能显示的设置 线程池的 配置
- *
- * (注意事项:parallelStream默认使用了fork-join框架,其默认线程数 = CPU核心数 )
- * 修改线程数得方式:https://blog.csdn.net/w1014074794/article/details/119947671
- *
- * @param params
- * @param task
- * @return
- */
- public List<Object> parallelStreamTask(List<Object> params, Function<Object, Object> task) {
- return params.parallelStream()
- .map(item -> task.apply(item))
- .collect(Collectors.toList());
- }
-
-
- /**
- * callableTask
- * 优点: 比较常规,没什么优点,就是能返回线程池执行的结果
- * 缺点: 一个个get 获取结果 会阻塞
- * 比较常规的方法:有返回的线程池任务执行的结果怕:
- *
- * @param params
- * @param task
- * @return
- */
- public List<Object> callableTask(List<Object> params, Function<Object, Object> task) {
-
- List<Future<Object>> futureTasks = params.stream()
- .map(s -> taskExecutor.submit(() -> task.apply(s)))
- .collect(Collectors.toList());
-
- List<Object> results = futureTasks.stream().map(item -> {
- try {
- return item.get();
- } catch (InterruptedException | ExecutionException e) {
- log.error("执行 callableTask 任务失败:{}", e);
- }
- return null;
- }).collect(Collectors.toList());
-
- return results;
- }
-
- /**
- * completeFutureTask :有多种使用api
- * 优点:等待所有的任务执行完毕后,获取结果
- *
- * @param params
- * @param task
- * @return
- */
- public List<Object> completeFutureTask(List<Object> params, Function<Object, Object> task) {
- List<CompletableFuture<Object>> taskFutures = new ArrayList<>();
-
- CompletableFuture[] futures = params.stream()
- .map(param -> CompletableFuture.supplyAsync(() -> task.apply(param), taskExecutor))
- .toArray(CompletableFuture[]::new);
-
- CompletableFuture.allOf(futures).join();
-
- List<Object> results = Arrays.stream(futures).map(s -> {
- try {
- return s.get();
- } catch (Exception e) {
- log.error("执行completeFutureTask任务失败:{}", e);
- }
- return null;
- }).collect(Collectors.toList());
-
- return results;
- }
-
-
- /**
- * CompletionService
- * 的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开来进行处理
- *
- * take()方法取得最先完成任务的Future对象,谁执行时间最短谁最先返回。
- * 有点: 如上,且解决了callableTask 的缺点
- *
- * @param params
- * @param task
- * @return
- */
- public List<Object> completionServiceTask(List<Object> params, Function<Object, Object> task) {
- ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<>(taskExecutor);
-
- params.stream()
- .map(s -> completionService.submit(() -> task.apply(s)))
- .collect(Collectors.toList());
-
-
- List<Object> results = new ArrayList<>();
- IntStream.range(1, params.size()).forEach(s -> {
- try {
- Object o = completionService.take().get();
- results.add(o);
- } catch (Exception e) {
- log.error("执行 callableTask 任务失败:{}", e);
- }
- });
-
- return results;
- }
-
-
-
- public void calConsumeTime(String methodName, long startTime) {
- long endTime = System.currentTimeMillis();
- long consumeReqTime = endTime - startTime;
- log.error("执行 {} 方法 耗时:{}", methodName, consumeReqTime);
- }
-
- public Integer doTask(String taskParam) {
- //log.info("正在处理 参数为 {} 的任务", taskParam);
- ThreadUtil.sleep(1);
- return Integer.parseInt(taskParam);
- }
3. 测试
- @Slf4j
- @ActiveProfiles("local-sit")
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = {BootApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- public class ConsumableSpecBizImplTest {
-
- @Autowired
- private ParallelTaskUtil parallelTaskUtil;
-
- @Test
- public void testParallelTaskUtil() {
-
- List
- for (int i = 0; i < 100000; i++) {
- params.add(i + "");
- }
-
- long startTime1 = System.currentTimeMillis();
- parallelTaskUtil.callableTask(params, s -> parallelTaskUtil.doTask((String) s));
- parallelTaskUtil.calConsumeTime("callableTask", startTime1);
-
- long startTime2 = System.currentTimeMillis();
- parallelTaskUtil.parallelStreamTask(params, s -> parallelTaskUtil.doTask((String) s));
- parallelTaskUtil.calConsumeTime("parallelStreamTask", startTime2);
-
- long startTime3 = System.currentTimeMillis();
- parallelTaskUtil.completeFutureTask(params, s -> parallelTaskUtil.doTask((String) s));
- parallelTaskUtil.calConsumeTime("completeFutureTask", startTime3);
-
- long startTime4 = System.currentTimeMillis();
- parallelTaskUtil.completionServiceTask(params, s -> parallelTaskUtil.doTask((String) s));
- parallelTaskUtil.calConsumeTime("completionServiceTask", startTime4);
-
- }
4. 测试结果:
* 执行效率对比(10000 条数据,单位 ms):callableTask 耗时 2828;parallelStreamTask 耗时 1245;completeFutureTask 耗时 2860;completionServiceTask 耗时 2691
* 执行效率对比(100000 条数据,单位 ms):callableTask 耗时 3821;parallelStreamTask 耗时 11621;completeFutureTask 耗时 3073;completionServiceTask 耗时 3127
* 结论:数据量较小时 parallelStream 最快;数据量增大到 100000 后,parallelStream(使用线程数约等于 CPU 核心数的 fork-join 公共池)明显慢于另外三种使用自定义线程池的方式。