
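In a distributed system a single request passes through many nodes, so tracking down an error in the logs means looking at several of them. Without a unique request ID that ties together the log entries each node writes for the same request, the search is slow and laborious. Spring Cloud Sleuth solves this problem (it is based on Google's Dapper paper; read the paper for details). At the entry point of a request, Sleuth generates a unique identifier, the TraceId, in a Filter. The TraceId is passed along the request path to every node, and every log line written while handling the request includes it, as shown in the figure below (Sleuth normally also generates a SpanId, which is omitted here for simplicity).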
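To make the idea concrete, here is a minimal, framework-free sketch of what the entry-point filter does conceptually. It generates a 64-bit hex trace ID once per request, keeps it in a thread-local while the request is handled, and prefixes every log line with it. This is not Sleuth's code: the names TraceDemo, handleRequest and log are hypothetical, and Sleuth's real implementation is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, simplified model of entry-point trace ID handling (not Sleuth's API)
class TraceDemo {
    // The current request's trace ID, visible only to the thread handling the request
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();
    // Collected log lines, so the output can be inspected
    static final List<String> LOG = new ArrayList<>();

    // Prefix every log line with the current trace ID, like the [traceId ...] column in Sleuth logs
    static void log(String message) {
        LOG.add("[" + TRACE_ID.get() + "] " + message);
    }

    // Plays the role of the filter at the request entry point
    static void handleRequest(Runnable handler) {
        // A random 64-bit ID rendered as 16 hex digits
        TRACE_ID.set(String.format("%016x", ThreadLocalRandom.current().nextLong()));
        try {
            handler.run();
        } finally {
            TRACE_ID.remove(); // do not leak the ID into the next request handled by this thread
        }
    }
}
```

Calling handleRequest with a handler that logs twice produces two lines with the same 16-digit prefix. That shared prefix is exactly what lets you grep every line that belongs to one request.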

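When a problem occurs in production, you can use this TraceId to quickly find the related log entries on every node. The one catch is that asynchronous execution loses the TraceId, so this article explains how to keep it when work crosses threads.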
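Why does async execution lose the TraceId? The current trace context is held per thread, and so is the logging MDC from which the log prefix is read. A task running on another thread therefore simply doesn't see it. The stdlib sketch below shows the effect, with a hypothetical TRACE_ID thread-local standing in for Sleuth's context.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: a ThreadLocal value set on the caller thread is invisible to a pool thread
class LostTraceIdDemo {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Returns {trace ID seen by the caller, trace ID seen inside the async task}
    static String[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("fae1c9449e12695f");
            Callable<String> readTraceId = TRACE_ID::get;
            // The task runs on the pool's own thread, whose ThreadLocal slot is empty
            String seenByTask = pool.submit(readTraceId).get();
            return new String[] {TRACE_ID.get(), seenByTask};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }
}
```

The caller sees "fae1c9449e12695f", while the task sees null. This is the same gap the Sleuth wrappers described below are meant to close.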
The official documentation explains how the TraceId is propagated in asynchronous code, as shown in the figure below.

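In short, Sleuth propagates the TraceId through @Async out of the box, and this can be switched on or off with the spring.sleuth.async.enabled property. It also provides the wrapper classes LazyTraceExecutor, TraceableExecutorService and TraceableScheduledExecutorService for passing the TraceId across threads. TraceableScheduledExecutorService implements the ScheduledExecutorService interface and is meant for scheduled tasks. I don't think that need comes up very often, so this article covers only the more common setups: @Async, thread pools and EventBus. Each is described in the sections that follow.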
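All of these wrappers follow the same idea. They capture the caller's trace context when a task is submitted, then restore it on the worker thread around the task. The minimal decorator sketch below shows that idea, using a plain ThreadLocal in place of Sleuth's tracing context. ContextPropagatingExecutor is a hypothetical name, and the real Sleuth classes additionally start a new span for the task, which is why the async log lines get a new SpanId.

```java
import java.util.concurrent.Executor;

// Hypothetical decorator that carries a ThreadLocal "trace ID" across threads (not Sleuth's code)
class ContextPropagatingExecutor implements Executor {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private final Executor delegate;

    ContextPropagatingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        // Capture the context on the submitting thread...
        String captured = TRACE_ID.get();
        delegate.execute(() -> {
            // ...restore it on the worker thread, and put back whatever was there afterwards
            // so pooled threads do not keep a stale ID
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                task.run();
            } finally {
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        });
    }
}
```

The cleanup in the finally block matters because pool threads are reused. Without it, a later task submitted without any context would still see the previous request's ID.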
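@Async configuration

Sleuth propagates the TraceId through @Async by default. With a custom thread pool, however, an incorrect configuration can break this, because Spring has a bug in this area. For details, see the following link:

There are therefore three correct ways to configure it: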
Method 1: wrap the thread pool in Sleuth's LazyTraceExecutor, which ensures the trace context is passed on to the next thread.
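```java
import java.util.concurrent.Executor;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.cloud.sleuth.instrument.async.LazyTraceExecutor;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SpringAsyncConfig extends AsyncConfigurerSupport {

    @Autowired
    private BeanFactory beanFactory;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncExecutor-");
        executor.initialize();
        // Wrap the pool so the trace context is carried over to the async thread
        return new LazyTraceExecutor(this.beanFactory, executor);
    }
}
```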
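Why does LazyTraceExecutor take a BeanFactory rather than a ready-made tracer? As far as I understand Sleuth's implementation, it looks up the tracing bean on the first execute() call instead of at construction time. The likely reason is that a configurer like the one above can be created before the tracing beans are ready. If no tracing bean is available, it just runs the task undecorated. The stdlib sketch below shows that lazy-lookup pattern. A Supplier stands in for beanFactory.getBean(...), and LazyDecoratingExecutor and its decorator function are hypothetical names.

```java
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of the lazy-lookup pattern; not Sleuth's source code
class LazyDecoratingExecutor implements Executor {
    private final Supplier<Function<Runnable, Runnable>> lookup; // stands in for beanFactory.getBean(...)
    private final Executor delegate;
    private volatile Function<Runnable, Runnable> decorator;    // resolved on first use

    LazyDecoratingExecutor(Supplier<Function<Runnable, Runnable>> lookup, Executor delegate) {
        this.lookup = lookup; // nothing is looked up at construction time
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        Function<Runnable, Runnable> d = decorator;
        if (d == null) {
            // First use (or the "bean" was not available last time): look it up now.
            // Two threads racing here may both look it up, which is harmless in this sketch.
            d = lookup.get();
            decorator = d;
        }
        // Without a decorator, fall back to running the task undecorated
        delegate.execute(d == null ? task : d.apply(task));
    }
}
```

Deferring the lookup keeps the executor cheap to construct inside configuration code. The price is a null check on every call.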
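Method 2: define a TaskExecutor bean. When Sleuth initializes, it looks for a TaskExecutor to use as the @Async thread pool. If it cannot find one, it falls back to the default thread pool.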
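```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class WebConfig {

    // Exposed as a TaskExecutor bean so it is picked up as the @Async thread pool
    @Bean
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncExecutor-");
        executor.initialize();
        return executor;
    }
}
```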
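Method 3: configure no thread pool at all and only add the @EnableAsync annotation to the project. In that case @Async methods run on Spring's built-in SimpleAsyncTaskExecutor, which creates a new thread for every call. Under heavy traffic this means a very large number of threads. Every thread needs its own stack memory and system resources are limited, so too many threads strain memory and can eventually cause an OutOfMemoryError. This approach is therefore not recommended. (Spring Boot 2.1 and later auto-configure a ThreadPoolTaskExecutor when no Executor bean is defined, so this fallback mainly affects older Spring Boot versions and plain Spring applications.)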
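The difference is easy to see with the JDK alone. SimpleAsyncTaskExecutor starts a new thread for each task instead of reusing threads, and the thread-per-task executor in the usage below imitates that. Counting the distinct threads that ran a batch of tasks shows the gap between that and a bounded pool like the ones configured above. ThreadCountDemo and countThreads are hypothetical names used only for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

// Hypothetical helper: counts how many distinct threads an executor uses for a batch of tasks
class ThreadCountDemo {
    static int countThreads(Executor executor, int tasks) {
        Set<Thread> workers = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                workers.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return workers.size();
    }
}
```

countThreads(r -> new Thread(r).start(), 100) returns 100, one thread per task. The same call on Executors.newFixedThreadPool(10) never returns more than 10, no matter how many tasks are submitted.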
Testing

Async configuration: the same SpringAsyncConfig class shown earlier, which wraps the pool in LazyTraceExecutor.
Service
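```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class TestService {

    // Runs on the @Async thread pool
    @Async
    public void printAsyncLog() {
        log.info("async log.....");
    }
}
```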
Controller
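```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
    @Autowired
    private TestService testService;

    @RequestMapping(value = "/print/log", method = RequestMethod.GET)
    public String printLog() {
        log.info("sync log.....1.....");
        testService.printAsyncLog();
        log.info("sync log.....2.....");
        return "success";
    }
}
```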
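Calling /test/async/print/log prints the output below. The TraceId is the same on every line and only the SpanId changes. The async line runs on a thread whose name starts with the configured AsyncExecutor- prefix.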
```
19:44:54.818, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com.example.elkdemo.web.AsyncTestWeb printLog:30 - sync log.....1.....
19:44:54.819, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com.example.elkdemo.web.AsyncTestWeb printLog:32 - sync log.....2.....
19:44:54.819, [fae1c9449e12695f 2d51edbb45896bd8] [AsyncExecutor-2] INFO [] c.e.elkdemo.service.TestService printAsyncLog:50 - async log.....
```
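The log pattern here prints [traceId spanId]. In the output above, the request thread's SpanId equals its TraceId (a root span), while the async task gets a new SpanId under the same TraceId. The relationship can be modelled as follows. SpanContext, root() and child() are hypothetical names for this illustration, not Sleuth's API.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, simplified model of trace/span IDs
final class SpanContext {
    final String traceId;
    final String spanId;
    final String parentId; // null for the root span

    private SpanContext(String traceId, String spanId, String parentId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentId = parentId;
    }

    // A random 64-bit ID rendered as 16 hex digits
    private static String newId() {
        return String.format("%016x", ThreadLocalRandom.current().nextLong());
    }

    // Root span at the request entry point: its span ID reuses the trace ID, as in the logs above
    static SpanContext root() {
        String id = newId();
        return new SpanContext(id, id, null);
    }

    // Child span for work handed to another thread: same trace ID, new span ID
    SpanContext child() {
        return new SpanContext(traceId, newId(), spanId);
    }

    @Override
    public String toString() {
        return "[" + traceId + " " + spanId + "]";
    }
}
```

Searching by the trace ID finds both the request and the async lines. The span ID then tells you which unit of work each line belongs to.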
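Thread pool configuration

For a plain thread pool, wrap the ExecutorService in a TraceableExecutorService. Its constructor needs the BeanFactory, so the setup differs slightly depending on whether the pool is a field (global variable) or a local variable. The pool settings below are only examples; adjust them to your needs.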
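Field (global variable), initialized in the constructor:

```java
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class TestService {
    private final BeanFactory beanFactory;
    private final TraceableExecutorService traceableExecutorService;

    public TestService(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
        // "test" is the span name used for tasks run on this pool
        this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    }

    /**
     * Logs asynchronously on the thread pool.
     */
    public void printThreadPoolLog() {
        traceableExecutorService.execute(() -> log.info("async thread pool log....."));
    }
}
```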
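Field (global variable), created lazily with double-checked locking:

```java
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class TestService {

    @Autowired
    private BeanFactory beanFactory;

    // volatile is required for double-checked locking to publish the instance safely
    private volatile TraceableExecutorService traceableExecutorService;

    public TraceableExecutorService getTraceableExecutorService() {
        TraceableExecutorService result = traceableExecutorService;
        if (result == null) {
            // Lock on this bean instead of a class-wide monitor
            synchronized (this) {
                result = traceableExecutorService;
                if (result == null) {
                    result = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
                    traceableExecutorService = result;
                }
            }
        }
        return result;
    }

    /**
     * Logs asynchronously on the lazily created thread pool.
     */
    public void printThreadPoolLog() {
        getTraceableExecutorService().execute(() -> log.info("async thread pool log....."));
    }
}
```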
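Field (global variable), initialized in afterPropertiesSet() via InitializingBean:

```java
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class TestService implements InitializingBean {

    @Autowired
    private BeanFactory beanFactory;

    private TraceableExecutorService traceableExecutorService;

    // Called by Spring after beanFactory has been injected
    @Override
    public void afterPropertiesSet() {
        traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    }

    /**
     * Logs asynchronously on the thread pool.
     */
    public void printThreadPoolLog() {
        traceableExecutorService.execute(() -> log.info("async thread pool log....."));
    }
}
```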
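Local variable, created inside the method:

```java
/**
 * Logs asynchronously on a thread pool created for this call.
 * (A method of TestService; beanFactory is the injected field, as in the variants above.)
 */
public void printThreadPoolLog2() {
    TraceableExecutorService executorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    executorService.execute(() -> log.info("async thread pool log....."));
    // A new pool is created on every call, so shut it down: the submitted task still runs,
    // but without shutdown() the pool's threads would never be released
    executorService.shutdown();
}
```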
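TraceableExecutorService is a decorator around an ExecutorService. It therefore has to cover submit() and the other ExecutorService methods as well as execute(). One compact way to sketch that with the JDK is to extend AbstractExecutorService. Its submit(), invokeAll() and invokeAny() wrap tasks and hand them to execute(), so decorating execute() alone is enough to carry the context. This is a hypothetical sketch (TracingExecutorService and the ThreadLocal TRACE_ID are my names), not how Sleuth implements the class.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical ExecutorService decorator that propagates a ThreadLocal trace ID
class TracingExecutorService extends AbstractExecutorService {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private final ExecutorService delegate;

    TracingExecutorService(ExecutorService delegate) {
        this.delegate = delegate;
    }

    // submit(), invokeAll() and invokeAny() inherited from AbstractExecutorService all end up here
    @Override
    public void execute(Runnable task) {
        String captured = TRACE_ID.get(); // read on the submitting thread
        delegate.execute(() -> {
            TRACE_ID.set(captured);       // restore on the worker thread
            try {
                task.run();
            } finally {
                TRACE_ID.remove();
            }
        });
    }

    // Lifecycle methods simply delegate to the wrapped pool
    // (shutdownNow() returns the wrapped tasks, not the originals)
    @Override public void shutdown() { delegate.shutdown(); }
    @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
    @Override public boolean isShutdown() { return delegate.isShutdown(); }
    @Override public boolean isTerminated() { return delegate.isTerminated(); }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }
}
```

Because every submission path funnels through execute(), a Callable passed to submit() sees the caller's ID too. In the Spring examples, TraceableExecutorService plays this role, with Sleuth's tracing context in place of the ThreadLocal.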
Testing

The test below uses the field (global variable) approach.
Controller
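```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
    @Autowired
    private TestService testService;

    @RequestMapping(value = "/print/threadPool/log", method = RequestMethod.GET)
    public String printThreadPoolLog() {
        log.info("sync log.....1.....");
        testService.printThreadPoolLog();
        log.info("sync log.....2.....");
        return "success";
    }
}
```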
Service

The service is the TestService shown above that initializes its pool in the constructor.
Calling /test/async/print/threadPool/log prints the output below. Again, the TraceId is the same on every line and only the SpanId changes.
```
19:35:13.799, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:38 - sync log.....1.....
19:35:13.801, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:40 - sync log.....2.....
19:35:13.801, [884212fb58c658c5 70008b8d3a97602d] [pool-4-thread-2] INFO [] c.e.elkdemo.service.TestService lambda$printThreadPoolLog$0:37 - async thread pool log.....
```
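EventBus configuration

Configuring Guava's EventBus is similar to configuring a thread pool: pass a TraceableExecutorService to AsyncEventBus. Because TraceableExecutorService needs a BeanFactory instance, this is slightly more involved than plain Guava usage. Only constructor initialization is shown here. The other initialization approaches work the same way as for thread pools, so they are not repeated.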
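```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class PushEventBus {

    private final EventBus eventBus;

    public PushEventBus(BeanFactory beanFactory) {
        // AsyncEventBus runs subscribers on this executor, so wrapping it propagates the TraceId
        Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
        this.eventBus = new AsyncEventBus(traceableExecutorService);
    }

    public void register(Object obj) {
        eventBus.register(obj);
    }

    public void post(Object obj) {
        eventBus.post(obj);
    }
}
```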
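Why is passing the wrapped executor to AsyncEventBus enough? AsyncEventBus hands each subscriber invocation to the Executor it was constructed with. Whatever that executor does around a task, here restoring the trace context, therefore applies to every listener call. The stdlib sketch below imitates this with a hypothetical MiniAsyncBus. It dispatches to plain Consumer listeners through a given Executor and is combined with a context-propagating wrapper like the one used for thread pools. None of these names are Guava or Sleuth APIs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical mini event bus: every listener call is handed to the executor
class MiniAsyncBus {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();
    private final Executor executor;

    MiniAsyncBus(Executor executor) {
        this.executor = executor;
    }

    // Wraps an executor so each task runs with the submitter's trace ID restored
    static Executor propagating(Executor delegate) {
        return task -> {
            String captured = TRACE_ID.get();
            delegate.execute(() -> {
                TRACE_ID.set(captured);
                try {
                    task.run();
                } finally {
                    TRACE_ID.remove();
                }
            });
        };
    }

    void register(Consumer<Object> listener) {
        listeners.add(listener);
    }

    // Dispatch happens on the executor, so the listener sees whatever context the executor restores
    void post(Object event) {
        for (Consumer<Object> listener : listeners) {
            executor.execute(() -> listener.accept(event));
        }
    }
}
```

The bus itself knows nothing about tracing. All of the propagation lives in the executor, which is why the Guava setup above only needs to change the executor it passes in.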
Testing

EventBus: the PushEventBus class shown above.
Listener class
```java
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EventListener {
    /**
     * Listens for Integer messages.
     */
    @Subscribe
    public void listenInteger(Integer param) {
        log.info("EventListener#listenInteger->{}", param);
    }

    /**
     * Listens for String messages.
     */
    @Subscribe
    public void listenString(String param) {
        log.info("EventListener#listenString->{}", param);
    }
}
```
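Controller

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {

    private final PushEventBus pushEventBus;

    public AsyncTestWeb(PushEventBus pushEventBus) {
        this.pushEventBus = pushEventBus;
        // Register the listener once; registering a new EventListener on every request
        // would keep adding subscribers, so each post would be logged several times
        this.pushEventBus.register(new EventListener());
    }

    @RequestMapping(value = "/print/guava/log", method = RequestMethod.GET)
    public String printGuavaLog() {
        log.info("sync log.....1.....");
        pushEventBus.post("11");
        log.info("sync log.....2.....");
        return "success";
    }
}
```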
Calling /test/async/print/guava/log prints the output below. The TraceId is the same on every line and only the SpanId changes.
```
19:27:44.234, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:48 - sync log.....1.....
19:27:44.236, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:50 - sync log.....2.....
19:27:44.236, [50844e0d3909868c 702bf55c84873f17] [pool-3-thread-1] INFO [] c.e.elkdemo.service.EventListener listenString:21 - EventListener#listenString->11
```