• 动态线程池 dynamic-tp 源码


    目录

    1. 介绍

    2. Spring 环境整合配置源码

    2.1 DtpBeanDefinitionRegistrar

    2.2 DtpPostProcessorRegistrar

    2.3 ApplicationContextHolder

    2.4 DtpBaseBeanConfiguration

    2. 动态线程池 DtpLifecycle 生命周期源码

    3. 动态线程池 DtpRegistry 注册源码

    4. 动态线程池 DtpMonitor 监听源码

    4.1 检查报警 checkAlarm(executorNames);

    4.1.1 ALARM_INVOKER_CHAIN

    4.1.2 是否报警

    4.1.3 阈值是否超过

    4.1.4 发送告警信息

    4.2 指标收集 collect(executorNames);

    4.2.1 MetricsCollector

    4.2.2 InternalLogCollector

    4.2.3 LogCollector

    4.2.4 MicroMeterCollector

    5. 动态线程池 直接修改线程池参数原理

    6. 三方组件线程池管理源码

    6.1 DtpAdapterListener


    1. 介绍

    官方文档  https://dynamictp.cn/guide/introduction/background.html

    动态线程池 在 传统线程池 基础上增加了两大功能:

    (1)支持运行时动态修改线程池参数

            字段包括:核心线程数、最大线程数等

    (2)近实时监控线程池情况,如果发现超过指定阈值就告警通知用户

            告知用户后,用户就可以修改线程池配置,怎么修改呢?直接修改配置中心的数据,也就是将线程池字段存储在配置中心,线程池也从配置中心读取,用户也从配置中心修改。

    如想了解更多使用信息,请查看上方官方文档。

    2. Spring 环境整合配置源码

    本次使用 Nacos 作为配置中心

    源码中的  example-nacos-cloud 模块就是 nacos 配置中的示例工程

    在主类上使用 @EnableDynamicTp 注解来启用 动态线程池功能。

    1. @Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
    2. @Retention(RetentionPolicy.RUNTIME)
    3. @Documented
    4. @Import(DtpConfigurationSelector.class)
    5. public @interface EnableDynamicTp {
    6. }

    在 @EnableDynamicTp 又引入了 DtpConfigurationSelector 类,负责整合导入当前 动态线程池的所有配置项

    1. public class DtpConfigurationSelector implements DeferredImportSelector {
    2. // 简化代码
    3. @Override
    4. public String[] selectImports(AnnotationMetadata metadata) {
    5. return new String[] {
    6. // 将配置中心的线程池注册为 bean
    7. DtpBeanDefinitionRegistrar.class.getName(),
    8. // 将直接定义的线程池注册到 DtpRegistry 中
    9. DtpPostProcessorRegistrar.class.getName(),
    10. // 存储 ApplicationContext
    11. ApplicationContextHolder.class.getName(),
    12. // 基本 bean 配置
    13. DtpBaseBeanConfiguration.class.getName()
    14. };
    15. }
    16. }

    下面来讲解以下 DtpConfigurationSelector 中导入的每个配置的作用

    2.1 DtpBeanDefinitionRegistrar

    DtpBeanDefinitionRegistrar 类做的事情就是 将配置中心的线程池注册为bean

    在配置中心配置了 线程池 

    DtpBeanDefinitionRegistrar 就是将这个配置注册为一个 Bean 放入 Spring 容器,bean name 就是 线程池名称。

    到时候用的时候就是直接注入一个线程池,指定线程池名就可以使用了。

    看看源码如何实现:

    1. @Slf4j
    2. public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {
    3. // 简化代码
    4. @Override
    5. public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    6. DtpProperties dtpProperties = new DtpProperties();
    7. // 将配置中心的属性 加载到 DtpProperties
    8. PropertiesBinder.bindDtpProperties(environment, dtpProperties);
    9. val executors = dtpProperties.getExecutors();
    10. executors.forEach(e -> {
    11. // 循环线程池数组,一个个加入 bean
    12. Class executorTypeClass = ExecutorType.getClass(e.getExecutorType());
    13. Map propertyValues = buildPropertyValues(e);
    14. // 构建构造器参数数组(corePoolSize、maximumPoolSize 等)
    15. Object[] args = buildConstructorArgs(executorTypeClass, e);
    16. // 注册 bean
    17. BeanUtil.registerIfAbsent(registry, e.getThreadPoolName(), executorTypeClass, propertyValues, args);
    18. });
    19. }
    20. }

    BeanUtil.registerIfAbsent () 源码

    1. public final class BeanUtil {
    2. public static void registerIfAbsent(BeanDefinitionRegistry registry,
    3. String beanName,
    4. Class clazz,
    5. Map propertyValues,
    6. Object... constructorArgs) {
    7. // 简化代码
    8. doRegister(registry, beanName, clazz, propertyValues, constructorArgs);
    9. }
    10. public static void doRegister(BeanDefinitionRegistry registry,
    11. String beanName,
    12. Class clazz,
    13. Map propertyValues,
    14. Object... constructorArgs) {
    15. // 简化代码
    16. // 使用 Spring 自带功能注册
    17. registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
    18. }
    19. }

    2.2 DtpPostProcessorRegistrar

    DtpPostProcessorRegistrar 将直接定义的线程池注册到 DtpRegistry 中

    什么意思呢,本来动态线程池是在 配置中心定义的。而动态线程池也可以使用代码编程方式定义。

    定义方式可查看这里 代码使用 | dynamic-tp (dynamictp.cn) 

    DtpPostProcessorRegistrar 要处理的是图中如此方式定义的,即对 @Bean 修饰的线程池 使用 @DynamicTp("commonExecutor") 注解,括号内参数为动态线程池名称。

    DtpPostProcessorRegistrar 要做的事情是什么呢,就是将普通的 JUC ThreadPoolExecutor 线程封装为 支持运行时修改参数、支持告警监控的 动态线程池。

    看看源码如何实现

    1. public class DtpPostProcessorRegistrar implements ImportBeanDefinitionRegistrar {
    2. private static final String BEAN_NAME = "dtpPostProcessor";
    3. @Override
    4. public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
    5. BeanDefinitionRegistry registry) {
    6. if (!registry.containsBeanDefinition(BEAN_NAME)) {
    7. AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder
    8. .genericBeanDefinition(DtpPostProcessor.class,
    9. DtpPostProcessor::new)
    10. .getBeanDefinition();
    11. // 完全后台角色
    12. beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    13. beanDefinition.setSynthetic(true);
    14. registry.registerBeanDefinition(BEAN_NAME, beanDefinition);
    15. }
    16. }
    17. }

    其实主要是注册了另一个 bean DtpPostProcessor

    BeanPostProcessor 是 Spring Bean 初始化阶段的回调接口

    阅读 DtpPostProcessor 源码,其实就是将普通的线程池注册为动态线程池。

    1. public class DtpPostProcessor implements BeanPostProcessor {
    2. private DefaultListableBeanFactory beanFactory;
    3. @Override
    4. public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {
    5. if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor)) {
    6. return bean;
    7. }
    8. if (bean instanceof DtpExecutor) {
    9. // 处理 DtpExecutor
    10. registerDtp(bean);
    11. } else {
    12. // 处理 ThreadPoolExecutor or ThreadPoolTaskExecutor
    13. registerCommon(bean, beanName);
    14. }
    15. return bean;
    16. }
    17. private void registerDtp(Object bean) {
    18. DtpExecutor dtpExecutor = (DtpExecutor) bean;
    19. if (bean instanceof EagerDtpExecutor) {
    20. ((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);
    21. }
    22. // 注册为动态线程池
    23. DtpRegistry.registerExecutor(ExecutorWrapper.of(dtpExecutor), "beanPostProcessor");
    24. }
    25. private void registerCommon(Object bean, String beanName) {
    26. String dtpAnnotationVal;
    27. try {
    28. // 获取到注解
    29. DynamicTp dynamicTp = beanFactory.findAnnotationOnBean(beanName, DynamicTp.class);
    30. // 省略
    31. String poolName = StringUtils.isNotBlank(dtpAnnotationVal) ? dtpAnnotationVal : beanName;
    32. Executor executor;
    33. if (bean instanceof ThreadPoolTaskExecutor) {
    34. executor = ((ThreadPoolTaskExecutor) bean).getThreadPoolExecutor();
    35. } else {
    36. executor = (Executor) bean;
    37. }
    38. // 注册为动态线程池
    39. DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, executor), "beanPostProcessor");
    40. }
    41. }

    2.3 ApplicationContextHolder

    就是存储一下 ApplicationContext

    1. public class ApplicationContextHolder implements ApplicationContextAware {
    2. private static ApplicationContext context;
    3. @Override
    4. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    5. context = applicationContext;
    6. }
    7. public static T getBean(Class clazz) {
    8. if (Objects.isNull(context)) {
    9. throw new NullPointerException("ApplicationContext is null, please check if the spring container is started.");
    10. }
    11. return context.getBean(clazz);
    12. }
    13. public static T getBean(String name, Class clazz) {
    14. return context.getBean(name, clazz);
    15. }
    16. public static Map getBeansOfType(Class clazz) {
    17. return context.getBeansOfType(clazz);
    18. }
    19. public static ApplicationContext getInstance() {
    20. return context;
    21. }
    22. public static Environment getEnvironment() {
    23. return getInstance().getEnvironment();
    24. }
    25. public static void publishEvent(ApplicationEvent event) {
    26. context.publishEvent(event);
    27. }
    28. }

    2.4 DtpBaseBeanConfiguration

    DtpBaseBeanConfiguration 配置了 dynamic-tp 要用的 bean。

    1. /**
    2. * 动态线程池基本 bean 配置
    3. */
    4. @Configuration(proxyBeanMethods = false)
    5. @EnableConfigurationProperties(DtpProperties.class)
    6. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    7. public class DtpBaseBeanConfiguration {
    8. /**
    9. * 动态线程池生命周期
    10. */
    11. @Bean
    12. public DtpLifecycle dtpLifecycle() {
    13. return new DtpLifecycle();
    14. }
    15. /**
    16. * 动态线程池注册器
    17. */
    18. @Bean
    19. public DtpRegistry dtpRegistry(DtpProperties dtpProperties) {
    20. return new DtpRegistry(dtpProperties);
    21. }
    22. /**
    23. * 动态线程池监控
    24. */
    25. @Bean
    26. public DtpMonitor dtpMonitor(DtpProperties dtpProperties) {
    27. return new DtpMonitor(dtpProperties);
    28. }
    29. /**
    30. * 动态线程池暴露的端点
    31. */
    32. @Bean
    33. @ConditionalOnAvailableEndpoint
    34. public DtpEndpoint dtpEndpoint() {
    35. return new DtpEndpoint();
    36. }
    37. /**
    38. * banner 打印
    39. */
    40. @Bean
    41. public DtpBannerPrinter dtpBannerPrinter() {
    42. return new DtpBannerPrinter();
    43. }
    44. /**
    45. * hash 时间轮
    46. * 用于实现 任务超时监控、队列超时监控功能
    47. */
    48. @Bean
    49. public HashedWheelTimer hashedWheelTimer() {
    50. return new HashedWheelTimer(new NamedThreadFactory("dtpRunnable-timeout", true), 10, TimeUnit.MILLISECONDS);
    51. }
    52. }

    Dtp 是 dynamic thread pool 的缩写

    这里简单介绍一下这些类的作用

    1. DtpLifecycle 动态线程池生命周期,负责全部动态线程池的启动、停止

    2. DtpRegistry 动态线程池注册器,负责注册线程池

    3. DtpMonitor 动态线程池监控,负责监控当前线程池,检查报警、收集监控指标

    4. DtpEndpoint 就是 spring-boot-actuator 向外暴露指标的端点配置

    5. DtpBannerPrinter 用于打印 dynamic-tp 的 logo

    6. HashedWheelTimer hash 时间轮 用于实现 任务超时监控、队列超时监控功能

    2. 动态线程池 DtpLifecycle 生命周期源码

    DtpLifecycle 负责管理动态线程池的生命周期。如:初始化、销毁。

    源码如下

    1. public class DtpLifecycle implements SmartLifecycle {
    2. private final AtomicBoolean running = new AtomicBoolean(false);
    3. @Override
    4. public void start() {
    5. if (this.running.compareAndSet(false, true)) {
    6. // 初始化全部线程池
    7. DtpRegistry.listAllExecutors().forEach((k, v) -> DtpLifecycleSupport.initialize(v));
    8. }
    9. }
    10. @Override
    11. public void stop() {
    12. if (this.running.compareAndSet(true, false)) {
    13. shutdownInternal();
    14. // 销毁全部线程池
    15. DtpRegistry.listAllExecutors().forEach((k, v) -> DtpLifecycleSupport.destroy(v));
    16. }
    17. }
    18. @Override
    19. public boolean isRunning() {
    20. return this.running.get();
    21. }
    22. public void shutdownInternal() {
    23. DtpMonitor.destroy();
    24. AlarmManager.destroy();
    25. NoticeManager.destroy();
    26. }
    27. }

    可见,由 Spring 容器回调当前容器的状态(start、stop)。

    在回调里获取到全部动态线程池后,循环遍历交由 DtpLifecycleSupport 实现:

    初始化实现 initialize

    1. public class DtpLifecycleSupport {
    2. public static void initialize(ExecutorWrapper executorWrapper) {
    3. if (executorWrapper.isDtpExecutor()) {
    4. DtpExecutor dtpExecutor = (DtpExecutor) executorWrapper.getExecutor();
    5. dtpExecutor.initialize();
    6. }
    7. }
    8. }

    最终委托给了 DtpExecutor#initialize

    1. public class DtpExecutor extends ThreadPoolExecutor
    2. implements SpringExecutor, ExecutorAdapter {
    3. public void initialize() {
    4. // 初始化通知
    5. NotifyHelper.initNotify(this);
    6. if (preStartAllCoreThreads) {
    7. // 预启动全部核心线程
    8. prestartAllCoreThreads();
    9. }
    10. // 设置拒绝策略
    11. setRejectHandler(RejectHandlerGetter.buildRejectedHandler(getRejectHandlerType()));
    12. }
    13. }

    销毁实现 destroy

    1. public class DtpLifecycleSupport {
    2. public static void destroy(ExecutorWrapper executorWrapper) {
    3. if (executorWrapper.isDtpExecutor()) {
    4. destroy((DtpExecutor) executorWrapper.getExecutor());
    5. } else if (executorWrapper.isThreadPoolExecutor()) {
    6. internalShutdown(((ThreadPoolExecutorAdapter) executorWrapper.getExecutor()).getOriginal(),
    7. executorWrapper.getThreadPoolName(),
    8. true,
    9. 0);
    10. }
    11. }
    12. // 关闭线程池
    13. public static void internalShutdown(ThreadPoolExecutor executor,
    14. String threadPoolName,
    15. boolean waitForTasksToCompleteOnShutdown,
    16. int awaitTerminationSeconds) {
    17. if (Objects.isNull(executor)) {
    18. return;
    19. }
    20. log.info("Shutting down ExecutorService, threadPoolName: {}", threadPoolName);
    21. if (waitForTasksToCompleteOnShutdown) {
    22. // waitForTasksToCompleteOnShutdown 在 shutdown 线程池的时候等待任务完成
    23. // executor.shutdown(); 表示不接受新任务了,有序的执行完之前提交的任务后再关闭
    24. executor.shutdown();
    25. } else {
    26. // 线程池关闭的时候不等待任务完成
    27. // 直接强行关闭任务
    28. for (Runnable remainingTask : executor.shutdownNow()) {
    29. // 取消剩余任务
    30. cancelRemainingTask(remainingTask);
    31. }
    32. }
    33. // 如果必要,等待 awaitTerminationSeconds 秒后关闭线程池
    34. awaitTerminationIfNecessary(executor, threadPoolName, awaitTerminationSeconds);
    35. }
    36. }

    3. 动态线程池 DtpRegistry 注册源码

    动态线程池注册到底做了个什么事情呢,来看看源码

    本质就是将动态线程池存到 map 里,也即是全部收集起来。

    1. public class DtpRegistry implements ApplicationRunner {
    2. // 简化代码
    3. /**
    4. * 维护所有自动注册和手动注册的 动态线程池
    5. */
    6. private static final Map EXECUTOR_REGISTRY
    7. = new ConcurrentHashMap<>();
    8. public static void registerExecutor(ExecutorWrapper wrapper, String source) {
    9. EXECUTOR_REGISTRY.putIfAbsent(wrapper.getThreadPoolName(), wrapper);
    10. }
    11. }

    4. 动态线程池 DtpMonitor 监听源码

    到此位置,动态线程池环境配好了,也收集起来了,现在就是该监听了动态线程池内容了。

    当监听到到超过配置阈值后,就得告警了,现在先做到监听到。

    监听功能的入口类是 DtpMonitor

    观察该类源码,可见做的核心事情就是 周期性执行一个监控检查任务(每 5 秒执行一次)

    1. public class DtpMonitor implements ApplicationRunner {
    2. // 简化代码
    3. private static final ScheduledExecutorService MONITOR_EXECUTOR = new ScheduledThreadPoolExecutor(
    4. 1, new NamedThreadFactory("dtp-monitor", true));
    5. @Override
    6. public void run(ApplicationArguments args) {
    7. MONITOR_EXECUTOR.scheduleWithFixedDelay(this::run,
    8. 0, dtpProperties.getMonitorInterval(), TimeUnit.SECONDS);
    9. }
    10. /**
    11. * 周期性的检查
    12. * 默认 5 秒检查一次
    13. */
    14. private void run() {
    15. // 获取全部线程池名字
    16. Set executorNames = DtpRegistry.listAllExecutorNames();
    17. // 检查报警
    18. checkAlarm(executorNames);
    19. // 指标收集
    20. collect(executorNames);
    21. }
    22. }

    run 方法做了两个事情

    4.1 检查报警 checkAlarm(executorNames);

    1. public class DtpMonitor implements ApplicationRunner {
    2. private void checkAlarm(Set executorNames) {
    3. executorNames.forEach(x -> {
    4. // 遍历循环 对每一个线程池检查报警
    5. ExecutorWrapper wrapper = DtpRegistry.getExecutorWrapper(x);
    6. // 1. 异步检查报警
    7. AlarmManager.doAlarmAsync(wrapper, SCHEDULE_NOTIFY_ITEMS);
    8. });
    9. // 2. 发布监控检查事件
    10. publishAlarmCheckEvent();
    11. }
    12. }

    首先说一下第二步的 发布监控检查事件 就是使用的 Spring 自带的事件通知机制。发布这个事件是为了给 第三方组件线程池监控用的,就是让那个模块直到当前要做这个 报警检查事情。这里使用事件机制,就是为了减耦合的。

    1. public class DtpMonitor implements ApplicationRunner {
    2. private void publishAlarmCheckEvent() {
    3. AlarmCheckEvent event = new AlarmCheckEvent(this, dtpProperties);
    4. ApplicationContextHolder.publishEvent(event);
    5. }
    6. }

    回到之前的正文,再说第一步的 异步检查报警源码 AlarmManager.doAlarmAsync

    1. public class AlarmManager {
    2. public static void doAlarmAsync(ExecutorWrapper executorWrapper, List notifyItemEnums) {
    3. ALARM_EXECUTOR.execute(() -> notifyItemEnums.forEach(x -> doAlarm(executorWrapper, x)));
    4. }
    5. /**
    6. * 检查报警
    7. */
    8. public static void doAlarm(ExecutorWrapper executorWrapper, NotifyItemEnum notifyItemEnum) {
    9. NotifyHelper.getNotifyItem(executorWrapper, notifyItemEnum).ifPresent(notifyItem -> {
    10. // 如果当前线程池存在这个报警项目
    11. val alarmCtx = new AlarmCtx(executorWrapper, notifyItem);
    12. ALARM_INVOKER_CHAIN.proceed(alarmCtx);
    13. });
    14. }
    15. }

    这里就只看最后这里的 ALARM_INVOKER_CHAIN.proceed(alarmCtx) 核心源码,重点只有这一行。前面的都是优化写法、优化手段。

    4.1.1 ALARM_INVOKER_CHAIN

    ALARM_INVOKER_CHAIN 是什么呢

    其实就是一个 调用链,第一个元素一定是 AlarmBaseFilter 负责做 前置判断逻辑,第二个元素就是做真正的业务操作了。

    1. public class AlarmManager {
    2. private static final InvokerChain ALARM_INVOKER_CHAIN;
    3. static {
    4. // 默认初始化,构建一个调用链
    5. ALARM_INVOKER_CHAIN = NotifyFilterBuilder.getAlarmInvokerChain();
    6. }
    7. }
    8. public class NotifyFilterBuilder {
    9. public static InvokerChain getAlarmInvokerChain() {
    10. val filters = ApplicationContextHolder.getBeansOfType(NotifyFilter.class);
    11. Collection alarmFilters = Lists.newArrayList(filters.values());
    12. // 添加 base filter
    13. alarmFilters.add(new AlarmBaseFilter());
    14. // 做排序
    15. alarmFilters = alarmFilters.stream()
    16. .filter(x -> x.supports(NotifyTypeEnum.ALARM))
    17. .sorted(Comparator.comparing(Filter::getOrder))
    18. .collect(Collectors.toList());
    19. // 构建调用链
    20. return InvokerChainFactory.buildInvokerChain(new AlarmInvoker(), alarmFilters.toArray(new NotifyFilter[0]));
    21. }
    22. }

    这里的代码最初是由 周期性定时任务调度过来的(每隔5秒)。

    看看调用链的第一个元素 AlarmBaseFilter 源码

    1. public class AlarmBaseFilter implements NotifyFilter {
    2. private static final Object SEND_LOCK = new Object();
    3. @Override
    4. public void doFilter(BaseNotifyCtx context, Invoker nextInvoker) {
    5. // 简化代码
    6. val executorWrapper = context.getExecutorWrapper();
    7. val notifyItem = context.getNotifyItem();
    8. // 是否报警
    9. boolean ifAlarm = AlarmLimiter.ifAlarm(executorWrapper.getThreadPoolName(), notifyItem.getType());
    10. if (!ifAlarm) {
    11. // 不报警,直接返回,不执行后面了
    12. return;
    13. }
    14. // 检查阈值是否触发
    15. if (!AlarmManager.checkThreshold(executorWrapper, context.getNotifyItemEnum(), notifyItem)) {
    16. return;
    17. }
    18. synchronized (SEND_LOCK) {
    19. // 简化代码
    20. // 存储当前线程池的通知项目
    21. AlarmLimiter.putVal(executorWrapper.getThreadPoolName(), notifyItem.getType());
    22. }
    23. nextInvoker.invoke(context);
    24. }
    25. }

    其实直接分为 2 大块

    1. 是否报警
    2. 阈值是否超过

    如果这两个判断都满足了,就走入下一个调用链了。

    这两个判断都有必要细说一下

    4.1.2 是否报警

    是否报警进入了 AlarmLimiter 类

    这个类主要是实现,指定间隔时间之后才报警一次功能。

    比如,配置中心如下配置,代表 120 秒才告警一次。否则的话每触发一次就直接告警一次,就可能导致短时间发送了大量相同的警告,其实挺无用的。

    下来看看这个类源码

    1. public class AlarmLimiter {
    2. /**
    3. * Cache<线程池名称,通知项目>
    4. */
    5. private static final Map> ALARM_LIMITER = new ConcurrentHashMap<>();
    6. private AlarmLimiter() { }
    7. /**
    8. * 初始化
    9. */
    10. public static void initAlarmLimiter(String threadPoolName, NotifyItem notifyItem) {
    11. if (NotifyItemEnum.CHANGE.getValue().equalsIgnoreCase(notifyItem.getType())) {
    12. // 内容改变报警项,本类不处理
    13. return;
    14. }
    15. // threadPoolName + ":" + type
    16. String key = genKey(threadPoolName, notifyItem.getType());
    17. Cache cache = CacheBuilder.newBuilder()
    18. .expireAfterWrite(notifyItem.getInterval(), TimeUnit.SECONDS)
    19. .build();
    20. ALARM_LIMITER.put(key, cache);
    21. }
    22. public static void putVal(String threadPoolName, String type) {
    23. String key = genKey(threadPoolName, type);
    24. ALARM_LIMITER.get(key).put(type, type);
    25. }
    26. public static String getAlarmLimitInfo(String key, String type) {
    27. val cache = ALARM_LIMITER.get(key);
    28. if (Objects.isNull(cache)) {
    29. return null;
    30. }
    31. return cache.getIfPresent(type);
    32. }
    33. public static boolean ifAlarm(String threadPoolName, String type) {
    34. String key = genKey(threadPoolName, type);
    35. // 返回 null 才报警
    36. return StringUtils.isBlank(getAlarmLimitInfo(key, type));
    37. }
    38. public static String genKey(String threadPoolName, String type) {
    39. return threadPoolName + ":" + type;
    40. }
    41. }

    其实本质就是用了 Guava的缓存功能,这个缓存的有效期就配置指定的间隔时间。

    每一次触发的时候,就put进去(如果 key 存在了,put 操作只是替换 value 值)

    所以过期时间就是从第一次put 不存在的 key 的时候开始计算的。当过期时间到了,缓存中的缓存就自动清除了。就是这样一个机制。

    所以这个过滤器的第一个节点 AlarmBaseFilter#doFilter 的 是否报警 判断的是什么呢

    ifAlarm 判断的就是缓存中是否存在这个key,不存在才去才往下走真正的告警逻辑。

    如果存在了代表这个时间间隔里已经处理过了,就不能往下继续处理了。

    1. public class AlarmLimiter {
    2. private static final Map> ALARM_LIMITER = new ConcurrentHashMap<>();
    3. // 是否报警
    4. public static boolean ifAlarm(String threadPoolName, String type) {
    5. // key 是 threadPoolName + ":" + type;
    6. String key = genKey(threadPoolName, type);
    7. // 返回 null 才报警
    8. return StringUtils.isBlank(getAlarmLimitInfo(key, type));
    9. }
    10. public static String getAlarmLimitInfo(String key, String type) {
    11. val cache = ALARM_LIMITER.get(key);
    12. if (Objects.isNull(cache)) {
    13. return null;
    14. }
    15. return cache.getIfPresent(type);
    16. }
    17. }

    4.1.3 阈值是否超过

    这里就是真正的判断是否到底是否需要告警了吗,因为当超过阈值了才去告警。

    阈值在这里配置。

    首先告警分为 不同的告警项目,由 NotifyItemEnum 类列出来全部告警项目。

    1. public enum NotifyItemEnum {
    2. /**
    3. * 变更通知
    4. */
    5. CHANGE("change"),
    6. /**
    7. * 线程池活跃度通知
    8. * 活性报警
    9. * ThreadPool liveness notify.
    10. * liveness = activeCount / maximumPoolSize
    11. */
    12. LIVENESS("liveness"),
    13. /**
    14. * 容量报警
    15. */
    16. CAPACITY("capacity"),
    17. /**
    18. * 任务被拒绝报警
    19. */
    20. REJECT("reject"),
    21. /**
    22. * 任务超时报警
    23. */
    24. RUN_TIMEOUT("run_timeout"),
    25. /**
    26. * 任务队列等待超时报警
    27. */
    28. QUEUE_TIMEOUT("queue_timeout");
    29. private final String value;
    30. NotifyItemEnum(String value) {
    31. this.value = value;
    32. }
    33. }

     检查阈值源码如下

    1. /**
    2. * 检查阈值
    3. * @return false 没触发
    4. */
    5. public static boolean checkThreshold(ExecutorWrapper executor, NotifyItemEnum itemEnum, NotifyItem notifyItem) {
    6. switch (itemEnum) {
    7. case CAPACITY:
    8. // 检查容量
    9. return checkCapacity(executor, notifyItem);
    10. case LIVENESS:
    11. // 检查活性
    12. return checkLiveness(executor, notifyItem);
    13. case REJECT:
    14. case RUN_TIMEOUT:
    15. case QUEUE_TIMEOUT:
    16. return checkWithAlarmInfo(executor, notifyItem);
    17. default:
    18. log.error("Unsupported alarm type, type: {}", itemEnum);
    19. return false;
    20. }
    21. }

    检查容量 checkCapacity

    1. public class AlarmManager {
    2. private static boolean checkCapacity(ExecutorWrapper executorWrapper, NotifyItem notifyItem) {
    3. // 获取到当前线程池
    4. val executor = executorWrapper.getExecutor();
    5. if (executor.getQueueSize() <= 0) {
    6. return false;
    7. }
    8. // 阈值比对判断
    9. double div = NumberUtil.div(executor.getQueueSize(), executor.getQueueCapacity(), 2) * 100;
    10. return div >= notifyItem.getThreshold();
    11. }
    12. }

    检查活性 checkLiveness

    1. public class AlarmManager {
    2. private static boolean checkLiveness(ExecutorWrapper executorWrapper, NotifyItem notifyItem) {
    3. val executor = executorWrapper.getExecutor();
    4. int maximumPoolSize = executor.getMaximumPoolSize();
    5. // 阈值判断
    6. double div = NumberUtil.div(executor.getActiveCount(), maximumPoolSize, 2) * 100;
    7. return div >= notifyItem.getThreshold();
    8. }
    9. }

    其他

    任务被拒绝报警、任务运行超时报警、任务队列等待超时报警

    1. public class AlarmManager {
    2. private static boolean checkWithAlarmInfo(ExecutorWrapper executorWrapper, NotifyItem notifyItem) {
    3. // 获取当前线程当前报警项目的 报警信息
    4. AlarmInfo alarmInfo = AlarmCounter.getAlarmInfo(executorWrapper.getThreadPoolName(), notifyItem.getType());
    5. return alarmInfo.getCount() >= notifyItem.getThreshold();
    6. }
    7. }

    可见 ,这里由 alarmInfo 的 count 字段判断。

    counte 字段是在 AlarmManager 提供的 供外部直接调用的 doAlarmAsync 方法里直接累加的。 

    追踪这个方法的调用方,正好是那两个事件(任务运行超时、队列排队超时),也就是那两个事件检测到超时后,就来调用这个类了,并完成一次次数的累加用于判断阈值。

    任务被拒绝 是上面的方法调用方调用的。

     至此就完成了 阈值检查的判断。

    4.1.4 发送告警信息

    现在两个关键判断都走完了,就可以进入真正的发送告警的下一个链节点了。

    也就是 AlarmInvoker

    1. public class AlarmInvoker implements Invoker {
    2. @Override
    3. public void invoke(BaseNotifyCtx context) {
    4. val alarmCtx = (AlarmCtx) context;
    5. val executorWrapper = alarmCtx.getExecutorWrapper();
    6. // 当前报警项目
    7. val notifyItem = alarmCtx.getNotifyItem();
    8. // 获取当前线程当前报警项目的 报警信息
    9. val alarmInfo = AlarmCounter.getAlarmInfo(executorWrapper.getThreadPoolName(), notifyItem.getType());
    10. alarmCtx.setAlarmInfo(alarmInfo);
    11. try {
    12. // 设置环境
    13. DtpNotifyCtxHolder.set(context);
    14. // 发送告警
    15. NotifierHandler.getInstance().sendAlarm(NotifyItemEnum.of(notifyItem.getType()));
    16. // 重置报警次数
    17. AlarmCounter.reset(executorWrapper.getThreadPoolName(), notifyItem.getType());
    18. } finally {
    19. DtpNotifyCtxHolder.remove();
    20. }
    21. }
    22. }

     真正干活的代码时 try 中间的 发送告警

    NotifierHandler.getInstance().sendAlarm(NotifyItemEnum.of(notifyItem.getType()));

    1. public final class NotifierHandler {
    2. /**
    3. * 发送告警
    4. * @param notifyItemEnum 当前通知的报警项目
    5. */
    6. public void sendAlarm(NotifyItemEnum notifyItemEnum) {
    7. NotifyItem notifyItem = DtpNotifyCtxHolder.get().getNotifyItem();
    8. for (String platformId : notifyItem.getPlatformIds()) {
    9. NotifyHelper.getPlatform(platformId).ifPresent(p -> {
    10. DtpNotifier notifier = NOTIFIERS.get(p.getPlatform().toLowerCase());
    11. if (notifier != null) {
    12. notifier.sendAlarmMsg(p, notifyItemEnum);
    13. }
    14. });
    15. }
    16. }
    17. }

    这段代码做了一个循环遍历判断,这是什么意思呢

    配置中心可以配置当前报警项目的通知平台

     循环判断是为了,只拿到当前配置的 通知器对象,拿到通知器对象后,使用通知器发送通知、告警。

    最终得到的是 DtpNotifier 对象,看看类图。

     DtpNotifier 代表一个通知器

    AbstractDtpNotifier 实现了通用方法

    子类代表不同的通知类型 

    • DtpWechatNotifier 企业微信通知

    • DtpEmailNotifier 邮件通知

    • DtpDingNotifier 钉钉通过

    • DtpLarkNotifier 飞书通知

    不过真正干活的(发送web请求)还是 Notifier 子类

    这里就是纯粹的发送通知类,不包含 dynamic-tp 相关业务代码的类,上面的 DtpNotifier 其实相当于一个适配器了,用来分离耦合代码。

    举其中一个例子 LarkNotifier

    本质就是发送一个 api 请求 

    4.2 指标收集 collect(executorNames);

    回到这里,该看第二部的 指标收集源码了。

    指标收集后可以用于 grafana 查看

    1. public class DtpMonitor implements ApplicationRunner {
    2. private void run() {
    3. // 获取全部线程池名字
    4. Set executorNames = DtpRegistry.listAllExecutorNames();
    5. // 1. 检查报警
    6. checkAlarm(executorNames);
    7. // 2. 指标收集
    8. collect(executorNames);
    9. }
    10. private void collect(Set executorNames) {
    11. if (!dtpProperties.isEnabledCollect()) {
    12. return;
    13. }
    14. executorNames.forEach(x -> {
    15. ExecutorWrapper wrapper = DtpRegistry.getExecutorWrapper(x);
    16. doCollect(ExecutorConverter.toMetrics(wrapper));
    17. });
    18. publishCollectEvent();
    19. }
    20. private void doCollect(ThreadPoolStats threadPoolStats) {
    21. try {
    22. CollectorHandler.getInstance().collect(threadPoolStats, dtpProperties.getCollectorTypes());
    23. } catch (Exception e) {
    24. log.error("DynamicTp monitor, metrics collect error.", e);
    25. }
    26. }
    27. }

    最后的 doCollect 是核心了。

    第一个参数是 当前线程池的统计信息,也就是要收集的指标

    第二个参数是 配置的指标收集类型

    进入 collect 方法源码

    1. public final class CollectorHandler {
    2. private static final Map COLLECTORS = Maps.newHashMap();
    3. private CollectorHandler() {
    4. // 系统初始化 加入 收集器
    5. ServiceLoader loader = ServiceLoader.load(MetricsCollector.class);
    6. for (MetricsCollector collector : loader) {
    7. COLLECTORS.put(collector.type(), collector);
    8. }
    9. MetricsCollector microMeterCollector = new MicroMeterCollector();
    10. LogCollector logCollector = new LogCollector();
    11. InternalLogCollector internalLogCollector = new InternalLogCollector();
    12. COLLECTORS.put(microMeterCollector.type(), microMeterCollector);
    13. COLLECTORS.put(logCollector.type(), logCollector);
    14. COLLECTORS.put(internalLogCollector.type(), internalLogCollector);
    15. }
    16. public void collect(ThreadPoolStats poolStats, List types) {
    17. if (poolStats == null || CollectionUtils.isEmpty(types)) {
    18. return;
    19. }
    20. for (String collectorType : types) {
    21. MetricsCollector collector = COLLECTORS.get(collectorType.toLowerCase());
    22. if (collector != null) {
    23. // 获取到当前收集器
    24. collector.collect(poolStats);
    25. }
    26. }
    27. }
    28. }

    最后,得到收集器,开始进行收集 。

    收集器类图如下

    4.2.1 MetricsCollector

    MetricsCollector 代表一个收集器

    4.2.2 InternalLogCollector

    内部日志收集器 直接把指标打印到当前项目的日志里

    1. @Slf4j
    2. public class InternalLogCollector extends AbstractCollector {
    3. @Override
    4. public void collect(ThreadPoolStats poolStats) {
    5. log.info("dynamic.tp metrics: {}", JsonUtil.toJson(poolStats));
    6. }
    7. @Override
    8. public String type() {
    9. return CollectorTypeEnum.INTERNAL_LOGGING.name().toLowerCase();
    10. }
    11. }

    4.2.3 LogCollector

    日志收集器 将指标打印到外部日志

    1. @Slf4j
    2. public class LogCollector extends AbstractCollector {
    3. @Override
    4. public void collect(ThreadPoolStats threadPoolStats) {
    5. String metrics = JsonUtil.toJson(threadPoolStats);
    6. if (LogHelper.getMonitorLogger() == null) {
    7. log.error("Cannot find monitor logger...");
    8. return;
    9. }
    10. LogHelper.getMonitorLogger().info("{}", metrics);
    11. }
    12. @Override
    13. public String type() {
    14. return CollectorTypeEnum.LOGGING.name().toLowerCase();
    15. }
    16. }

    4.2.4 MicroMeterCollector

    指标收集 调用 Metrics.gauge api 以 HTTP 方式暴露指标,到时候可以使用 grafana 客户端查看

    1. @Slf4j
    2. public class MicroMeterCollector extends AbstractCollector {
    3. /**
    4. * Prefix used for all dtp metric names.
    5. */
    6. public static final String DTP_METRIC_NAME_PREFIX = "thread.pool";
    7. public static final String POOL_NAME_TAG = DTP_METRIC_NAME_PREFIX + ".name";
    8. public static final String APP_NAME_TAG = "app.name";
    9. private static final Map GAUGE_CACHE = new ConcurrentHashMap<>();
    10. @Override
    11. public void collect(ThreadPoolStats threadPoolStats) {
    12. // metrics must be held with a strong reference, even though it is never referenced within this class
    13. ThreadPoolStats oldStats = GAUGE_CACHE.get(threadPoolStats.getPoolName());
    14. if (Objects.isNull(oldStats)) {
    15. GAUGE_CACHE.put(threadPoolStats.getPoolName(), threadPoolStats);
    16. } else {
    17. BeanUtil.copyProperties(threadPoolStats, oldStats);
    18. }
    19. gauge(GAUGE_CACHE.get(threadPoolStats.getPoolName()));
    20. }
    21. @Override
    22. public String type() {
    23. return CollectorTypeEnum.MICROMETER.name().toLowerCase();
    24. }
    25. public void gauge(ThreadPoolStats poolStats) {
    26. Iterable tags = Lists.newArrayList(
    27. Tag.of(POOL_NAME_TAG, poolStats.getPoolName()),
    28. Tag.of(APP_NAME_TAG, CommonUtil.getInstance().getServiceName()));
    29. Metrics.gauge(metricName("core.size"), tags, poolStats, ThreadPoolStats::getCorePoolSize);
    30. Metrics.gauge(metricName("maximum.size"), tags, poolStats, ThreadPoolStats::getMaximumPoolSize);
    31. Metrics.gauge(metricName("current.size"), tags, poolStats, ThreadPoolStats::getPoolSize);
    32. Metrics.gauge(metricName("largest.size"), tags, poolStats, ThreadPoolStats::getLargestPoolSize);
    33. Metrics.gauge(metricName("active.count"), tags, poolStats, ThreadPoolStats::getActiveCount);
    34. Metrics.gauge(metricName("task.count"), tags, poolStats, ThreadPoolStats::getTaskCount);
    35. Metrics.gauge(metricName("completed.task.count"), tags, poolStats, ThreadPoolStats::getCompletedTaskCount);
    36. Metrics.gauge(metricName("wait.task.count"), tags, poolStats, ThreadPoolStats::getWaitTaskCount);
    37. Metrics.gauge(metricName("queue.size"), tags, poolStats, ThreadPoolStats::getQueueSize);
    38. Metrics.gauge(metricName("queue.capacity"), tags, poolStats, ThreadPoolStats::getQueueCapacity);
    39. Metrics.gauge(metricName("queue.remaining.capacity"), tags, poolStats, ThreadPoolStats::getQueueRemainingCapacity);
    40. Metrics.gauge(metricName("reject.count"), tags, poolStats, ThreadPoolStats::getRejectCount);
    41. Metrics.gauge(metricName("run.timeout.count"), tags, poolStats, ThreadPoolStats::getRunTimeoutCount);
    42. Metrics.gauge(metricName("queue.timeout.count"), tags, poolStats, ThreadPoolStats::getQueueTimeoutCount);
    43. }
    44. private static String metricName(String name) {
    45. return String.join(".", DTP_METRIC_NAME_PREFIX, name);
    46. }
    47. }

    5. 动态线程池 直接修改线程池参数原理

    由于线程池配置都存储在了配置中心,所以直接在配置中心修改就行。

     修改后,nacos 就可以监听到修改的内容然后就可以告知 dynamic-tp 了

    dynamic-tp 就要做一个 refresh 操作,也即是刷新最新的配置内容。

    刷新操作定义在 Refresher 类中,类图如下

     查看关键类 AbstractRefresher 源码

    doRefresh 方法由子类调用,不同的子类,不同的配置中心当监听到变化后,就来调用父类的 doRefresh 方法,父类完成刷新操作。

    1. public abstract class AbstractRefresher implements Refresher {
    2. protected void doRefresh(DtpProperties dtpProperties) {
    3. // 刷新注册器
    4. DtpRegistry.refresh(dtpProperties);
    5. // 发布刷新事件
    6. publishEvent(dtpProperties);
    7. }
    8. private void publishEvent(DtpProperties dtpProperties) {
    9. RefreshEvent event = new RefreshEvent(this, dtpProperties);
    10. ApplicationContextHolder.publishEvent(event);
    11. }
    12. }

    观察源码,做了两件事情

    1. DtpRegistry.refresh 这个就是做的真正的刷新操作
    2. 发布刷新事件,这个事件是给 三方线程池监控用的,为了松耦合的。

    看看刷新操作做了什么吧

    1. public class DtpRegistry implements ApplicationRunner {
    2. public static void refresh(DtpProperties dtpProperties) {
    3. // 简化代码
    4. dtpProperties.getExecutors().forEach(x -> {
    5. // 遍历所有配置的线程池
    6. // 根据名称获取到当前线程池
    7. ExecutorWrapper executorWrapper = EXECUTOR_REGISTRY.get(x.getThreadPoolName());
    8. if (Objects.nonNull(executorWrapper)) {
    9. // 刷新
    10. refresh(executorWrapper, x);
    11. return;
    12. }
    13. });
    14. }
    15. // 刷新
    16. private static void refresh(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
    17. // 简化代码
    18. // 获取到旧的 主要参数
    19. TpMainFields oldFields = ExecutorConverter.toMainFields(executorWrapper);
    20. // 刷新
    21. doRefresh(executorWrapper, props);
    22. // 获取到新的 主要参数
    23. TpMainFields newFields = ExecutorConverter.toMainFields(executorWrapper);
    24. if (oldFields.equals(newFields)) {
    25. // 参数配置没有改变过,不做后续处理了,直接返回
    26. return;
    27. }
    28. // 参数改变了
    29. // 获取到改变的参数
    30. List diffFields = EQUATOR.getDiffFields(oldFields, newFields);
    31. List diffKeys = StreamUtil.fetchProperty(diffFields, FieldInfo::getFieldName);
    32. // 用户修改了配置中心的参数是要 发送参数改变通知 告知用户的
    33. NoticeManager.doNoticeAsync(executorWrapper, oldFields, diffKeys);
    34. }
    35. }

    首先先说后面,就是发现参数变了,就发送通知告诉用户。

    然后看中间的关键代码 doRefresh()

    1. public class DtpRegistry implements ApplicationRunner {
    2. private static void doRefresh(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
    3. // 获取到当前的线程池
    4. ExecutorAdapter executor = executorWrapper.getExecutor();
    5. // 更新线程池大小参数
    6. doRefreshPoolSize(executor, props);
    7. // 更新 KeepAliveTime
    8. if (!Objects.equals(executor.getKeepAliveTime(props.getUnit()), props.getKeepAliveTime())) {
    9. executor.setKeepAliveTime(props.getKeepAliveTime(), props.getUnit());
    10. }
    11. // 更新 allowCoreThreadTimeOut
    12. if (!Objects.equals(executor.allowsCoreThreadTimeOut(), props.isAllowCoreThreadTimeOut())) {
    13. executor.allowCoreThreadTimeOut(props.isAllowCoreThreadTimeOut());
    14. }
    15. // 更新队列
    16. // update queue
    17. updateQueueProps(executor, props);
    18. if (executor instanceof DtpExecutor) {
    19. // 刷新动态线程池
    20. doRefreshDtp(executorWrapper, props);
    21. return;
    22. }
    23. // 刷新普通线程池
    24. doRefreshCommon(executorWrapper, props);
    25. }
    26. private static void doRefreshPoolSize(ExecutorAdapter executor, DtpExecutorProps props) {
    27. if (props.getMaximumPoolSize() < executor.getMaximumPoolSize()) {
    28. if (!Objects.equals(executor.getCorePoolSize(), props.getCorePoolSize())) {
    29. executor.setCorePoolSize(props.getCorePoolSize());
    30. }
    31. if (!Objects.equals(executor.getMaximumPoolSize(), props.getMaximumPoolSize())) {
    32. executor.setMaximumPoolSize(props.getMaximumPoolSize());
    33. }
    34. return;
    35. }
    36. if (!Objects.equals(executor.getMaximumPoolSize(), props.getMaximumPoolSize())) {
    37. executor.setMaximumPoolSize(props.getMaximumPoolSize());
    38. }
    39. if (!Objects.equals(executor.getCorePoolSize(), props.getCorePoolSize())) {
    40. executor.setCorePoolSize(props.getCorePoolSize());
    41. }
    42. }
    43. }

    所谓刷新,本质就是对改变的参数重新 set 为新值而已。

    6. 三方组件线程池管理源码

    三方组件线程池代码是在 adapter 模块下,名称为 adapter-三方组件名。

    这到底是怎么管理的呢,比如说,为什么可以管理到 rockqtMq 的线程池?

    下面来一探究竟,就具一个模块的例子,其他都大差不差。

    6.1 DtpAdapterListener

    得先介绍公共模块的这个类,监听器,就是用来监听上面发的 Spring 事件的。

    1. public class DtpAdapterListener implements GenericApplicationListener {
    2. // 简化代码
    3. @Override
    4. public void onApplicationEvent(@NonNull ApplicationEvent event) {
    5. if (event instanceof RefreshEvent) {
    6. doRefresh(((RefreshEvent) event).getDtpProperties());
    7. } else if (event instanceof CollectEvent) {
    8. doCollect(((CollectEvent) event).getDtpProperties());
    9. } else if (event instanceof AlarmCheckEvent) {
    10. doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
    11. }
    12. }
    13. }

    这里只以刷新事件为例

    1. public class DtpAdapterListener implements GenericApplicationListener {
    2. protected void doRefresh(DtpProperties dtpProperties) {
    3. val handlerMap = ApplicationContextHolder.getBeansOfType(DtpAdapter.class);
    4. if (CollectionUtils.isEmpty(handlerMap)) {
    5. return;
    6. }
    7. handlerMap.forEach((k, v) -> v.refresh(dtpProperties));
    8. }
    9. }

    最终回调到举例的 RocketMQ

    RocketMqDtpAdapter#refresh

    1. public class RocketMqDtpAdapter extends AbstractDtpAdapter {
    2. private static final String NAME = "rocketMqTp";
    3. private static final String CONSUME_EXECUTOR_FIELD_NAME = "consumeExecutor";
    4. @Override
    5. public void refresh(DtpProperties dtpProperties) {
    6. // 调用父类的 refresh
    7. refresh(NAME, dtpProperties.getRocketMqTp(), dtpProperties.getPlatforms());
    8. }
    9. }

    为什么又调回去了呢,其实是因为要传递当前线程池相关信息啦

    也就是,每个三方监控类传递给父类自己的线程池相关信息,父类统一对抽象继续处理。

    因为每个子类的配置来源都不一样,这些差异由子类自己实现。

    子类线程池是哪里来的呢?

    父类 AbstractDtpAdapter 提供了 initialize 初始化回调

    RocketMqDtpAdapter#initialize 源码如此实现

    1. public class RocketMqDtpAdapter extends AbstractDtpAdapter {
    2. @Override
    3. protected void initialize() {
    4. super.initialize();
    5. // 消费者 consumer 线程池
    6. adaptConsumerExecutors();
    7. // 生产者 producer 线程池
    8. adaptProducerExecutors();
    9. }
    10. public void adaptConsumerExecutors() {
    11. val beans = ApplicationContextHolder.getBeansOfType(DefaultRocketMQListenerContainer.class);
    12. if (MapUtils.isEmpty(beans)) {
    13. log.warn("Cannot find beans of type DefaultRocketMQListenerContainer.");
    14. return;
    15. }
    16. beans.forEach((k, v) -> {
    17. DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) v;
    18. DefaultMQPushConsumer consumer = container.getConsumer();
    19. val pushConsumer = (DefaultMQPushConsumerImpl) ReflectionUtil.getFieldValue(DefaultMQPushConsumer.class,
    20. "defaultMQPushConsumerImpl", consumer);
    21. if (Objects.isNull(pushConsumer)) {
    22. return;
    23. }
    24. String cusKey = container.getConsumerGroup() + "#" + container.getTopic();
    25. ThreadPoolExecutor executor = null;
    26. val consumeMessageService = pushConsumer.getConsumeMessageService();
    27. if (consumeMessageService instanceof ConsumeMessageConcurrentlyService) {
    28. executor = (ThreadPoolExecutor) ReflectionUtil.getFieldValue(ConsumeMessageConcurrentlyService.class,
    29. CONSUME_EXECUTOR_FIELD_NAME, consumeMessageService);
    30. } else if (consumeMessageService instanceof ConsumeMessageOrderlyService) {
    31. executor = (ThreadPoolExecutor) ReflectionUtil.getFieldValue(ConsumeMessageOrderlyService.class,
    32. CONSUME_EXECUTOR_FIELD_NAME, consumeMessageService);
    33. }
    34. if (Objects.nonNull(executor)) {
    35. val executorWrapper = new ExecutorWrapper(cusKey, executor);
    36. initNotifyItems(cusKey, executorWrapper);
    37. executors.put(cusKey, executorWrapper);
    38. }
    39. });
    40. }
    41. public void adaptProducerExecutors() {
    42. val beans = ApplicationContextHolder.getBeansOfType(DefaultMQProducer.class);
    43. if (MapUtils.isEmpty(beans)) {
    44. log.warn("Cannot find beans of type TransactionMQProducer.");
    45. return;
    46. }
    47. beans.forEach((k, v) -> {
    48. DefaultMQProducer defaultMQProducer = (DefaultMQProducer) v;
    49. val producer = (DefaultMQProducerImpl) ReflectionUtil.getFieldValue(DefaultMQProducer.class,
    50. "defaultMQProducerImpl", defaultMQProducer);
    51. if (Objects.isNull(producer)) {
    52. return;
    53. }
    54. String proKey = defaultMQProducer.getProducerGroup() + "#" + defaultMQProducer.getCreateTopicKey();
    55. ThreadPoolExecutor executor = (ThreadPoolExecutor) producer.getAsyncSenderExecutor();
    56. if (Objects.nonNull(executor)) {
    57. val executorWrapper = new ExecutorWrapper(proKey, executor);
    58. initNotifyItems(proKey, executorWrapper);
    59. executors.put(proKey, executorWrapper);
    60. }
    61. });
    62. }
    63. }

    观察源码,实际上使用的是反射取到的线程池。

     利用反射取到线程池后再包装为动态线程池后,放入父类的 executors map 中。

    最后子类调用父类的 refresh ,父类也能从 executors map 拿到当前子类的 线程池,就可以走之前的流程了。

  • 相关阅读:
    基于Matlab+ AlexNet神经网络的动物识别系统
    剑指 Offer 13. 机器人的运动范围
    在HBuilderX的git上导入github项目/把项目传到github
    阿里二面:列出 Api 接口优化的几个技巧
    学习阶段单片机买esp32还是stm32?
    git stash 问题记录
    css常见布局
    Vue、js底层深入理解笔记(二)
    Java将获取的参数,图片以及pdf文件放入到word文档指定位置
    测量信号的功率
  • 原文地址:https://blog.csdn.net/weixin_42195284/article/details/130873180