项目完整代码:https://github.com/YYYUUU42/Yu-dynamic-thread-pool
如果该项目对你有帮助,可以在 github 上点个 ⭐ 喔 🥰🥰
2.1. ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如Tomcat。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。
使用线程池好处

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。
任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:

线程池在业务系统应该都有使用到,帮助业务流程提升效率以及管理线程,多数场景应用于大量的异步任务处理。虽然线程池提供了我们许多便利,但也并非尽善尽美,比如下面这些问题就无法很好解决。
常见的线程池配置:
执行线程池执行任务的类型

但是并没有通用的线程池计算方式。并发任务的执行情况和任务类型相关,IO密集型和CPU密集型的任务运行起来的情况差异非常大,但这种占比是较难合理预估的,这导致很难有一个简单有效的通用公式帮我们直接计算出结果。
既然不能够保证一次计算出来合适的参数,那么是否可以将修改线程池参数的成本降下来,这样至少可以发生故障的时候可以快速调整从而缩短故障恢复的时间呢?基于这个思考,我们是否可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:

简化线程池配置:线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。
为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。
将线程池的配置放置在平台侧,允许简单的查看、修改线程池配置。

这里主要就是利用 Redis 的发布订阅功能来实现的
针对消息订阅发布功能,大部分使用的是kafka、RabbitMQ、ActiveMQ, RocketMQ等这几种,redis的订阅发布功能跟这三者相比,相对轻量,针对数据准确和安全性要求没有那么高可以直接使用

首先在使用线程池的业务端创建相对应的线程池bean
- @Slf4j
- @EnableAsync
- @Configuration
- @EnableConfigurationProperties(ThreadPoolConfigProperties.class)
- public class ThreadPoolConfig {
-
- /**
- * 创建线程池
- */
- @Bean("threadPoolExecutor01")
- public ThreadPoolExecutor threadPoolExecutor01(ThreadPoolConfigProperties properties) {
- // 线程池拒绝策略
- RejectedExecutionHandler handler;
- switch (properties.getPolicy()){
- case "AbortPolicy":
- handler = new ThreadPoolExecutor.AbortPolicy();
- break;
- case "DiscardPolicy":
- handler = new ThreadPoolExecutor.DiscardPolicy();
- break;
- case "DiscardOldestPolicy":
- handler = new ThreadPoolExecutor.DiscardOldestPolicy();
- break;
- case "CallerRunsPolicy":
- handler = new ThreadPoolExecutor.CallerRunsPolicy();
- break;
- default:
- handler = new ThreadPoolExecutor.AbortPolicy();
- break;
- }
-
- // 创建线程池
- return new ThreadPoolExecutor(properties.getCorePoolSize(),
- properties.getMaxPoolSize(),
- properties.getKeepAliveTime(),
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(properties.getBlockQueueSize()),
- Executors.defaultThreadFactory(),
- handler);
- }
-
- @Bean("threadPoolExecutor02")
- public ThreadPoolExecutor threadPoolExecutor02(ThreadPoolConfigProperties properties) {
- // 线程池拒绝策略
- RejectedExecutionHandler handler;
- switch (properties.getPolicy()){
- case "AbortPolicy":
- handler = new ThreadPoolExecutor.AbortPolicy();
- break;
- case "DiscardPolicy":
- handler = new ThreadPoolExecutor.DiscardPolicy();
- break;
- case "DiscardOldestPolicy":
- handler = new ThreadPoolExecutor.DiscardOldestPolicy();
- break;
- case "CallerRunsPolicy":
- handler = new ThreadPoolExecutor.CallerRunsPolicy();
- break;
- default:
- handler = new ThreadPoolExecutor.AbortPolicy();
- break;
- }
-
- // 创建线程池
- return new ThreadPoolExecutor(properties.getCorePoolSize(),
- properties.getMaxPoolSize(),
- properties.getKeepAliveTime(),
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(properties.getBlockQueueSize()),
- Executors.defaultThreadFactory(),
- handler);
- }
- }
然后就是线程池的一些操作(查询线程池列表、根据线程池名称查询线程池配置、更新线程池配置)
其中,由于在业务模块定义了线程池的 Bean,这个 Bean 是 ThreadPoolExecutor 类型的。当 Spring 启动时,它会创建这个Bean,并将其添加到内部的Bean容器中。
其中有个 Map集合。这个 Map 是由 Spring 自动注入的,它包含了所有类型为 ThreadPoolExecutor 的 Bean。键是 Bean 的名称,值是对应的 Bean 实例。因此,这个 Map 中会包含业务模块中定义的线程池。

- @Slf4j
- public class DynamicThreadPoolServiceImpl implements IDynamicThreadPoolService {
-
- /**
- * 服务名称
- */
- private final String applicationName;
-
- /**
- * 线程池集合
- */
- private final Map
threadPoolExecutorMap; -
- public DynamicThreadPoolServiceImpl(String applicationName, Map
threadPoolExecutorMap) { - this.applicationName = applicationName;
- this.threadPoolExecutorMap = threadPoolExecutorMap;
- }
-
- /**
- * 查询线程池列表
- */
- @Override
- public List
queryThreadPoolList() { - Set
threadPoolBeanNames = threadPoolExecutorMap.keySet(); - List
threadPoolList = new ArrayList<>(threadPoolBeanNames.size()); - for (String beanName : threadPoolBeanNames) {
- ThreadPoolConfigEntity threadPoolConfigVO = getThreadPoolConfig(beanName);
- threadPoolList.add(threadPoolConfigVO);
- }
- return threadPoolList;
- }
-
- /**
- * 根据线程池名称查询线程池配置
- */
- @Override
- public ThreadPoolConfigEntity queryThreadPoolConfigByName(String threadPoolName) {
- return getThreadPoolConfig(threadPoolName);
- }
-
- /**
- * 更新线程池配置
- */
- @Override
- public void updateThreadPoolConfig(ThreadPoolConfigEntity threadPoolConfigEntity) {
- if (threadPoolConfigEntity == null || !applicationName.equals(threadPoolConfigEntity.getAppName())) return;
- ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolConfigEntity.getThreadPoolName());
- if (threadPoolExecutor == null) {
- return;
- }
-
- // 设置参数 「调整核心线程数和最大线程数」
- threadPoolExecutor.setCorePoolSize(threadPoolConfigEntity.getCorePoolSize());
- threadPoolExecutor.setMaximumPoolSize(threadPoolConfigEntity.getMaximumPoolSize());
- }
-
- /**
- * 获取线程池配置
- */
- private ThreadPoolConfigEntity getThreadPoolConfig(String beanName) {
- ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(beanName);
- if (threadPoolExecutor == null) {
- return new ThreadPoolConfigEntity(applicationName, beanName);
- }
-
- ThreadPoolConfigEntity threadPoolConfigVO = new ThreadPoolConfigEntity(applicationName, beanName);
- threadPoolConfigVO.setCorePoolSize(threadPoolExecutor.getCorePoolSize());
- threadPoolConfigVO.setMaximumPoolSize(threadPoolExecutor.getMaximumPoolSize());
- threadPoolConfigVO.setActiveCount(threadPoolExecutor.getActiveCount());
- threadPoolConfigVO.setPoolSize(threadPoolExecutor.getPoolSize());
- threadPoolConfigVO.setQueueType(threadPoolExecutor.getQueue().getClass().getSimpleName());
- threadPoolConfigVO.setQueueSize(threadPoolExecutor.getQueue().size());
- threadPoolConfigVO.setRemainingCapacity(threadPoolExecutor.getQueue().remainingCapacity());
-
- return threadPoolConfigVO;
- }
- }
这些线程池的操作其实都是 Listener 来操作的
- @Slf4j
- public class RedisAdjustListener implements MessageListener
{ -
- private final IDynamicThreadPoolService dynamicThreadPoolService;
-
- private final IRegistry registry;
-
- public RedisAdjustListener(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
- this.dynamicThreadPoolService = dynamicThreadPoolService;
- this.registry = registry;
- }
-
- @Override
- public void onMessage(CharSequence charSequence, ThreadPoolConfigEntity threadPoolConfigEntity) {
- log.info("动态线程池,调整线程池配置。线程池名称:{} 核心线程数:{} 最大线程数:{}", threadPoolConfigEntity.getThreadPoolName(), threadPoolConfigEntity.getPoolSize(), threadPoolConfigEntity.getMaximumPoolSize());
- dynamicThreadPoolService.updateThreadPoolConfig(threadPoolConfigEntity);
-
- // 更新后上报最新数据
- List
threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList(); - registry.reportThreadPool(threadPoolConfigEntities);
-
- ThreadPoolConfigEntity threadPoolConfigEntityCurrent = dynamicThreadPoolService.queryThreadPoolConfigByName(threadPoolConfigEntity.getThreadPoolName());
- registry.reportThreadPoolConfigParameter(threadPoolConfigEntityCurrent);
- log.info("动态线程池,上报线程池配置:{}", JSON.toJSONString(threadPoolConfigEntity));
- }
- }
然后将最新的数据放到注册中心去
- public class RedisRegistry implements IRegistry {
-
- private final RedissonClient redissonClient;
-
- public RedisRegistry(RedissonClient redissonClient) {
- this.redissonClient = redissonClient;
- }
-
- /**
- * 线程池配置列表
- */
- @Override
- public void reportThreadPool(List
threadPoolEntities) { - RList
redisList = redissonClient.getList(RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey()); - redisList.clear();
- redisList.addAll(threadPoolEntities);
- }
-
- /**
- * 线程池配置参数
- */
- @Override
- public void reportThreadPoolConfigParameter(ThreadPoolConfigEntity threadPoolConfigEntity) {
- String cacheKey = RegistryEnumVO.THREAD_POOL_CONFIG_PARAMETER_LIST_KEY.getKey() + ":" + threadPoolConfigEntity.getAppName() + ":" + threadPoolConfigEntity.getThreadPoolName();
- RBucket
bucket = redissonClient.getBucket(cacheKey); - bucket.set(threadPoolConfigEntity, Duration.ofDays(30));
- }
- }
因为其实管理端读取到的线程池数据都是从Redis中获取到的,所以也需要有一个定时任务更新注册中心的数据
- @Slf4j
- public class ThreadPoolDataReportJob {
-
-
- private final IDynamicThreadPoolService dynamicThreadPoolService;
-
- private final IRegistry registry;
-
- public ThreadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
- this.dynamicThreadPoolService = dynamicThreadPoolService;
- this.registry = registry;
- }
-
- /**
- * 每 10 秒上报一次线程池信息
- */
- @Scheduled(cron = "0/10 * * * * ?")
- public void execReportThreadPoolList() {
- List
threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList(); - registry.reportThreadPool(threadPoolConfigEntities);
- log.info("动态线程池,上报线程池信息:{}", JSON.toJSONString(threadPoolConfigEntities));
-
- for (ThreadPoolConfigEntity threadPoolConfigEntity : threadPoolConfigEntities) {
- registry.reportThreadPoolConfigParameter(threadPoolConfigEntity);
- log.info("动态线程池,上报线程池配置:{}", JSON.toJSONString(threadPoolConfigEntity));
- }
-
- }
-
- }
这些写一个方法模拟线程执行
- public ApplicationRunner applicationRunner(ExecutorService threadPoolExecutor01) throws InterruptedException {
- return new ApplicationRunner() {
- @Override
- public void run(ApplicationArguments args) throws Exception {
- while (true) {
- Random random = new Random();
- int randomInitialDelay = random.nextInt(3) + 1;
- int randomSleepTime = random.nextInt(3) + 1;
- threadPoolExecutor01.submit(new Runnable() {
- @Override
- public void run() {
- try {
- TimeUnit.SECONDS.sleep(randomInitialDelay);
- log.info("Task started after " + randomInitialDelay + " seconds.");
- TimeUnit.SECONDS.sleep(randomSleepTime);
- log.info("Task executed for " + randomSleepTime + " seconds.");
- } catch (Exception ex) {
- Thread.currentThread().interrupt();
- }
- }
- });
- Thread.sleep(random.nextInt(10) + 1);
- }
- }
- };
- }
修改前

修改最大线程数

修改后
