在进程内采用异步多线程时,如果不做任何处理,SkyWalking 追踪执行链路的 trace 信息必然会出现中断。一般来说保证执行链路信息的完整是刚性需求,这时候为了实现 trace 信息的跨线程传递,就需要使用 SkyWalking 的异步任务包装类
SkyWalking 的 Java 客户端提供了异步任务包装类用于完成多线程下 trace 的跨线程传递功能,目前有如下几个实现:
RunnableWrapper
:Ruannable 接口 的包装类CallableWrapper
:Callable 接口 的包装类ConsumerWrapper
:函数式接口 Consumer 的包装类SupplierWrapper
:函数式接口 Supplier 的包装类FunctionWrapper
:函数式接口 Function 的包装类
以 SupplierWrapper
为例,其使用示例如下:
CompletableFuture.supplyAsync(new SupplierWrapper<>(() -> {
LoggerFactory.getLogger(this.getClass()).info("SupplierWrapper");
return "nathan";
}));
以下为 SupplierWrapper 的源码,查看上一节提到的包装类源码会发现它们都被注解 @TraceCrossThread
修饰。实际上 SkyWalking 会通过 SPI 机制在 skywalking-plugin.def 文件中指定字节码增强配置类,其中 CallableOrRunnableActivation
就是专门针对 @TraceCrossThread
注解进行扫描处理的配置类
@TraceCrossThread
public class SupplierWrapper<V> implements Supplier<V> {
final Supplier<V> supplier;
public static <V> SupplierWrapper<V> of(Supplier<V> r) {
return new SupplierWrapper<>(r);
}
public SupplierWrapper(Supplier<V> supplier) {
this.supplier = supplier;
}
@Override
public V get() {
return supplier.get();
}
}
CallableOrRunnableActivation 会将@TraceCrossThread
注解作为增强类的切入点,同时为类中需要被增强的方法配置切面,以便进行字节码增强实现跨线程 trace 传递的功能
以下为 CallableOrRunnableActivation 源码,可以看到配置中主要为目标类指定了两种增强:
- 构造方法增强
构造方法增强的配置为匹配任意构造方法,其增强切面为 CallableOrRunnableConstructInterceptor- 指定方法增强
指定方法增强只匹配上文 1.1 节 提到的包装类的特定方法,其增强切面为 CallableOrRunnableInvokeInterceptor
本文在此仅对 SkyWalking 的增强配置做初步介绍,读者如有兴趣可从此处深入分析
public class CallableOrRunnableActivation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String ANNOTATION_NAME = "org.apache.skywalking.apm.toolkit.trace.TraceCrossThread";
private static final String INIT_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableConstructInterceptor";
private static final String CALL_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableInvokeInterceptor";
private static final String CALL_METHOD_NAME = "call";
private static final String RUN_METHOD_NAME = "run";
private static final String GET_METHOD_NAME = "get";
private static final String APPLY_METHOD_NAME = "apply";
private static final String ACCEPT_METHOD_NAME = "accept";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
@Override
public String getConstructorInterceptor() {
return INIT_METHOD_INTERCEPTOR;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(CALL_METHOD_NAME)
.and(takesArguments(0))
.or(named(RUN_METHOD_NAME).and(takesArguments(0)))
.or(named(GET_METHOD_NAME).and(takesArguments(0)))
.or(named(APPLY_METHOD_NAME).and(takesArguments(1)))
.or(named(ACCEPT_METHOD_NAME).and(takesArguments(1)));
}
@Override
public String getMethodsInterceptor() {
return CALL_METHOD_INTERCEPTOR;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return byClassAnnotationMatch(new String[] {ANNOTATION_NAME});
}
}
经过上一节介绍,我们知道 @TraceCrossThread
注解只能增强 CallableOrRunnableActivation
配置中指定的方法,采用 @TraceCrossThread
很难实现比较自由的自定义。不过仿照CallableOrRunnableActivation
中配置的构造方法切面和指定方法切面,实现跨线程传递 trace 的异步任务包装类也没那么困难,一个BiConsumerWrapper
的代码示例如下:
该包装类实现 trace 信息跨线程传递的核心处理其实只有两步:
- 创建包装类异步任务时,通过
ContextManager
将提交任务的线程的上下文中的 trace 信息缓存到包装类对象内部- 工作线程在执行异步任务前首先通过
ContextManager
创建自身的上下文,然后将异步任务缓存的 trace 信息载入到自身上下文,以此实现 trace 的跨线程传递
public class BiConsumerWrapper<T, U> implements BiConsumer<T, U> {
private final BiConsumer<T, U> function;
private ContextSnapshot snapshot;
public BiConsumerWrapper(BiConsumer<T, U> function) {
this.function = function;
if (ContextManager.isActive()) {
snapshot = ContextManager.capture();
}
}
@Override
public void accept(T t, U u) {
Optional.ofNullable(snapshot).ifPresent(snapshot -> {
ContextManager.createLocalSpan("BiConsumerWrapper");
ContextManager.continued(snapshot);
});
function.accept(t, u);
Optional.ofNullable(snapshot).ifPresent(value -> ContextManager.stopSpan());
}
}