dubbo在使用时,都是通过创建真实的业务线程池进行操作的。目前已知的线程池模型有两个和java中的相互对应:
在真实的使用过程中可能会因为使用fix模式的线程池,导致具体某些业务场景因为线程池中的线程数量不足而产生错误,而很多业务研发是对这些无感知的,只有当出现错误的时候才会去查看告警或者通过客户反馈出现严重的问题才去查看,结果发现是线程池满了。所以可以在创建线程池的时,通过某些手段对这个线程池进行监控,这样就可以进行及时的扩缩容机器或者告警。下面的这个程序就是这样子的,会在创建线程池后进行对其监控,并且及时作出相应处理。
(1)线程池实现, 这里主要是基于对FixedThreadPool 中的实现做扩展出线程监控的部分
- package com.lagou.threadpool;
-
- import org.apache.dubbo.common.URL;
- import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.util.Map;
- import java.util.concurrent.*;
-
- public class WachingThreadPool extends FixedThreadPool implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);
- // 定义线程池使用的阀值
- private static final double ALARM_PERCENT = 0.90;
- private final Map
THREAD_POOLS = new ConcurrentHashMap<>(); -
- public WachingThreadPool() {
- // 每隔3秒打印线程使用情况
- Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this, 1, 3, TimeUnit.SECONDS);
- }
-
- // 通过父类创建线程池
- @Override
- public Executor getExecutor(URL url) {
- final Executor executor = super.getExecutor(url);
- if (executor instanceof ThreadPoolExecutor) {
- THREAD_POOLS.put(url, (ThreadPoolExecutor) executor);
- }
- return executor;
- }
-
- @Override
- public void run() {
- // 遍历线程池
- for (Map.Entry
entry : THREAD_POOLS.entrySet()) { - final URL url = entry.getKey();
- final ThreadPoolExecutor executor = entry.getValue();
- // 计算相关指标
- final int activeCount = executor.getActiveCount();
- final int poolSize = executor.getCorePoolSize();
- double usedPercent = activeCount / (poolSize * 1.0);
- LOGGER.info("线程池执行状态:[{}/{}:{}%]", activeCount, poolSize, usedPercent * 100);
- if (usedPercent > ALARM_PERCENT) {
- LOGGER.error("超出警戒线! host:{} 当前使用率是:{},URL:{}", url.getIp(), usedPercent * 100, url);
- }
-
- }
- }
- }
(2)SPI声明,创建文件(固定的)
META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool
watching=包名.线程池名
(3)在服务提供方项目引入该依赖
(4)在服务提供方项目中设置使用该线程池生成器
dubbo.provider.threadpool=watching
(5)接下来需要做的就是模拟整个流程,因为该线程当前是每1秒抓一次数据,所以我们需要对该方法的提供者超过1秒的时间(比如这里用休眠Thread.sleep ),消费者则需要启动多个线程来并行执行,来模拟整个并发情况。
(6)在调用方则尝试简单通过for循环启动多个线程来执行 查看服务提供方的监控情况
- package com.lagou;
-
- import com.lagou.bean.ConsumerComponent;
- import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
- import org.springframework.context.annotation.AnnotationConfigApplicationContext;
- import org.springframework.context.annotation.ComponentScan;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.PropertySource;
-
- import java.io.IOException;
-
-
- public class AnnotationConsumerMain {
- public static void main(String[] args) throws IOException, InterruptedException {
- AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
- context.start();
- ConsumerComponent service = context.getBean(ConsumerComponent.class);
- while (true) {
- for (int i = 0; i < 1000; i++) {
- Thread.sleep(5);
- new Thread(new Runnable() {
- @Override
- public void run() {
- String msg = service.sayHello("world", 0);
- System.out.println(msg);
- }
- }).start();
- }
- }
- }
-
- @Configuration
- @PropertySource("classpath:/dubbo-consumer.properties")
- //@EnableDubbo(scanBasePackages = "com.lagou.bean")
- @ComponentScan("com.lagou.bean")
- @EnableDubbo
- static class ConsumerConfiguration {
-
- }
- }