当我们需要批量执行一些比较耗时任务时,使用并发的方式减少业务处理的整体时间,防止客户端响应时间过长。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @ClassName : ThreadPoolConfig
* @Description : ThreadPoolConfig
* @Author : zhuguangkui
* @Date: 2022-08-03
*/
@Configuration
@Slf4j
public class ThreadPoolConfig {
@Autowired
ThreadPoolProperties threadPoolProperties;
/**
* 获得Java虚拟机可用的处理器个数 + 1
*/
private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;
/**
* 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
* 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
* 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
*/
@Bean(name = "varHandleThreadPool")
public ThreadPoolTaskExecutor varHandleThreadPool(){
int corePoolSizeConfig = threadPoolProperties.getCorePoolSizeConfig();
//核心线程数
int corePoolSize = corePoolSizeConfig ==0 ? THREADS : corePoolSizeConfig;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(2 * corePoolSize);
executor.setCorePoolSize(corePoolSize);
executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
executor.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());
// 线程池对拒绝任务(无线程可用)的处理策略
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
log.info("doc 线程池初始化配置:{},THREADS:{}", threadPoolProperties, THREADS);
return executor;
}
}
/**
* 批量并发处理业务
*/
@Override
public void generateBatchFile(List<String> fileNameList) {
List<Future<IdocDoc>> futureList = new ArrayList<>(); // 并发处理结果集
// 批量处理业务
for (String fileName : fileNameList) {
Future<IdocDoc> future = generateFile(fileName);
futureList.add(future);
}
// 依次获取异步结果
while (true) {
for (Future<IdocDoc> future : futureList) {
if (future.isDone() && !future.isCancelled()) { // 判断任务执行是否完成
IdocDoc idocDoc = future.get(); // 获取异步结果
idocDocList.add(idocDoc);
futureList.remove(future);
}
}
if (CollectionUtil.isEmpty()) {
break;
}
Thread.sleep(1); // 每次轮询休息1毫秒,避免CPU占用
}
}
/**
* 子业务
*/
@Async("varHandleThreadPool")
public Future<IdocDoc> generateFile(String fileName) {
IdocDoc idoDoc = new IdoDoc();
idocDoc.setName(fileName);
... // 业务操作
// 返回异步结果
return new AsyncResult<>(idocDoc);
}
/**
* 批量并发处理业务
*/
@Override
public void generateBatchFile(List<String> fileNameList) {
List<CompletableFuture<IdocDoc>> futureList = new ArrayList<>(); // 并发处理结果集
// 批量处理业务
for (String fileName : fileNameList) {
CompletableFuture<IdocDoc> future = CompletableFuture.supplyAsync(() -> {
return generateFile(fileName);
}, threadPoolTaskExecutor);
futureList.add(future);
}
// 依次获取异步结果
while (true) {
for (CompletableFuture<IdocDoc> future : futureList) {
if (future.isDone() && !future.isCancelled()) { // 判断任务执行是否完成
IdocDoc idocDoc = future.get(); // 获取异步结果
idocDocList.add(idocDoc);
futureList.remove(future);
}
}
if (CollectionUtil.isEmpty()) {
break;
}
Thread.sleep(1); // 每次轮询休息1毫秒,避免CPU占用
}
}
/**
* 子业务
*/
public IdocDoc generateFile(String fileName) {
IdocDoc idoDoc = new IdoDoc();
idocDoc.setName(fileName);
... // 业务操作
// 返回异步结果
return idocDoc;
}