• Spring-Cloud如何异步跨线程查询链路日志(附实例)


    概述

    由于分布式系统节点众多,排查错误日志要涉及到多个节点,如果在多个节点中没有唯一的请求id来把各个节点的请求日志串联起来,那么查询起来就会耗时耗力,因此Spring Sleuth出现了(Spring Sleuth基于Google dapper论文实现,详细了解可以查看此论文),Sleuth会在接收请求的入口通过Filter生成唯一的标识TraceId,这个TraceId会一直跟随请求路径传递到各个节点,只要有日志输出就会把TraceId的值打印出来,如下图(正常还会生成SpanId,为了便于理解没展现)

    假如线上发生问题,要排查日志,那么根据这个TraceId,就能够快速查询到各个节点对应的请求日志,但是唯一的遗憾是异步执行会丢失TraceId,因此这里介绍异步跨线程下如何保证TraceId不丢失的问题

    我们在官方文档中找到了异步传递Traceid说明,如下图

    大致意思Sleuth默认支持@Async传递TraceId,并且支持spring.sleuth.async.enabled进行控制,同时提供了

    • LazyTraceExecutor
    • TraceableExecutorService
    • TraceableScheduledExecutorService

    线程包装类,来支持跨线程传递TraceId,其中TraceableScheduledExecutorService是ScheduledExecutorService类的实现,用于实现定时任务触发,个人觉得这种需求不是特别多,所以只介绍常用的一些配置,比如@Async配置、线程池配置、EventBus配置,具体查看后续章节

    Asnc配置

    默认Sleuth是支持@Async注解异步传递TraceId的,但是如果自定义线程池,配置不对的情况可能就会导致失效,因为Spring在这快有个bug,详细了解请查看以下链接:

    github.com/spring-proj…

    github.com/spring-proj…

    所以正确配置方法有如下3种

    配置方法

    方式1(推荐)

    这里用到了Sleuth的LazyTraceExecutor包装了线程池,这样可以保证trace对象传到下一个线程中

    1. @Configuration
    2. @EnableAsync
    3. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    4. public class SpringAsyncConfig extends AsyncConfigurerSupport {
    5. @Autowired
    6. private BeanFactory beanFactory;
    7. @Override
    8. public Executor getAsyncExecutor() {
    9. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    10. executor.setCorePoolSize(10);
    11. executor.setMaxPoolSize(10);
    12. executor.setQueueCapacity(500);
    13. executor.setThreadNamePrefix("AsyncExecutor-");
    14. executor.initialize();
    15. return new LazyTraceExecutor(this.beanFactory, executor);
    16. }
    17. }
    18. 复制代码

    方式2

    Sleuth初始化时会默认查找TaskExecutor作为Async的线程池,如果查找不到会获取默认的线程池

    1. @EnableAsync
    2. @Configuration
    3. public class WebConfig {
    4. @Bean
    5. public TaskExecutor getAsyncExecutor() {
    6. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    7. executor.setCorePoolSize(10);
    8. executor.setMaxPoolSize(10);
    9. executor.setQueueCapacity(500);
    10. executor.setThreadNamePrefix("AsyncExecutor-");
    11. executor.initialize();
    12. return executor;
    13. }
    14. }
    15. 复制代码

    方式3

    如果默认不配置任何线程池,只在工程中加了@EnableAsync 注解,那么Sleuth会使用自带的线程池SimpleAsyncTaskExecutor,这个线程池每次调用都会创建新线程,如果调用量比较多,创建的线程也会非常多,我们知道系统资源是有限的,如果线程数过多,会导致程序内存吃紧,从而导致OOM,所以不推荐使用这种方式

    测试验证

    测试代码

    Async配置

    1. @Configuration
    2. @EnableAsync
    3. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    4. public class SpringAsyncConfig extends AsyncConfigurerSupport {
    5. @Autowired
    6. private BeanFactory beanFactory;
    7. @Override
    8. public Executor getAsyncExecutor() {
    9. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    10. executor.setCorePoolSize(10);
    11. executor.setMaxPoolSize(10);
    12. executor.setQueueCapacity(500);
    13. executor.setThreadNamePrefix("AsyncExecutor-");
    14. executor.initialize();
    15. return new LazyTraceExecutor(this.beanFactory, executor);
    16. }
    17. }
    18. 复制代码

    Service

    1. @Service
    2. @Slf4j
    3. public class TestService {
    4. @Async
    5. public void printAsyncLog() {
    6. log.info("async log.....");
    7. }
    8. }
    9. 复制代码

    Controller

    1. @Slf4j
    2. @RestController
    3. @RequestMapping("/test/async")
    4. public class AsyncTestWeb {
    5. @Autowired
    6. private TestService testService;
    7. @RequestMapping(value = "/print/log", method = RequestMethod.GET)
    8. public String printLog() {
    9. log.info("sync log.....1.....");
    10. testService.printAsyncLog();
    11. log.info("sync log.....2.....");
    12. return "success";
    13. }
    14. }
    15. 复制代码

    请求测试

    执行请求test/async/print/log,输出以下信息,可以看到TraceId一样,只有Spanid发生了变化,线程名称前缀AsyncExecutor与设置前缀相同

    1. 19:44:54.818, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com.example.elkdemo.web.AsyncTestWeb printLog:30 - sync log.....1.....
    2. 19:44:54.819, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com.example.elkdemo.web.AsyncTestWeb printLog:32 - sync log.....2.....
    3. 19:44:54.819, [fae1c9449e12695f 2d51edbb45896bd8] [AsyncExecutor-2] INFO [] c.e.elkdemo.service.TestService printAsyncLog:50 - async log.....
    4. 复制代码

    线程池配置

    线程池执行是通过TraceableExecutorService包装了ExecutorService,而且在初始化的时候需要注入进去BeanFactory对象,所以线程池作为全局变量和局部变量配置稍有不同,注意下面线程池设置只是示例代码,实际运用中可以根据需求自行修改

    全局变量配置

    构造函数初始化(推荐)

    1. @Service
    2. @Slf4j
    3. public class TestService{
    4. final BeanFactory beanFactory;
    5. private TraceableExecutorService traceableExecutorService;
    6. public TestService(BeanFactory beanFactory1) {
    7. this.beanFactory = beanFactory1;
    8. this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    9. }
    10. /**
    11. * 异步输出线程池日志
    12. */
    13. public void printThreadPoolLog() {
    14. traceableExecutorService.execute(() -> log.info("async thread pool log....."));
    15. }
    16. }
    17. 复制代码

    单例初始化

    1. @Service
    2. @Slf4j
    3. public class TestService {
    4. @Autowired
    5. private BeanFactory beanFactory;
    6. volatile TraceableExecutorService traceableExecutorService;
    7. public TraceableExecutorService getTraceableExecutorService() {
    8. if (traceableExecutorService == null) {
    9. synchronized (TraceableExecutorService.class) {
    10. if (traceableExecutorService == null) {
    11. traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    12. }
    13. }
    14. }
    15. return traceableExecutorService;
    16. }
    17. /**
    18. * 异步输出线程池日志
    19. */
    20. public void printThreadPoolLog() {
    21. TraceableExecutorService executorService = getTraceableExecutorService();
    22. executorService.execute(() -> log.info("async thread pool log....."));
    23. }
    24. }
    25. 复制代码

    通过InitializingBean的afterPropertiesSet进行初始化

    1. @Service
    2. @Slf4j
    3. public class TestService implements InitializingBean {
    4. @Autowired
    5. private BeanFactory beanFactory;
    6. private TraceableExecutorService traceableExecutorService;
    7. @Override
    8. public void afterPropertiesSet() {
    9. traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    10. }
    11. /**
    12. * 异步输出线程池日志
    13. */
    14. public void printThreadPoolLog() {
    15. traceableExecutorService.execute(() -> log.info("async thread pool log....."));
    16. }
    17. }
    18. 复制代码

    局部变量配置

    1. /**
    2. * 异步输出线程池日志
    3. */
    4. public void printThreadPoolLog2() {
    5. TraceableExecutorService executorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    6. executorService.execute(() -> log.info("async thread pool log....."));
    7. }
    8. 复制代码

    测试验证

    这里采用全局变量配置方式测试

    测试代码

    Controller

    1. @Slf4j
    2. @RestController
    3. @RequestMapping("/test/async")
    4. public class AsyncTestWeb {
    5. @Autowired
    6. private TestService testService;
    7. @RequestMapping(value = "/print/threadPool/log", method = RequestMethod.GET)
    8. public String printThreadPoolLog() {
    9. log.info("sync log.....1.....");
    10. testService.printThreadPoolLog();
    11. log.info("sync log.....2.....");
    12. return "success";
    13. }
    14. }
    15. 复制代码

    Service

    service采用构造函数方式进行初始化

    1. @Service
    2. @Slf4j
    3. public class TestService{
    4. final BeanFactory beanFactory;
    5. private TraceableExecutorService traceableExecutorService;
    6. public TestService(BeanFactory beanFactory1) {
    7. this.beanFactory = beanFactory1;
    8. this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    9. }
    10. /**
    11. * 异步输出线程池日志
    12. */
    13. public void printThreadPoolLog() {
    14. traceableExecutorService.execute(() -> log.info("async thread pool log....."));
    15. }
    16. }
    17. 复制代码

    请求测试

    执行请求/test/async/print/threadPool/log,输出以下信息,可以看到Traceid一样,只有Spanid发生了变化

    1. 19:35:13.799, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:38 - sync log.....1.....
    2. 19:35:13.801, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:40 - sync log.....2.....
    3. 19:35:13.801, [884212fb58c658c5 70008b8d3a97602d] [pool-4-thread-2] INFO [] c.e.elkdemo.service.TestService lambda$printThreadPoolLog$0:37 - async thread pool log.....
    4. 复制代码

    EventBus配置

    EventBus配置与线程池配置类似,把TraceableExecutorService注入到AsyncEventBus中即可,因TraceableExecutorService类引用了BeanFactory实例,所以比原生方式复杂了一点,以下只介绍构造函数的初始化方式,其他初始化方式与线程池配置类似,所以这里就不再举例说明

    构造函数进行初始化

    1. @Component
    2. @Slf4j
    3. public class PushEventBus {
    4. private EventBus eventBus;
    5. public PushEventBus(BeanFactory beanFactory) {
    6. Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    7. this.eventBus = new AsyncEventBus(traceableExecutorService);
    8. }
    9. public void register(Object obj) {
    10. eventBus.register(obj);
    11. }
    12. public void post(Object obj) {
    13. eventBus.post(obj);
    14. }
    15. }
    16. 复制代码

    测试验证

    测试代码

    EventBus

    1. @Component
    2. @Slf4j
    3. public class PushEventBus {
    4. private EventBus eventBus;
    5. public PushEventBus(BeanFactory beanFactory) {
    6. Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    7. this.eventBus = new AsyncEventBus(traceableExecutorService);
    8. }
    9. public void register(Object obj) {
    10. eventBus.register(obj);
    11. }
    12. public void post(Object obj) {
    13. eventBus.post(obj);
    14. }
    15. }
    16. 复制代码

    监听类

    1. @Slf4j
    2. public class EventListener {
    3. /**
    4. * 监听 Integer 类型的消息
    5. */
    6. @Subscribe
    7. public void listenInteger(Integer param) {
    8. log.info("EventListener#listenInteger->{}",param);
    9. }
    10. /**
    11. * 监听 String 类型的消息
    12. */
    13. @Subscribe
    14. public void listenString(String param) {
    15. log.info("EventListener#listenString->{}",param);
    16. }
    17. }
    18. 复制代码

    controller

    1. @Slf4j
    2. @RestController
    3. @RequestMapping("/test/async")
    4. public class AsyncTestWeb {
    5. @Autowired
    6. private PushEventBus pushEventBus;
    7. @RequestMapping(value = "/print/guava/log", method = RequestMethod.GET)
    8. public String printGuavaLog() {
    9. pushEventBus.register(new EventListener());
    10. log.info("sync log.....1.....");
    11. pushEventBus.post("11");
    12. log.info("sync log.....2.....");
    13. return "success";
    14. }
    15. }
    16. 复制代码

    请求测试

    执行请求/test/async/print/guava/log,输出以下信息,可以看到Traceid一样,只有Spanid发生了变化

    1. 19:27:44.234, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:48 - sync log.....1.....
    2. 19:27:44.236, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:50 - sync log.....2.....
    3. 19:27:44.236, [50844e0d3909868c 702bf55c84873f17] [pool-3-thread-1] INFO [] c.e.elkdemo.service.EventListener listenString:21 - EventListener#listenString->11
    4. 复制代码
  • 相关阅读:
    linux系统shell脚本开发之循环的使用
    MMPose 1.0:迈向更优雅、更强大的姿态估计研发和应用框架
    HDFS集成Kerberos并使用Python调用
    花儿朵朵-全自动视频混剪,批量剪辑批量剪视频,探店带货系统,精细化顺序混剪,故事影视解说,视频处理大全,精细化顺序混剪,多场景裂变,多视频混剪
    小白也能看懂的国内外 AI 芯片概述
    ADCIRC模式与Python融合技术应用
    Xavier MAC与PHY自适应速率分析-代码分析
    2023面试题
    java基础 集合2
    Kubernetes(k8s)高可用搭建
  • 原文地址:https://blog.csdn.net/m0_73311735/article/details/126869707