• 【SkyWalking】SkyWalking是如何实现跨进程传播链路数据?


    一、简介

    1 为什么写这篇文章

    写这篇文章是为了让自己和大家梳理这些内容:

    1. SkyWalking的链路串联依赖跨进程数据传播,他的跨进程传播协议是怎样的?
    2. 如果我想借助SkyWalking的跨进程传播协议实现传递全链路业务数据(如全局userId等),该如何实现?

    2 跨进程传播协议-简介

    SkyWalking 跨进程传播协议是用于上下文的传播,之前经历过sw3协议、sw6协议,本文介绍是当前(2023年)最新的sw8协议。
    该协议适用于不同语言、系统的探针之间传递上下文。

    二、协议

    Header项分为三类:

    • Standard Header项,Header名称:sw8
    • Extension Header项,Header名称:sw8-x
    • Correlation Header项,Header名称:sw8-correlation

    协议的整体设计:
    在这里插入图片描述

    下面详细讲解协议的Header项:

    1 Standard Header项

    该Header项是上下文传播必须包含的。

    • Header名称:sw8.
    • Header值:由-分隔的8个字段组成。Header值的长度应该小于2KB。

    Header值中具体包含以下8个字段:

    • 采样(Sample),0 或 1,0 表示上下文存在,但是可以(也很可能)被忽略而不做采样;1 表示这个trace需要采样并发送到后端。
    • 追踪ID(Trace Id),是 Base64 编码的字符串,其内容是由 . 分割的三个 long 类型值, 表示此trace的唯一标识。
    • 父追踪片段ID(Parent trace segment Id),是 Base64 编码的字符串,其内容是字符串且全局唯一。
    • 父跨度ID(Parent span Id),是一个从 0 开始的整数,这个跨度ID指向父追踪片段(segment)中的父跨度(span)。
    • 父服务名称(Parent service),是 Base64 编码的字符串,其内容是一个长度小于或等于50个UTF-8编码的字符串。
    • 父服务实例标识(Parent service instance),是 Base64 编码的字符串,其内容是一个长度小于或等于50个UTF-8编码的字符串。
    • 父服务的端点(Parent endpoint),是 Base64 编码的字符串,其内容是父追踪片段(segment)中第一个入口跨度(span)的操作名,由长度小于或等于50个UTF-8编码的字符组成。
    • 本请求的目标地址(Peer),是 Base64 编码的字符串,其内容是客户端用于访问目标服务的网络地址(不一定是 IP + 端口)。

    示例值: 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT

    2 Extension Header项

    该Header项是可选的。扩展Header项是为高级特性设计的,它提供了部署在上游和下游服务中的探针之间的交互功能。

    Header名称:sw8-x

    Header值:由-分割,字段可扩展。

    扩展Header值
    当前值包括的字段:

    追踪模式(Tracing Mode),空、0或1,默认为空或0。表示在这个上下文中生成的所有跨度(span)应该跳过分析。在默认情况下,这个应该在上下文中传播到服务端,除非它在跟踪过程中被更改。
    客户端发送的时间戳:用于异步RPC,如MQ。一旦设置,消费端将计算发送和接收之间的延迟,并使用key transmission.latency自动在span中标记延迟。

    示例值:1-1621344125000

    3 Correlation Header项

    该Header项是是可选的。并非所有语言的探针都支持,已知的是Java的探针是支持该协议。
    该Header项用于跨进程传递用户自定义数据,例如userId、orgId。
    这个协议跟OpenTracing 的 Baggage很类似,但是Correlation Header项相比,在默认设置下会更有更严格的限制,例如,只能存放3个字段,且有字段长度限制,这个是为了安全、性能等考虑。
    数据格式:

    Header名称:sw8-correlation

    Header值:由,分割一对对key、value,每对key、value逗号分割,key、value的由Base64编码。

    示例值:a2V5MQ==:dmFsdWUx,a2V5LTI=:dmFsdWUy

    三、跨进程传播协议的源码分析

    1 OpenTracing规范

    SkyWalking是基于OpenTracing标准的追踪系统,参考吴晟老师翻译的OpenTracing规范的文章opentracing之Inject和Extract,OpenTracing定义了跨进程传播的几个要素:

    SpanContext:SpanContext代表跨越进程边界,传递到下级span的状态。在SkyWalking中的实现类是org.apache.skywalking.apm.agent.core.context.TracingContext
    Carrier:传递跨进程数据的搬运工,负责将追踪状态从一个进程"carries"(携带,传递)到另一个进程
    Inject 和 Extract:SpanContexts可以通过Inject(注入)操作向Carrier增加,或者通过Extract(提取)从Carrier中获取,跨进程通讯数据(例如:HTTP头)。通过这种方式,SpanContexts可以跨越进程边界,并提供足够的信息来建立跨进程的span间关系(因此可以实现跨进程连续追踪)

    2 通过dubbo插件分析跨进程数据传播

    我们以SkyWalking java agent的dubbo-2.7.x-plugin插件为例,其中跨进程传播数据的核心代码在org.apache.skywalking.apm.plugin.asf.dubbo.DubboInterceptor,下面是该类跨进程传播的核心代码:

    public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
    
        /**
         * Consumer: The serialized trace context data will
         * inject to the {@link RpcContext#attachments} for transport to provider side.
         * 

    * Provider: The serialized trace context data will extract from * {@link RpcContext#attachments}. current trace segment will ref if the serialization context data is not null. */ @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { ...... if (isConsumer) { // 1、consumer端 // ContextCarrier final ContextCarrier contextCarrier = new ContextCarrier(); // 1.1 createExitSpan()内部会调用TracerContext.inject(carrier),将TracerContext中的context数据inject(注入)到ContextCarrier的context中 span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port); CarrierItem next = contextCarrier.items(); // 1.2 遍历ContextCarrier,从ContextCarrier的context获取数据,注入到dubbo的attachment,从consumer端传递到provider端 while (next.hasNext()) { next = next.next(); rpcContext.setAttachment(next.getHeadKey(), next.getHeadValue()); if (invocation.getAttachments().containsKey(next.getHeadKey())) { invocation.getAttachments().remove(next.getHeadKey()); } } } else { // 2 provider端 // 2.1 从consumer端传递到provider端的attachment中获取跨进程协议数据,然后设置到context ContextCarrier contextCarrier = new ContextCarrier(); CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); next.setHeadValue(rpcContext.getAttachment(next.getHeadKey())); } // 2.2 createEntrySpan()内部会调用TracerContext.extract(carrier),将ContextCarrier的context数据extract(提取)到将TracerContext中的context中 span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier); span.setPeer(rpcContext.getRemoteAddressString()); } } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    从上面的源码可以看出在服务调用方和被调用方,都会用到ContextCarrier,他是临时搬运工,负责两个进程的TracerContext数据的传递。
    下面分析ContextCarrier等类的核心源码。

    3 分析跨进程传播协议的核心源码

    TracingContext
    org.apache.skywalking.apm.agent.core.context.TracingContext是OpenTracing的SpanContext的一种实现,里面包含了span的上下文,包含在segment、correlationContext、extensionContext,而inject()、extract()负责跨进程上下文透传。

    public class TracingContext implements AbstractTracerContext {
    
         /**
         * The final {@link TraceSegment}, which includes all finished spans.
         */
        private TraceSegment segment;
      
        @Getter(AccessLevel.PACKAGE)
        private final CorrelationContext correlationContext;
        @Getter(AccessLevel.PACKAGE)
        private final ExtensionContext extensionContext;
    
         /**
         * Prepare for the cross-process propagation. How to initialize the carrier, depends on the implementation.
         *
         * @param carrier to carry the context for crossing process.
         */
        void inject(ContextCarrier carrier);
    
        /**
         * Build the reference between this segment and a cross-process segment. How to build, depends on the
         * implementation.
         *
         * @param carrier carried the context from a cross-process segment.
         */
        void extract(ContextCarrier carrier);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    ContextCarrier
    ContextCarrier作为传递跨进程数据的搬运工,负责将追踪状态从一个进程"carries"(携带,传递)到另一个进程,其中包含了sw8协议里的Standard Header项、Extension Header项、Correlation Header项相关的上下文数据,具体参考下面的代码:

    public class ContextCarrier implements Serializable {
        /**
         * extensionContext包含了在某些特定场景中用于增强分析的可选上下文,对应sw8的Extension Header项
         */
        private ExtensionContext extensionContext = new ExtensionContext();
        /**
         * 用户的自定义上下文容器。此上下文与主追踪上下文一同传播。对应sw8的Correlation Header项
         */
        private CorrelationContext correlationContext = new CorrelationContext();
    
        /**
         * @return 存在于当前tracing上下文中的item清单
         */
        public CarrierItem items() {
            SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null);
            SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(
                correlationContext, sw8ExtensionCarrierItem);
            SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem);
            return new CarrierItemHead(sw8CarrierItem);
        }
    
        /**
         * Extract the extension context to tracing context
         */
        void extractExtensionTo(TracingContext tracingContext) {
            tracingContext.getExtensionContext().extract(this);
            // The extension context could have field not to propagate further, so, must use the this.* to process.
            this.extensionContext.handle(tracingContext.activeSpan());
        }
    
        /**
         * Extract the correlation context to tracing context
         */
        void extractCorrelationTo(TracingContext tracingContext) {
            tracingContext.getCorrelationContext().extract(this);
            // The correlation context could have field not to propagate further, so, must use the this.* to process.
            this.correlationContext.handle(tracingContext.activeSpan());
        }
    
        /**
         * 序列化sw8的Standard Header项,使用 '-' 分割各个字段
         * Serialize this {@link ContextCarrier} to a {@link String}, with '|' split.
         * @return the serialization string.
         */
        String serialize(HeaderVersion version) {
            if (this.isValid(version)) {
                return StringUtil.join(
                    '-',
                    "1",
                    Base64.encode(this.getTraceId()),
                    Base64.encode(this.getTraceSegmentId()),
                    this.getSpanId() + "",
                    Base64.encode(this.getParentService()),
                    Base64.encode(this.getParentServiceInstance()),
                    Base64.encode(this.getParentEndpoint()),
                    Base64.encode(this.getAddressUsedAtClient())
                );
            }
            return "";
        }
    
        /**
         * 反序列化sw8的Standard Header项
         * Initialize fields with the given text.
         * @param text carries {@link #traceSegmentId} and {@link #spanId}, with '|' split.
         */
        ContextCarrier deserialize(String text, HeaderVersion version) {
            if (text == null) {
                return this;
            }
            if (HeaderVersion.v3.equals(version)) {
                String[] parts = text.split("-", 8);
                if (parts.length == 8) {
                    try {
                        // parts[0] is sample flag, always trace if header exists.
                        this.traceId = Base64.decode2UTFString(parts[1]);
                        this.traceSegmentId = Base64.decode2UTFString(parts[2]);
                        this.spanId = Integer.parseInt(parts[3]);
                        this.parentService = Base64.decode2UTFString(parts[4]);
                        this.parentServiceInstance = Base64.decode2UTFString(parts[5]);
                        this.parentEndpoint = Base64.decode2UTFString(parts[6]);
                        this.addressUsedAtClient = Base64.decode2UTFString(parts[7]);
                    } catch (IllegalArgumentException ignored) {
    
                    }
                }
            }
            return this;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    CorrelationContext
    ContextCarrier里包含里sw8的Correlation Header项存放于CorrelationContext,这个类非常有用,适合我们去在全链路跨进程传递自定义的数据。
    sw8协议里的Standard Header项、Extension Header项是比较固定的协议格式,我们可以扩展这些协议,例如Standard Header项,当前固定是8位的,对应8个字段,我们可以扩展为9位,第九位可以定义为userId。但是如果要这样改造,就得修改ContextCarrier类序列化、反序列的逻辑,要重新发布agent,并考虑好新旧版本兼容性问题、以及不同语言的agent是否兼容。
    而sw8的Correlation Header项使用起来就非常方便。先看下对应实现了CorrelationContext的源码:

    /**
     * Correlation context, use to propagation user custom data.
     * Correlation上下文,用于传播用户自定义数据
     */
    public class CorrelationContext {
    
        private final Map<String, String> data;
    
        /**
         * Add or override the context. 添加或覆盖上下文数据
         *
         * @param key   to add or locate the existing context
         * @param value as new value
         * @return old one if exist.
         */
        public Optional<String> put(String key, String value) {
            // 可以存放于span的tag中
            if (AUTO_TAG_KEYS.contains(key) && ContextManager.isActive()) {
                ContextManager.activeSpan().tag(new StringTag(key), value);
            }
            // setting
            data.put(key, value);
            return Optional.empty();
        }
    
        /**
         * @param key to find the context 获取上下文数据
         * @return value if exist.
         */
        public Optional<String> get(String key) {
            return Optional.ofNullable(data.get(key));
        }
    
        /**
         * Serialize this {@link CorrelationContext} to a {@link String} 序列化
         *
         * @return the serialization string.
         */
        String serialize() {
            if (data.isEmpty()) {
                return "";
            }
    
            return data.entrySet().stream()
                       .map(entry -> Base64.encode(entry.getKey()) + ":" + Base64.encode(entry.getValue()))
                       .collect(Collectors.joining(","));
        }
    
        /**
         * Deserialize data from {@link String} 反序列化
         */
        void deserialize(String value) {
            if (StringUtil.isEmpty(value)) {
                return;
            }
    
            for (String perData : value.split(",")) {
                // Only data with limited count of elements can be added
                if (data.size() >= Config.Correlation.ELEMENT_MAX_NUMBER) {
                    break;
                }
                final String[] parts = perData.split(":");
                if (parts.length != 2) {
                    continue;
                }
                data.put(Base64.decode2UTFString(parts[0]), Base64.decode2UTFString(parts[1]));
            }
        }
    
        /**
         * Prepare for the cross-process propagation. Inject the {@link #data} into {@link
         * ContextCarrier#getCorrelationContext()}
         */
        void inject(ContextCarrier carrier) {
            carrier.getCorrelationContext().data.putAll(this.data);
        }
    
        /**
         * Extra the {@link ContextCarrier#getCorrelationContext()} into this context.
         */
        void extract(ContextCarrier carrier) {
            ......
        }
    
        /**
         * Clone the context data, work for capture to cross-thread. 克隆数据,用于跨线程传递
         */
        @Override
        public CorrelationContext clone() {
            final CorrelationContext context = new CorrelationContext();
            context.data.putAll(this.data);
            return context;
        }
    
        /**
         * Continue the correlation context in another thread.传递到另外的线程
         *
         * @param snapshot holds the context.
         */
        void continued(ContextSnapshot snapshot) {
            this.data.putAll(snapshot.getCorrelationContext().data);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    通过源码可知,CorrelationContext通过Map来存放数据,CorrelationContext数据支持跨线程、跨进程透传。

    四、小结

    分析Dubbo插件的跨进程核心代码,了解了跨进程传播协议的核心实现逻辑。

    其实在其他分布式追踪系统(如Zipkin、Jager)、全链路灰度系统等涉及到跨进程数据传播的系统中,也是使用了类似于上面SkyWalking协议的思路。

    参考

    SkyWalking Cross Process Propagation Headers Protocol
    SkyWalking Cross Process Correlation Headers Protocol
    详解 Apache SkyWalking 的跨进程传播协议

  • 相关阅读:
    uniapp运行到IOS真机提示 错误:请查看是否设备未加入到证书列表或者确认证书类型是否匹配
    五种多目标优化算法(MOJS、MOGWO、NSWOA、MOPSO、NSGA2)性能对比(提供MATLAB代码)
    Java.lang.Character类中isLetter()方法具有什么功能呢?
    ES高亮显示语法
    「网络流浅谈」最小割的模型
    14.一元二次方程组,有实根输出实根,没有实根输出虚根
    牛客刷题<十>使用函数实现数据大小端转换
    OB_GINS_day3
    组合数学(上):数列、排列、组合
    Java预习46
  • 原文地址:https://blog.csdn.net/u011397981/article/details/133689418