需求是想在线程池执行任务的时候,在开始前将调用线程的信息传到子线程中,在子线程完成后,再清除传入的数据。
下面使用了spring
的ThreadPoolTaskExecutor
来实现这个需求.
在jdk
中使用的是ThreadPoolExecutor
,用于自定义线程池。
在spring
中则是对ThreadPoolExecutor
又包了一层,加了一些参数进去ThreadPoolTaskExecutor
,然后作为bean
注入到spring
的ioc
容器中.
通常在使用线程池的时候想把调用线程的一些信息传递给子线程(线程池中的线程),一般都是要自己写一个装饰器,然后把装饰器传递给线程池的execute
方法。
不过spring
中已经有现成的方法了,就在ThreadPoolTaskExecutor
中,可以给定自定义的装饰器。
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#initializeExecutor
可以看到在初始化的时候会判断是否存在装饰器
这样线程池中的线程在执行的时候都会经过装饰器处理,要注意的是在线程执行完成之后需要把信息清理,不然信息会串的
package org.xxx.common.core.executor.decorator;
import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import java.util.Map;
/**
* 对spring的线程中的线程进行装饰
*/
public class ContextCopyingDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
try {
//当前请求上下文
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
//copy当前调用线程的 ThreadLocalMap 中保存的信息
Map<String,String> previous = MDC.getCopyOfContextMap();
return () -> {
try {
//http request上下文塞到当前线程中
RequestContextHolder.setRequestAttributes(context);
//将调用线程的 ThreadLocalMap 塞到当前线程
MDC.setContextMap(previous);
runnable.run();
} finally {
//clear
RequestContextHolder.resetRequestAttributes();
MDC.clear();
}
};
} catch (IllegalStateException e) {
return runnable;
}
}
}
//线程池配置
/**
* 核心线程数 = cpu 核心数 + 1
*/
private final int core = Runtime.getRuntime().availableProcessors() + 1;
private ScheduledExecutorService scheduledExecutorService;
@Bean(name = "threadPoolTaskExecutor")
@ConditionalOnProperty(prefix = "thread-pool", name = "enabled", havingValue = "true")
public ThreadPoolTaskExecutor threadPoolTaskExecutor(ThreadPoolProperties threadPoolProperties) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadPoolProperties.getCoreSize());
executor.setMaxPoolSize(threadPoolProperties.getMaxCoreSize());
if(threadPoolProperties.getCoreSize() == 0) executor.setCorePoolSize(core);
if(threadPoolProperties.getMaxCoreSize() == 0) executor.setMaxPoolSize(core * 2);
//线程池队列大小
executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
//线程空闲存活时间
executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
//线程池拒绝时交由调用线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//装饰线程池中的线程
executor.setTaskDecorator(new ContextCopyingDecorator());
return executor;
}