SkyWalking source code: skywalking-agent trace data collection

    The source code in this article comes from skywalking-agent version 8.9.0.



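    This article covers the classes and methods involved in creating, temporarily storing, and propagating trace data. The relevant code is in the apm-agent-core module, mostly in the classes of the context package, and this part focuses on the trace- and context-related classes. In the next article we will build on this one to describe how our company modified this area. Stay tuned!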




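    A complete trace can be loosely described as being made up of segments, spans, and IDs. In reality it is more complicated: propagating the trace across processes and threads, and sending the data efficiently, require some additional storage objects. So we can think of it this way: each process records a TraceSegment (strictly speaking, one per thread taking part in the trace), and the segment is where cross-process propagation, cross-thread propagation, and efficient sending are dealt with.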



    public class TraceSegment {
        /**
         * The id of this trace segment. Every segment has its unique-global-id.
         */
        private String traceSegmentId;

        /**
         * The refs of parent trace segments, except the primary one. For most RPC call, {@link #ref} contains only one
         * element, but if this segment is a start span of batch process, the segment faces multi parents, at this moment,
         * we only cache the first parent segment reference.
         * <p>
         * This field will not be serialized. Keeping this field is only for quick accessing.
         */
        private TraceSegmentRef ref;

        /**
         * The spans belong to this trace segment. They all have finished. All active spans are hold and controlled by
         * "skywalking-api" module.
         */
        private List<AbstractTracingSpan> spans;

        /**
         * The relatedGlobalTraceId represent the related trace. Most time it related only one
         * element, because only one parent {@link TraceSegment} exists, but, in batch scenario, the num becomes greater
         * than 1, also meaning multi-parents {@link TraceSegment}. But we only related the first parent TraceSegment.
         */
        private DistributedTraceId relatedGlobalTraceId;

        private boolean ignore = false;

        private boolean isSizeLimited = false;

        private final long createTime;

        /**
         * Create a default/empty trace segment, with current time as start time, and generate a new segment id.
         */
        public TraceSegment() {
            this.traceSegmentId = GlobalIdGenerator.generate();
            this.spans = new LinkedList<>();
            this.relatedGlobalTraceId = new NewDistributedTraceId();
            this.createTime = System.currentTimeMillis();
        }

        public TraceSegment(boolean ignore) {
            this();
            this.ignore = ignore;
        }

        /**
         * Establish the link between this segment and its parents.
         *
         * @param refSegment {@link TraceSegmentRef}
         */
        public void ref(TraceSegmentRef refSegment) {
            if (null == ref) {
                this.ref = refSegment;
            }
        }

        /**
         * Establish the line between this segment and the relative global trace id.
         */
        public void relatedGlobalTrace(DistributedTraceId distributedTraceId) {
            if (relatedGlobalTraceId instanceof NewDistributedTraceId) {
                this.relatedGlobalTraceId = distributedTraceId;
            }
        }

        /**
         * After {@link AbstractSpan} is finished, as be controller by "skywalking-api" module, notify the {@link
         * TraceSegment} to archive it.
         */
        public void archive(AbstractTracingSpan finishedSpan) {
            spans.add(finishedSpan);
        }

        /**
         * Finish this {@link TraceSegment}.
         * <p>
         * return this, for chaining
         */
        public TraceSegment finish(boolean isSizeLimited) {
            this.isSizeLimited = isSizeLimited;
            return this;
        }

        public String getTraceSegmentId() {
            return traceSegmentId;
        }

        /**
         * Get the first parent segment reference.
         */
        public TraceSegmentRef getRef() {
            return ref;
        }

        public DistributedTraceId getRelatedGlobalTrace() {
            return relatedGlobalTraceId;
        }

        public boolean isSingleSpanSegment() {
            return this.spans != null && this.spans.size() == 1;
        }

        public boolean isIgnore() {
            return ignore;
        }

        public void setIgnore(boolean ignore) {
            this.ignore = ignore;
        }

        /**
         * This is a high CPU cost method, only called when sending to collector or test cases.
         *
         * @return the segment as GRPC service parameter
         */
        public SegmentObject transform() {
            SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();
            traceSegmentBuilder.setTraceId(getRelatedGlobalTrace().getId());
            /*
             * Trace Segment
             */
            traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId);
            // Don't serialize TraceSegmentReference

            // SpanObject
            for (AbstractTracingSpan span : this.spans) {
                traceSegmentBuilder.addSpans(span.transform());
            }
            traceSegmentBuilder.setService(Config.Agent.SERVICE_NAME);
            traceSegmentBuilder.setServiceInstance(Config.Agent.INSTANCE_NAME);
            traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);

            return traceSegmentBuilder.build();
        }

        @Override
        public String toString() {
            return "TraceSegment{" + "traceSegmentId='" + traceSegmentId + '\'' + ", ref=" + ref + ", spans=" + spans + "}";
        }

        public long createTime() {
            return this.createTime;
        }
    }

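    relatedGlobalTraceId: the associated DistributedTraceId, which holds the ID of the whole trace. Older versions kept a list of trace IDs here; in 8.9.0 it is a single field that follows the same design as ref: relatedGlobalTrace(...) only replaces the locally generated NewDistributedTraceId, so only the first parent's trace ID is kept.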
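To make the "only the first parent is kept" behavior of ref and relatedGlobalTraceId concrete, here is a minimal standalone sketch. SketchSegment and SegmentRefDemo are hypothetical classes written for illustration, with plain strings standing in for TraceSegmentRef and DistributedTraceId; the guards mirror TraceSegment.ref(...) and relatedGlobalTrace(...) from the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical, simplified stand-in for TraceSegment: IDs and refs are plain strings.
class SketchSegment {
    // Like NewDistributedTraceId: a fresh trace id is generated up front.
    private String traceId = "new-" + UUID.randomUUID();
    private boolean traceIdIsNew = true;
    // Only the first parent segment reference is cached.
    private String ref;
    // Finished spans, in the order they were archived.
    final List<String> spans = new ArrayList<>();

    // Mirrors TraceSegment.ref(): later parents are ignored.
    void ref(String parentRef) {
        if (ref == null) {
            ref = parentRef;
        }
    }

    // Mirrors relatedGlobalTrace(): only the locally generated id may be replaced,
    // so the first propagated trace id wins.
    void relatedGlobalTrace(String propagatedTraceId) {
        if (traceIdIsNew) {
            traceId = propagatedTraceId;
            traceIdIsNew = false;
        }
    }

    // Mirrors archive(): a finished span is appended to the segment.
    void archive(String finishedSpan) {
        spans.add(finishedSpan);
    }

    String getRef() {
        return ref;
    }

    String getTraceId() {
        return traceId;
    }
}

class SegmentRefDemo {
    public static void main(String[] args) {
        SketchSegment segment = new SketchSegment();
        segment.ref("parent-A");
        segment.ref("parent-B");                    // ignored: a ref is already set
        segment.relatedGlobalTrace("trace-from-A");
        segment.relatedGlobalTrace("trace-from-B"); // ignored as well
        System.out.println(segment.getRef() + " " + segment.getTraceId()); // parent-A trace-from-A
    }
}
```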





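    The context/ids directory contains the abstract class DistributedTraceId, its two subclasses NewDistributedTraceId and PropagatedTraceId, and GlobalIdGenerator, the class that generates trace IDs. NewDistributedTraceId generates a new ID when the trace starts in this process, while PropagatedTraceId wraps an ID received from upstream.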





    public class NewDistributedTraceId extends DistributedTraceId {
        public NewDistributedTraceId() {
            super(GlobalIdGenerator.generate());
        }
    }

    public class PropagatedTraceId extends DistributedTraceId {
        public PropagatedTraceId(String id) {
            super(id);
        }
    }

    public final class GlobalIdGenerator {
        private static final String PROCESS_ID = UUID.randomUUID().toString().replaceAll("-", "");
        private static final ThreadLocal<IDContext> THREAD_ID_SEQUENCE = ThreadLocal.withInitial(
            () -> new IDContext(System.currentTimeMillis(), (short) 0));

        private GlobalIdGenerator() {
        }

        /**
         * Generate a new id, combined by three parts.
         * <p>
         * The first one represents application instance id.
         * <p>
         * The second one represents thread id.
         * <p>
         * The third one also has two parts, 1) a timestamp, measured in milliseconds 2) a seq, in current thread, between
         * 0(included) and 9999(included)
         *
         * @return unique id to represent a trace or segment
         */
        public static String generate() {
            return StringUtil.join(
                '.',
                PROCESS_ID,
                String.valueOf(Thread.currentThread().getId()),
                String.valueOf(THREAD_ID_SEQUENCE.get().nextSeq())
            );
        }

        private static class IDContext {
            private long lastTimestamp;
            private short threadSeq;

            // Just for considering time-shift-back only.
            private long lastShiftTimestamp;
            private int lastShiftValue;

            private IDContext(long lastTimestamp, short threadSeq) {
                this.lastTimestamp = lastTimestamp;
                this.threadSeq = threadSeq;
            }

            private long nextSeq() {
                return timestamp() * 10000 + nextThreadSeq();
            }

            private long timestamp() {
                long currentTimeMillis = System.currentTimeMillis();

                if (currentTimeMillis < lastTimestamp) {
                    // Just for considering time-shift-back by Ops or OS. @hanahmily 's suggestion.
                    if (lastShiftTimestamp != currentTimeMillis) {
                        lastShiftValue++;
                        lastShiftTimestamp = currentTimeMillis;
                    }
                    return lastShiftValue;
                } else {
                    lastTimestamp = currentTimeMillis;
                    return lastTimestamp;
                }
            }

            private short nextThreadSeq() {
                if (threadSeq == 10000) {
                    threadSeq = 0;
                }
                return threadSeq++;
            }
        }
    }
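The layout of the IDs produced by generate() can be checked with a standalone sketch that follows the same three-part scheme: a per-process UUID without dashes, the current thread ID, and timestamp * 10000 plus a per-thread sequence in [0, 9999]. IdFormatSketch is a hypothetical class written for illustration; it leaves out the clock-rollback branch of IDContext.timestamp() and uses String.join in place of SkyWalking's StringUtil.join.

```java
import java.util.UUID;

// Hypothetical re-implementation of the GlobalIdGenerator ID layout,
// without the clock-rollback handling of the real IDContext.
final class IdFormatSketch {
    private static final String PROCESS_ID = UUID.randomUUID().toString().replaceAll("-", "");

    // Per-thread sequence that wraps from 9999 back to 0, like nextThreadSeq().
    private static final ThreadLocal<short[]> SEQ = ThreadLocal.withInitial(() -> new short[] {0});

    private IdFormatSketch() {
    }

    static String generate() {
        short[] seq = SEQ.get();
        if (seq[0] == 10000) {
            seq[0] = 0;
        }
        // Third part: millisecond timestamp shifted left by four decimal digits, plus the sequence.
        long third = System.currentTimeMillis() * 10000 + seq[0]++;
        return String.join(".", PROCESS_ID, String.valueOf(Thread.currentThread().getId()), String.valueOf(third));
    }

    public static void main(String[] args) {
        String[] parts = generate().split("\\.");
        long third = Long.parseLong(parts[2]);
        System.out.println("process=" + parts[0] + ", thread=" + parts[1]
            + ", millis=" + third / 10000 + ", seq=" + third % 10000);
    }
}
```

Because the sequence shares one number with the millisecond timestamp, a single thread can produce up to 10,000 distinct IDs per millisecond before the sequence wraps around.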

    public abstract class AbstractTracingSpan implements AbstractSpan {
        /**
         * Span id starts from 0.
         */
        protected int spanId;
        /**
         * Parent span id starts from 0. -1 means no parent span.
         */
        protected int parentSpanId;
        protected List<TagValuePair> tags;
        protected String operationName;
        protected SpanLayer layer;
        /**
         * The span has been tagged in async mode, required async stop to finish.
         */
        protected volatile boolean isInAsyncMode = false;
        /**
         * The flag represents whether the span has been async stopped
         */
        private volatile boolean isAsyncStopped = false;

        /**
         * The context to which the span belongs
         */
        protected final TracingContext owner;

        /**
         * The start time of this Span.
         */
        protected long startTime;
        /**
         * The end time of this Span.
         */
        protected long endTime;
        /**
         * Error has occurred in the scope of span.
         */
        protected boolean errorOccurred = false;

        protected int componentId = 0;

        /**
         * Log is a concept from OpenTracing spec. https://github.com/opentracing/specification/blob/master/specification.md#log-structured-data
         */
        protected List<LogDataEntity> logs;

        /**
         * The refs of parent trace segments, except the primary one. For most RPC call, {@link #refs} contains only one
         * element, but if this segment is a start span of batch process, the segment faces multi parents, at this moment,
         * we use this {@link #refs} to link them.
         */
        protected List<TraceSegmentRef> refs;

        /**
         * Tracing Mode. If true means represents all spans generated in this context should skip analysis.
         */
        protected boolean skipAnalysis;

        protected AbstractTracingSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
            this.operationName = operationName;
            this.spanId = spanId;
            this.parentSpanId = parentSpanId;
            this.owner = owner;
        }

        /**
         * Set a key:value tag on the Span.
         * <p>
         * {@inheritDoc}
         *
         * @return this Span instance, for chaining
         */
        @Override
        public AbstractTracingSpan tag(String key, String value) {
            return tag(Tags.ofKey(key), value);
        }

        @Override
        public AbstractTracingSpan tag(AbstractTag<?> tag, String value) {
            if (tags == null) {
                tags = new ArrayList<>(8);
            }

            if (tag.isCanOverwrite()) {
                for (TagValuePair pair : tags) {
                    if (pair.sameWith(tag)) {
                        pair.setValue(value);
                        return this;
                    }
                }
            }

            tags.add(new TagValuePair(tag, value));
            return this;
        }

        /**
         * Finish the active Span. When it is finished, it will be archived by the given {@link TraceSegment}, which owners
         * it.
         *
         * @param owner of the Span.
         */
        public boolean finish(TraceSegment owner) {
            this.endTime = System.currentTimeMillis();
            owner.archive(this);
            return true;
        }

        @Override
        public AbstractTracingSpan start() {
            this.startTime = System.currentTimeMillis();
            return this;
        }

        /**
         * Record an exception event of the current walltime timestamp.
         *
         * @param t any subclass of {@link Throwable}, which occurs in this span.
         * @return the Span, for chaining
         */
        @Override
        public AbstractTracingSpan log(Throwable t) {
            if (logs == null) {
                logs = new LinkedList<>();
            }
            if (!errorOccurred && ServiceManager.INSTANCE.findService(StatusCheckService.class).isError(t)) {
                errorOccurred();
            }
            logs.add(new LogDataEntity.Builder().add(new KeyValuePair("event", "error"))
                                                .add(new KeyValuePair("error.kind", t.getClass().getName()))
                                                .add(new KeyValuePair("message", t.getMessage()))
                                                .add(new KeyValuePair(
                                                    "stack",
                                                    ThrowableTransformer.INSTANCE.convert2String(t, 4000)
                                                ))
                                                .build(System.currentTimeMillis()));
            return this;
        }

        /**
         * Record a common log with multi fields, for supporting opentracing-java
         *
         * @return the Span, for chaining
         */
        @Override
        public AbstractTracingSpan log(long timestampMicroseconds, Map<String, ?> fields) {
            if (logs == null) {
                logs = new LinkedList<>();
            }
            LogDataEntity.Builder builder = new LogDataEntity.Builder();
            for (Map.Entry<String, ?> entry : fields.entrySet()) {
                builder.add(new KeyValuePair(entry.getKey(), entry.getValue().toString()));
            }
            logs.add(builder.build(timestampMicroseconds));
            return this;
        }

        /**
         * In the scope of this span tracing context, error occurred, in auto-instrumentation mechanism, almost means throw
         * an exception.
         *
         * @return span instance, for chaining.
         */
        @Override
        public AbstractTracingSpan errorOccurred() {
            this.errorOccurred = true;
            return this;
        }

        /**
         * Set the operation name, just because these is not compress dictionary value for this name. Use the entire string
         * temporarily, the agent will compress this name in async mode.
         *
         * @return span instance, for chaining.
         */
        @Override
        public AbstractTracingSpan setOperationName(String operationName) {
            this.operationName = operationName;
            return this;
        }

        @Override
        public int getSpanId() {
            return spanId;
        }

        @Override
        public String getOperationName() {
            return operationName;
        }

        @Override
        public AbstractTracingSpan setLayer(SpanLayer layer) {
            this.layer = layer;
            return this;
        }

        /**
         * Set the component of this span, with internal supported. Highly recommend to use this way.
         *
         * @return span instance, for chaining.
         */
        @Override
        public AbstractTracingSpan setComponent(Component component) {
            this.componentId = component.getId();
            return this;
        }

        @Override
        public AbstractSpan start(long startTime) {
            this.startTime = startTime;
            return this;
        }

        public SpanObject.Builder transform() {
            SpanObject.Builder spanBuilder = SpanObject.newBuilder();

            spanBuilder.setSpanId(this.spanId);
            spanBuilder.setParentSpanId(parentSpanId);
            spanBuilder.setStartTime(startTime);
            spanBuilder.setEndTime(endTime);
            spanBuilder.setOperationName(operationName);
            spanBuilder.setSkipAnalysis(skipAnalysis);
            if (isEntry()) {
                spanBuilder.setSpanType(SpanType.Entry);
            } else if (isExit()) {
                spanBuilder.setSpanType(SpanType.Exit);
            } else {
                spanBuilder.setSpanType(SpanType.Local);
            }
            if (this.layer != null) {
                spanBuilder.setSpanLayerValue(this.layer.getCode());
            }
            if (componentId != DictionaryUtil.nullValue()) {
                spanBuilder.setComponentId(componentId);
            }
            spanBuilder.setIsError(errorOccurred);
            if (this.tags != null) {
                for (TagValuePair tag : this.tags) {
                    spanBuilder.addTags(tag.transform());
                }
            }
            if (this.logs != null) {
                for (LogDataEntity log : this.logs) {
                    spanBuilder.addLogs(log.transform());
                }
            }
            if (this.refs != null) {
                for (TraceSegmentRef ref : this.refs) {
                    spanBuilder.addRefs(ref.transform());
                }
            }

            return spanBuilder;
        }

        @Override
        public void ref(TraceSegmentRef ref) {
            if (refs == null) {
                refs = new LinkedList<>();
            }
            /*
             * Provide the OOM protection if the entry span hosts too many references.
             */
            if (refs.size() == Config.Agent.TRACE_SEGMENT_REF_LIMIT_PER_SPAN) {
                return;
            }
            if (!refs.contains(ref)) {
                refs.add(ref);
            }
        }

        @Override
        public AbstractSpan prepareForAsync() {
            if (isInAsyncMode) {
                throw new RuntimeException("Prepare for async repeatedly. Span is already in async mode.");
            }
            ContextManager.awaitFinishAsync(this);
            isInAsyncMode = true;
            return this;
        }

        @Override
        public AbstractSpan asyncFinish() {
            if (!isInAsyncMode) {
                throw new RuntimeException("Span is not in async mode, please use '#prepareForAsync' to active.");
            }
            if (isAsyncStopped) {
                throw new RuntimeException("Can not do async finish for the span repeatedly.");
            }
            this.endTime = System.currentTimeMillis();
            owner.asyncStop(this);
            isAsyncStopped = true;
            return this;
        }

        @Override
        public boolean isProfiling() {
            return this.owner.profileStatus().isProfiling();
        }

        @Override
        public void skipAnalysis() {
            this.skipAnalysis = true;
        }
    }

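    getSpanId(): returns the span ID, an integer that is unique within the TraceSegment. It starts at 0, increases by one for each new span, and is assigned when the Span object is created.

    setOperationName(operationName): sets the operation name, which is usually the request path.

    setOperationId(operationId): sets the operation ID. Because an operation name is a string and sending it from the Agent to the Collector costs a lot of traffic, older versions of the Agent registered operations with the Collector to obtain numeric operation IDs. The 8.9.0 code above has no such method; transform() sends the operation name as a string.

    setComponent(Component): sets the org.apache.skywalking.apm.network.trace.component.Component, e.g. MongoDB, SpringMVC, or Tomcat. Note in passing that SkyWalking's performance monitoring works mainly at the component level, not at the method level.

    tag(key, value): sets a key/value tag. It can be called multiple times to build up the span's tag collection. An existing value is overwritten only if the tag is marked as overwritable; otherwise another entry with the same key is appended.

    errorOccurred(): marks that an error occurred. In most cases it is used together with log(Throwable), which itself calls errorOccurred() when StatusCheckService classifies the exception as an error.

    start(): starts the span; the usual implementation records the start time.

    isEntry(): whether this is an entry span.

    isExit(): whether this is an exit span.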
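The overwrite rule in tag(AbstractTag, String) is easy to misread, so here is a standalone sketch of the same control flow. KeyTag, TagPair, and TaggedSpanSketch are hypothetical stand-ins for AbstractTag, TagValuePair, and AbstractTracingSpan; TagValuePair.sameWith(...) is approximated by key equality, which is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractTag: a key plus the "can overwrite" flag.
final class KeyTag {
    final String key;
    final boolean canOverwrite;

    KeyTag(String key, boolean canOverwrite) {
        this.key = key;
        this.canOverwrite = canOverwrite;
    }
}

// Hypothetical stand-in for TagValuePair.
final class TagPair {
    final KeyTag tag;
    String value;

    TagPair(KeyTag tag, String value) {
        this.tag = tag;
        this.value = value;
    }
}

final class TaggedSpanSketch {
    final List<TagPair> tags = new ArrayList<>(8);

    // Same control flow as AbstractTracingSpan.tag(AbstractTag, String):
    // an overwritable tag replaces the existing value for its key,
    // any other tag is appended, so the same key can appear several times.
    TaggedSpanSketch tag(KeyTag tag, String value) {
        if (tag.canOverwrite) {
            for (TagPair pair : tags) {
                if (pair.tag.key.equals(tag.key)) {
                    pair.value = value;
                    return this;
                }
            }
        }
        tags.add(new TagPair(tag, value));
        return this;
    }
}
```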


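    StackBasedTracingSpan extends the AbstractTracingSpan abstract class and is the abstract base for stack-based tracing spans. start(...) and finish(...) can be called on such a span multiple times, in stack-like nested calls. EntrySpan and ExitSpan are its subclasses.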

    public abstract class StackBasedTracingSpan extends AbstractTracingSpan {
        protected int stackDepth;
        protected String peer;

        protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
            super(spanId, parentSpanId, operationName, owner);
            this.stackDepth = 0;
            this.peer = null;
        }

        protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, String peer,
                                        TracingContext owner) {
            super(spanId, parentSpanId, operationName, owner);
            this.peer = peer;
        }

        @Override
        public SpanObject.Builder transform() {
            SpanObject.Builder spanBuilder = super.transform();
            if (StringUtil.isNotEmpty(peer)) {
                spanBuilder.setPeer(peer);
            }
            return spanBuilder;
        }

        @Override
        public boolean finish(TraceSegment owner) {
            // Only the outermost finish (depth back to 0) really archives the span.
            if (--stackDepth == 0) {
                return super.finish(owner);
            } else {
                return false;
            }
        }

        @Override
        public AbstractSpan setPeer(final String remotePeer) {
            this.peer = remotePeer;
            return this;
        }
    }
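    stackDepth field: the stack depth.

    finish(TraceSegment): implementation that finishes the span and adds this span to the TraceSegment, but only when stackDepth drops to 0. If the depth reaches zero, the pop succeeds: it calls super.finish(TraceSegment), which records the end time and adds this span to the TraceSegment. If the depth is still non-zero, the pop fails and false is returned.
    (Older versions also tried, at this point, to look up an operation ID from the operation name when the ID was empty, to reduce the network traffic the Agent sends to the Collector; the 8.9.0 code above no longer does this.)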
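The pop logic can be seen in isolation with a small standalone sketch. StackSpanSketch and StackDepthDemo are hypothetical classes: here the increment happens in start(), which is where the EntrySpan/ExitSpan subclasses do it, and a plain List stands in for TraceSegment.archive(...).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of StackBasedTracingSpan: start() pushes,
// finish() pops, and the span is archived only when the depth returns to 0.
class StackSpanSketch {
    private int stackDepth = 0;
    final String name;

    StackSpanSketch(String name) {
        this.name = name;
    }

    StackSpanSketch start() {
        stackDepth++; // in the agent, the increment lives in the subclasses' start()
        return this;
    }

    boolean finish(List<StackSpanSketch> segment) {
        if (--stackDepth == 0) {
            segment.add(this); // stands in for super.finish(owner) -> owner.archive(this)
            return true;
        }
        return false;
    }
}

class StackDepthDemo {
    public static void main(String[] args) {
        List<StackSpanSketch> segment = new ArrayList<>();
        StackSpanSketch entry = new StackSpanSketch("entry");
        entry.start();                             // e.g. the Tomcat interceptor
        entry.start();                             // e.g. the Spring MVC interceptor reuses the span
        System.out.println(entry.finish(segment)); // false: depth 2 -> 1
        System.out.println(entry.finish(segment)); // true: depth 1 -> 0, archived
        System.out.println(segment.size());        // 1
    }
}
```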


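    EntrySpan extends the StackBasedTracingSpan abstract class and is the entry span, used on the service provider side, e.g. Tomcat.

    EntrySpan is the first span of a TraceSegment, which is why it is called the "entry" span.

    So why does EntrySpan extend StackBasedTracingSpan?
    If you have read the SkyWalking plugin sources, you will have noticed that both the Spring MVC and the Tomcat plugins create an EntrySpan. Would that not produce duplicates? StackBasedTracingSpan exists to handle exactly this: when an entry span is requested, the tracing context first checks whether an EntrySpan already exists. If it does, the existing span is started again and its stackDepth is incremented; otherwise a new EntrySpan is created. This is also why, as we saw above, finish(TraceSegment) only succeeds when the stack depth reaches zero. In this way a TraceSegment has exactly one EntrySpan.

    Of course, multiple TraceSegments have multiple EntrySpans, e.g. when [Service A] remotely calls [Service B].

    In addition, although the EntrySpan is created by the first service provider, it represents the last one. In the example above, the EntrySpan represents the Spring MVC method interception, which is why the SkyWalking UI shows Spring MVC, not Tomcat, as the starting point of the segment. Therefore startTime and endTime come from the first (outermost) provider, while componentId, layer, logs, tags, operationName, and so on come from the last one. In general, the last service provider also carries more detailed information.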
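The "time from the first provider, details from the last" rule can be simulated with a small standalone sketch. EntrySpanSketch is a hypothetical class, and its currentMaxDepth field and describe(...) method are illustrative names chosen here: startTime is taken only at depth 1, every nested start() clears the previous level's details, and only the deepest level that has been reached may write them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of EntrySpan's "first time, last details" behavior.
class EntrySpanSketch {
    private int stackDepth;
    private int currentMaxDepth; // deepest level reached so far (illustrative name)
    long startTime = -1;
    String operationName;
    String component;
    final List<String> tags = new ArrayList<>();

    EntrySpanSketch start(long now) {
        if ((currentMaxDepth = ++stackDepth) == 1) {
            startTime = now; // only the outermost provider sets the start time
        }
        // A nested provider restarts the span: drop the previous level's details.
        component = null;
        tags.clear();
        return this;
    }

    // Stands in for setOperationName/setComponent/tag: only the deepest level may write.
    EntrySpanSketch describe(String op, String comp, String tag) {
        if (stackDepth == currentMaxDepth) {
            operationName = op;
            component = comp;
            tags.add(tag);
        }
        return this;
    }

    // True when the outermost level finishes, i.e. the span would be archived.
    boolean finish() {
        return --stackDepth == 0;
    }
}
```

For example, a Tomcat-like level calling start(100) and a nested Spring MVC-like level calling start(105) leave startTime at 100 but keep the Spring MVC operation name, component, and tags; a late describe(...) from the outer level after the inner level has returned is ignored.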


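    ExitSpan extends the StackBasedTracingSpan abstract class and is the exit span, used on the service consumer side, e.g. HttpClient or MongoDBClient.

    So why does ExitSpan extend StackBasedTracingSpan?
    The reasoning is the same as for EntrySpan. Take a Dubbox scenario: when [Dubbox service A] calls [Dubbox service B] over HTTP, the actual path is [Dubbox service A] => [HttpClient] => [Dubbox service B]. The Agent intercepts both [Dubbox service A] and [HttpClient], so it would seem that one exit produces two ExitSpans. In fact the Agent only creates the ExitSpan at [Dubbox service A]; the second interception reuses that span and only changes its stack depth (+1 when it starts, -1 when it finishes). Again, this is why finish(TraceSegment) only succeeds when the stack depth reaches zero. In this way each exit has exactly one ExitSpan.

    Likewise, multiple outgoing calls produce multiple ExitSpans, e.g. [Service A] remotely calls [Service B] and then calls [Service B] again, or then calls [Service C].

    Also, since the ExitSpan is created by the first consumer, it also represents the first consumer; in the example above, the ExitSpan represents [Dubbox service A].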
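ExitSpan follows the opposite rule: the first (outermost) consumer wins. In the hypothetical sketch below, ExitSpanSketch and its describe(...) method are illustrative names; both the start time and the descriptive fields are accepted only at depth 1, and the nested HttpClient-like interception only moves the depth counter.

```java
// Hypothetical simplification of ExitSpan: only the outermost consumer (depth 1)
// sets the start time, operation name, and peer; nested interceptions of the
// same outgoing call only change the depth.
class ExitSpanSketch {
    private int stackDepth;
    long startTime = -1;
    String operationName;
    String peer;

    ExitSpanSketch start(long now) {
        if (++stackDepth == 1) {
            startTime = now;
        }
        return this;
    }

    // Stands in for setOperationName/setPeer/tag calls made by an interceptor.
    ExitSpanSketch describe(String op, String remotePeer) {
        if (stackDepth == 1) {
            operationName = op;
            peer = remotePeer;
        }
        return this;
    }

    // True when the outermost consumer finishes, i.e. the span would be archived.
    boolean finish() {
        return --stackDepth == 0;
    }
}
```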





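    AbstractTag: the abstract tag class. Note that this class is used to set tag attributes on a Span; in other words, it is a utility class for setting a Span's tags.

    key field: the tag key.
    set(AbstractSpan span, T tagValue): abstract method that sets the value of the tag key on the Span to tagValue; the concrete behavior is left to subclasses.


    StringTag: the tag implementation whose value type is String.

    set(AbstractSpan span, String tagValue): implementation that sets the value of the tag key on the Span to tagValue.


    Tags: the class of commonly used tags, which defines a number of HTTP- and DB-related StringTag static fields. Custom tags can be added here, e.g. a gid (device ID) tag.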
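The relationship between AbstractTag, StringTag, and Tags, including a custom gid tag, can be sketched with standalone classes. MiniSpan, MiniAbstractTag, MiniStringTag, and MiniTags are hypothetical stand-ins (not the agent's real constructors or signatures); the tag object carries its key and knows how to write itself onto a span, and the holder class exposes shared static constants.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical minimal span that only stores tags.
class MiniSpan {
    final Map<String, String> tags = new LinkedHashMap<>();

    MiniSpan tag(String key, String value) {
        tags.put(key, value);
        return this;
    }
}

// Mirrors the AbstractTag idea: the tag knows its key and how to set itself on a span.
abstract class MiniAbstractTag<T> {
    protected final String key;

    MiniAbstractTag(String key) {
        this.key = key;
    }

    abstract void set(MiniSpan span, T tagValue);
}

// Mirrors StringTag: a tag whose value is a String.
final class MiniStringTag extends MiniAbstractTag<String> {
    MiniStringTag(String key) {
        super(key);
    }

    @Override
    void set(MiniSpan span, String tagValue) {
        span.tag(key, tagValue);
    }
}

// Mirrors Tags: a holder of shared tag constants; GID is the hypothetical device-id tag.
final class MiniTags {
    private MiniTags() {
    }

    static final MiniStringTag URL = new MiniStringTag("url");
    static final MiniStringTag GID = new MiniStringTag("gid");
}
```

Usage then reads like the plugins' style: MiniTags.GID.set(span, deviceId).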



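    IgnoredTracerContext and TracingContext are the two implementations of AbstractTracerContext, i.e. the context classes that currently exist. IgnoredTracerContext is a context whose trace data does not need to be sent to OAP; it is created when the trace is not sampled or when the path is configured to be ignored. Otherwise a normal TracingContext is created. An IgnoredTracerContext has no real traceId, so the traceId developers obtain from it is a placeholder rather than a usable ID, which is inconvenient. A modification can fix this: also store the traceId in correlationContext.data, and change the traceId lookup to call getReadablePrimaryTraceId() first and fall back to correlationContext.data when no real traceId is available. This also ensures the traceId is passed along across processes and threads (this will be covered in detail in the hands-on part later).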
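The lookup order of that modification can be sketched as follows. ContextView, TraceIdResolver, the "traceId" key, and the placeholder value passed to the constructor are all hypothetical; in the agent the two sources would be getReadablePrimaryTraceId() and correlationContext.data, and the placeholder should be whatever your agent version returns for an ignored context.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical view of a tracer context: the primary trace id plus correlation data.
interface ContextView {
    String getReadablePrimaryTraceId();

    Map<String, String> correlationData();
}

final class TraceIdResolver {
    // Hypothetical key under which the modified agent also stores the trace id.
    static final String TRACE_ID_KEY = "traceId";
    // Assumed placeholder returned by an ignored context (version dependent).
    private final String ignoredPlaceholder;

    TraceIdResolver(String ignoredPlaceholder) {
        this.ignoredPlaceholder = ignoredPlaceholder;
    }

    // Prefer the real trace id; otherwise fall back to the copy in correlation data.
    Optional<String> resolve(ContextView ctx) {
        String primary = ctx.getReadablePrimaryTraceId();
        if (primary != null && !primary.isEmpty() && !primary.equals(ignoredPlaceholder)) {
            return Optional.of(primary);
        }
        return Optional.ofNullable(ctx.correlationData().get(TRACE_ID_KEY));
    }
}

class TraceIdResolverDemo {
    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put(TraceIdResolver.TRACE_ID_KEY, "abc.1.16");
        ContextView ignored = new ContextView() {
            @Override
            public String getReadablePrimaryTraceId() {
                return "IGNORED";
            }

            @Override
            public Map<String, String> correlationData() {
                return data;
            }
        };
        System.out.println(new TraceIdResolver("IGNORED").resolve(ignored).orElse("none")); // abc.1.16
    }
}
```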


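    ContextCarrier: implements the java.io.Serializable interface and is the carrier that transports the context across processes. Any data that has to travel across processes can therefore be put into its data.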
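To show how key/value data can ride along with a carrier across a process boundary, here is a simplified sketch. CarrierSketch is hypothetical and models only a data map; encoding each key and value as Base64, joined by ':' and ',', is similar in spirit to SkyWalking's correlation header, but treat the exact wire format here as an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified carrier: only the cross-process "data" map is modeled.
final class CarrierSketch implements java.io.Serializable {
    final Map<String, String> data = new LinkedHashMap<>();

    // Encode as base64(key):base64(value) pairs separated by commas, so that
    // keys and values may themselves contain ':' or ','.
    String serialize() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : data.entrySet()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(encode(e.getKey())).append(':').append(encode(e.getValue()));
        }
        return sb.toString();
    }

    // Rebuild the carrier on the receiving side, e.g. from an HTTP header value.
    static CarrierSketch deserialize(String header) {
        CarrierSketch carrier = new CarrierSketch();
        if (header == null || header.isEmpty()) {
            return carrier;
        }
        for (String pair : header.split(",")) {
            String[] kv = pair.split(":", 2);
            if (kv.length == 2) {
                carrier.data.put(decode(kv[0]), decode(kv[1]));
            }
        }
        return carrier;
    }

    private static String encode(String s) {
        return Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8));
    }

    private static String decode(String s) {
        return new String(Base64.getDecoder().decode(s), StandardCharsets.UTF_8);
    }
}
```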


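    ContextSnapshot: the snapshot used to pass the context across threads. It is basically the same as ContextCarrier, but since it never leaves the process, it can carry fewer properties.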
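The cross-thread flow (capture in the parent thread, continue in the child thread) can be sketched without the agent. SnapshotSketch is hypothetical: a ThreadLocal string array stands in for the real context, capture() plays the role of ContextManager.capture(), and continued(...) the role of ContextManager.continued(snapshot). The child starts a new segment that keeps the trace ID and records the parent segment/span as its reference.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical cross-thread propagation; context = {traceId, segmentId, spanId, parentRef}.
final class SnapshotSketch {
    static final ThreadLocal<String[]> CURRENT = new ThreadLocal<>();

    // Immutable snapshot; it never leaves the process, so no serialization is needed.
    static final class Snapshot {
        final String traceId;
        final String segmentId;
        final String spanId;

        Snapshot(String traceId, String segmentId, String spanId) {
            this.traceId = traceId;
            this.segmentId = segmentId;
            this.spanId = spanId;
        }
    }

    // Parent thread: copy only what the child needs.
    static Snapshot capture() {
        String[] ctx = CURRENT.get();
        return new Snapshot(ctx[0], ctx[1], ctx[2]);
    }

    // Child thread: new segment, same trace id, parent recorded as "segment#span".
    static String[] continued(Snapshot snapshot, String newSegmentId) {
        String[] ctx = {snapshot.traceId, newSegmentId, "0", snapshot.segmentId + "#" + snapshot.spanId};
        CURRENT.set(ctx);
        return ctx;
    }

    // Runs continued(...) on another thread and returns the child's context.
    static String[] runInChild(Snapshot snapshot, String newSegmentId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String[]> task = () -> continued(snapshot, newSegmentId);
            Future<String[]> future = pool.submit(task);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```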


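    SamplingService: implements the BootService interface and is the Agent's sampling service, which decides how TraceSegments are sampled. Tracing every TraceSegment would cost some CPU (for serialization and deserialization) and network bandwidth. The Config.Agent.SAMPLE_N_PER_3_SECS property sets how many TraceSegments are collected every three seconds. By default sampling is disabled, i.e. everything is collected.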


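    on field: whether sampling is enabled.
    samplingFactorHolder field: the sampling counter, reset every three seconds by a scheduled task.
    scheduledFuture field: the scheduled task.

    boot(): implementation method. If sampling is enabled (Config.Agent.SAMPLE_N_PER_3_SECS > 0), it creates a scheduled task that calls resetSamplingFactor() every three seconds to reset the counter.

    trySampling(): if sampling is enabled, checks whether the three-second sampling limit has been reached. If not, it increments the counter and returns true; otherwise it returns false.

    forceSampled(): forcibly increments the counter by one. It is typically called during trace context propagation, when the called service must record the trace; see its call sites.

    resetSamplingFactor(): resets the counter; it runs every three seconds.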
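The mechanism described above (a counter, a limit per three seconds, and a periodic reset) fits in a small standalone sketch. SamplingSketch is a hypothetical class, and its constructor argument stands in for Config.Agent.SAMPLE_N_PER_3_SECS; it reuses the field and method names from the description but is not the agent's code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified version of the sampling logic described above.
class SamplingSketch {
    private final int samplePer3Secs; // stands in for Config.Agent.SAMPLE_N_PER_3_SECS
    private final boolean on;
    private final AtomicInteger samplingFactorHolder = new AtomicInteger(0);
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;

    SamplingSketch(int samplePer3Secs) {
        this.samplePer3Secs = samplePer3Secs;
        this.on = samplePer3Secs > 0;
    }

    // boot(): schedule the 3-second reset only when sampling is enabled.
    void boot() {
        if (on) {
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduledFuture = scheduler.scheduleAtFixedRate(this::resetSamplingFactor, 3, 3, TimeUnit.SECONDS);
        }
    }

    void shutdown() {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }

    // trySampling(): true while this window's quota is not used up.
    boolean trySampling() {
        if (!on) {
            return true; // sampling disabled: collect everything
        }
        int factor = samplingFactorHolder.get();
        if (factor < samplePer3Secs) {
            return samplingFactorHolder.compareAndSet(factor, factor + 1);
        }
        return false;
    }

    // forceSampled(): count a segment that must be recorded regardless of the quota.
    void forceSampled() {
        if (on) {
            samplingFactorHolder.incrementAndGet();
        }
    }

    void resetSamplingFactor() {
        samplingFactorHolder.set(0);
    }
}
```

Using compareAndSet instead of incrementAndGet() in trySampling() keeps the counter from overshooting the limit; losing a race simply means that segment is not sampled.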


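    Because this sampling rate applies to all endpoints alike, in practice we may want one sampling rate for GET requests and another for non-GET requests. That can be handled by writing a subclass of SamplingService, with the corresponding configuration added as well.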
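The core of such a subclass is just two independent quotas, which a standalone sketch can show. MethodAwareSamplingSketch is hypothetical: it does not extend the real SamplingService, its two limits stand in for new configuration entries you would have to add, and how the HTTP method reaches the sampler in the agent is left open. The reset would be scheduled every three seconds exactly as in the original service.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-HTTP-method sampling: separate 3-second quotas for GET and non-GET.
class MethodAwareSamplingSketch {
    private final int getPer3Secs;   // hypothetical config entry for GET requests
    private final int otherPer3Secs; // hypothetical config entry for all other methods
    private final AtomicInteger getCounter = new AtomicInteger();
    private final AtomicInteger otherCounter = new AtomicInteger();

    MethodAwareSamplingSketch(int getPer3Secs, int otherPer3Secs) {
        this.getPer3Secs = getPer3Secs;
        this.otherPer3Secs = otherPer3Secs;
    }

    boolean trySampling(String httpMethod) {
        boolean isGet = "GET".equalsIgnoreCase(httpMethod);
        int limit = isGet ? getPer3Secs : otherPer3Secs;
        if (limit <= 0) {
            return true; // non-positive limit: sample everything, like the default
        }
        AtomicInteger counter = isGet ? getCounter : otherCounter;
        int current = counter.get();
        return current < limit && counter.compareAndSet(current, current + 1);
    }

    // Called every three seconds, like resetSamplingFactor() in the original service.
    void resetSamplingFactor() {
        getCounter.set(0);
        otherCounter.set(0);
    }
}
```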

