• 链路日志中追踪traceId


    一,使用traceId概述

    平时出现问题查询日志是程序员的解决方式,日志一般会从服务器的日志文件,然后查找自己需要的日志,或者日志输出到es中,在es中进行搜索日志,可是对于目前流行的微服务或者单体服务,将日志串起来去查看并不是一件容易的事情,一般微服务会调用多个系统,有http请求的,有mq的等会产生大量的日志,根据日志快速定位到具体的问题才是我们想要的解决方案,毕竟要用最短的时间找到问题所在,并快速解决。目前的elk搜集日志,也只是把所有的日志搜集起来,并没有将具体的日志按照请求串起来,所以这个目前需要在日志中添加traceId进行日志的追踪。

    二,请求的源头

    1,http请求
    思路
    在最开始请求系统时候生成一个全局唯一的traceId,放在http 请求header中,系统接收到请求后,从header中取出这个traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期,即当一个服务调用另外一个服务的时候,需要将traceId往下传递,从而形成一条链路。
    实现
    @Service
    public class TraceInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
    String traceId = request.getHeader(ElkConstants.TRACE_ID);
    if (StringUtils.isNotEmpty(traceId)) {
    MDC.put(ElkConstants.TRACE_ID, traceId);
    } else {
    MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
    }
    return true;
    }

        @Override
        public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
            MDC.remove(ElkConstants.TRACE_ID);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    @Configuration
    public class InterceptorConfig implements WebMvcConfigurer {

        @Resource
        private TraceInterceptor traceInterceptor;
    
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(traceInterceptor).addPathPatterns("/**");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    public class ElkConstants {

    /**
     * TRACE_ID
     */
    public static final String TRACE_ID = "traceId";
    
    • 1
    • 2
    • 3
    • 4

    }
    2,定时任务Task
    思路
    在定时任务的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
    实现
    @Component
    @Slf4j
    public class SyncTask extends BaseTask {

    /**
     * 定时任务
     */
    public void sync() {
        //设置traceId
        setTraceId();
        //自己的业务逻辑
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    }
    public abstract class BaseTask {

    public void setTraceId() {
        MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
    }
    
    • 1
    • 2
    • 3

    }
    3,微服务(Feign)
    思路
    在请求的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
    实现
    @Configuration
    public class FeignInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
    requestTemplate.header(ElkConstants.TRACE_ID, (String) MDC.get(ElkConstants.TRACE_ID));
    }
    }
    4,Mq的方式
    思路
    使用mq进行消息发送的时候,可以将请求发送消息的traceId从mdc中取出来,我们可以放到消息体中,当做一个字段,然后在消息端消费的时候,从消息体中获取到traceId,并放入到MDC中,伴随着此次的请求
    实现(kafka为列子,其他也可以按照思路进行不同的实现)
    @Component
    public class KafkaSender implements MessageSender {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Autowired
    private KafkaConfig kafkaConfig;
    
    private static final Logger log = LoggerFactory.getLogger(KafkaSender.class);
    
    /**
     * 异步发送
     *
     * @param mqSendEvent
     * @return
     */
    @Override
    public boolean sendAsyncMessage(MqSendEvent mqSendEvent) {
       try {
            mqSendEvent.setTraceId(MDC.get("traceId"));
            ListenableFuture> future = kafkaTemplate.send(mqSendEvent.getTopic(),
                    JSON.toJSONString(mqSendEvent));
            future.addCallback(new ListenableFutureCallback>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("kafka sendAsyncMessage error, ex = {}, topic = {}, data = {}", ex, mqSendEvent.getTopic(),
                            JSON.toJSONString(mqSendEvent));
                }
    
                @Override
                public void onSuccess(SendResult result) {
                    log.info("kafka sendAsyncMessage success topic = {}, data = {}", mqSendEvent.getTopic(),
                            JSON.toJSONString(mqSendEvent));
                }
            });
        } finally {
            MDC.clear();
        }
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    }

    public interface MessageSender {

    /**
     * 异步发送
     *
     * @param mqSendEvent
     */
    public boolean sendAsyncMessage(MqSendEvent mqSendEvent);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    }
    public class MqSendEvent implements Serializable {
    //topic
    private String topic;
    //数据
    private String data;
    //traceId数据
    private String traceId;

    public MqSendEvent(String topic, String data) {
        this.topic = topic;
        this.data = data;
    }
    
    public String getTopic() {
        return topic;
    }
    
    public void setTopic(String topic) {
        this.topic = topic;
    }
    
    public String getData() {
        return data;
    }
    
    public void setData(String data) {
        this.data = data;
    }
    
    public String getTraceId() {
        return traceId;
    }
    
    public void setTraceId(String traceId) {
        this.traceId = traceId;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    }
    @Component
    @Slf4j
    public class UserRegisterListener extends MessageBaseListener {

    /**
     * 监听消息
     *
     * @param consumerRecord
     * @param ack
     */
    @Override
    @KafkaListener(topics = {MessageConstants.REGISTER_USER_TOPIC}, containerFactory = MessageConstants.CONSUMER_CONTAINER_FACTORY_NAME)
    protected void receiverMessage(ConsumerRecord consumerRecord, Acknowledgment ack) {
        try {
            log.info("consumer message record:{}", consumerRecord);
            Optional optional = Optional.ofNullable(consumerRecord.value());
            if (optional.isPresent()) {
                Object value = optional.get();
                MqSendEvent mqSendEvent = JSON.parseObject((String) value, MqSendEvent.class);
                JSONObject jsonObject = JSON.parseObject(mqSendEvent.getData());
                MDC.put(ElkConstants.TRACE_ID, mqSendEvent.getTraceId());
             //具体业务逻辑
            }
        } finally {
            MDC.clear();
        }
        ack.acknowledge();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    }
    5,多线程方式
    思路
    在定时任务的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
    实现

    三,MDC实现链路追踪的原理

    1,概述
    MDC(Mapped Diagnostic Contexts),是Slf4J类日志系统中实现分布式多线程日志数据传递的重要工具,用户可利用MDC将一些运行时的上下文数据打印出来。目前只有log4j和logback提供原生的MDC支持
    2,使用(具体不同场景使用见(二,请求源头))
    在logback配置文件中配置MDC容器中的变量%X{trackId}


    %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %X{trackId} [%15.15t] %class{39}.%method[%L] : %m%n
    UTF-8f


    3,源码分析
    MDC中的put方法
    public static void put(String key, String val) throws IllegalArgumentException {
    if (key == null) {
    throw new IllegalArgumentException(“key parameter cannot be null”);
    } else if (mdcAdapter == null) {
    throw new IllegalStateException(“MDCAdapter cannot be null. See also http://www.slf4j.org/codes.html#null_MDCA”);
    } else {
    mdcAdapter.put(key, val);
    }
    }
    MDCAdapter接口
    public interface MDCAdapter {
    //设置当前线程MDC上下文中指定key的值和value的值
    void put(String var1, String var2);
    // 获取当前线程MDC上下文中指定key的值
    String get(String var1);
    // 移除当前线程MDC上下文中指定key的值
    void remove(String var1);
    // 清空MDC上下文
    void clear();
    // 获取MDC上下文
    Map getCopyOfContextMap();
    // 设置MDC上下文
    void setContextMap(Map var1);
    }
    LogbackMDCAdapter实现

    public class LogbackMDCAdapter implements MDCAdapter {
    final ThreadLocal> copyOnThreadLocal = new ThreadLocal();
    private static final int WRITE_OPERATION = 1;
    private static final int MAP_COPY_OPERATION = 2;
    //threadlocal线程级别的,这就是为什么需要remove或者clear的原因所在,
    //防止内存泄露,具体为啥,可以研究一下ThreadLocal底层原理就明白了
    final ThreadLocal lastOperation = new ThreadLocal();

    public LogbackMDCAdapter() {
    }
    ......
    
    • 1
    • 2
    • 3

    可以看到LogbackMDCAdapter声明了类型为ThreadLocal的map。ThreadLocal 提供了线程本地的实例。ThreadLocal变量在线程之间隔离而在方法或类间能够共享,是属于线程单独私有的,线程之间相互隔离,到这里就可以大致理解为其实使用的就是ThreadLocal的私有线程的特性,大概就可以明白其中的原理了。
    LogbackMDCAdapter 的put方法
    public void put(String key, String val) throws IllegalArgumentException {
    if (key == null) {
    throw new IllegalArgumentException(“key cannot be null”);
    } else {
    //复制当前线程的threadLocal
    Map oldMap = (Map)this.copyOnThreadLocal.get();
    //设置当前操作为写操作,并返回上一次的操作
    Integer lastOp = this.getAndSetLastOperation(1);
    //上一次不是读操作或者null,已经初始化了,有内容的话在里面设置内容
    if (!this.wasLastOpReadOrNull(lastOp) && oldMap != null) {
    oldMap.put(key, val);
    } else {
    // 复制一个当前线程的副本
    Map newMap = this.duplicateAndInsertNewMap(oldMap);
    newMap.put(key, val);
    }

        }
    }
    
    private Integer getAndSetLastOperation(int op) {
        //设置写操作,并返回上一次操作
        Integer lastOp = (Integer)this.lastOperation.get();
        this.lastOperation.set(op);
        return lastOp;
    }
    
    private boolean wasLastOpReadOrNull(Integer lastOp) {
        //判断操作类型 是null或者读操作
        return lastOp == null || lastOp == 2;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    创建线程安全的map放到threadLocal中
    private Map duplicateAndInsertNewMap(Map oldMap) {
    Map newMap = Collections.synchronizedMap(new HashMap());
    if (oldMap != null) {
    synchronized(oldMap) {
    newMap.putAll(oldMap);
    }
    }

        this.copyOnThreadLocal.set(newMap);
        return newMap;
    }
    
    • 1
    • 2
    • 3

    get方法
    public String get(String key) {
    //从当前线程获取map
    Map map = (Map)this.copyOnThreadLocal.get();
    return map != null && key != null ? (String)map.get(key) : null;
    }
    remove方法
    public void remove(String key) {
    if (key != null) {
    Map oldMap = (Map)this.copyOnThreadLocal.get();
    if (oldMap != null) {
    //设置为写操作
    Integer lastOp = this.getAndSetLastOperation(1);
    if (this.wasLastOpReadOrNull(lastOp)) {
    //读操作或者null的时候,复制新map并移除当前key
    Map newMap = this.duplicateAndInsertNewMap(oldMap);
    newMap.remove(key);
    } else {
    oldMap.remove(key);
    }

            }
        }
    }
    
    • 1
    • 2
    • 3

    clear方法
    public void clear() {
    //设置为写操作
    this.lastOperation.set(1);
    //移除复制
    this.copyOnThreadLocal.remove();
    }

    四,MDC使用总结

    mdc就是基于Threadlocal进行的一个流程周转的标志物的传递,就是根据这种标志,可以追踪到日志的整体请求记录,便于进行定位到问题所在,而且对于用户的影响极小

  • 相关阅读:
    【数据结构】C++代码定义了一个动态数组(Vector)的数据结构,并实现了一些基本的操作,包括插入、删除、扩容和输出。
    协助办案,,,
    AIX360-CEMExplainer: MNIST Example
    百度校园招聘-研发工程师笔试
    Java毕设项目淮安市教育局职业教研室技能竞赛计算机(附源码+系统+数据库+LW)
    XML文件
    Linux系统编程_文件编程第2天:写整数、结构体,fopen等
    k8s中常用命令总结
    Python学习记录 析构函数
    基于PHP+MySQL共享自行车租赁管理系统的设计与实现
  • 原文地址:https://blog.csdn.net/sfb749277979/article/details/126725871