• SkyWalking 实现跨线程 Trace 传递


    前言

    在进程内采用异步多线程时,如果不做任何处理,SkyWalking 追踪执行链路的 trace 信息必然会出现中断。一般来说保证执行链路信息的完整是刚性需求,这时候为了实现 trace 信息的跨线程传递,就需要使用 SkyWalking 的异步任务包装类

    1. SkyWalking 客户端异步任务包装类

    1.1 跨线程包装类的使用

    SkyWalking 的 Java 客户端提供了异步任务包装类用于完成多线程下 trace 的跨线程传递功能,目前有如下几个实现:

    • RunnableWrapperRuannable 接口 的包装类
    • CallableWrapperCallable 接口 的包装类
    • ConsumerWrapper函数式接口 Consumer 的包装类
    • SupplierWrapper函数式接口 Supplier 的包装类
    • FunctionWrapper函数式接口 Function 的包装类

    SupplierWrapper 为例,其使用示例如下:

    CompletableFuture.supplyAsync(new SupplierWrapper<>(() -> {
    	LoggerFactory.getLogger(this.getClass()).info("SupplierWrapper");
        return "nathan";
    }));
    
    • 1
    • 2
    • 3
    • 4

    1.2 跨线程包装类的原理

    1.2.1 @TraceCrossThread 注解

    以下为 SupplierWrapper 的源码,查看上一节提到的包装类源码会发现它们都被注解 @TraceCrossThread 修饰。实际上 SkyWalking 会通过 SPI 机制在 skywalking-plugin.def 文件中指定字节码增强配置类,其中 CallableOrRunnableActivation 就是专门针对 @TraceCrossThread 注解进行扫描处理的配置类

    @TraceCrossThread
    public class SupplierWrapper<V> implements Supplier<V> {
        final Supplier<V> supplier;
    
        public static <V> SupplierWrapper<V> of(Supplier<V> r) {
            return new SupplierWrapper<>(r);
        }
    
        public SupplierWrapper(Supplier<V> supplier) {
            this.supplier = supplier;
        }
    
        @Override
        public V get() {
            return supplier.get();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    1.2.2 CallableOrRunnableActivation 的增强配置

    CallableOrRunnableActivation 会将@TraceCrossThread 注解作为增强类的切入点,同时为类中需要被增强的方法配置切面,以便进行字节码增强实现跨线程 trace 传递的功能

    以下为 CallableOrRunnableActivation 源码,可以看到配置中主要为目标类指定了两种增强:

    1. 构造方法增强
      构造方法增强的配置为匹配任意构造方法,其增强切面为 CallableOrRunnableConstructInterceptor
    2. 指定方法增强
      指定方法增强只匹配上文 1.1 节 提到的包装类的特定方法,其增强切面为 CallableOrRunnableInvokeInterceptor

    本文在此仅对 SkyWalking 的增强配置做初步介绍,读者如有兴趣可从此处深入分析

    public class CallableOrRunnableActivation extends ClassInstanceMethodsEnhancePluginDefine {
    
        public static final String ANNOTATION_NAME = "org.apache.skywalking.apm.toolkit.trace.TraceCrossThread";
        private static final String INIT_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableConstructInterceptor";
        private static final String CALL_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableInvokeInterceptor";
        private static final String CALL_METHOD_NAME = "call";
        private static final String RUN_METHOD_NAME = "run";
        private static final String GET_METHOD_NAME = "get";
        private static final String APPLY_METHOD_NAME = "apply";
        private static final String ACCEPT_METHOD_NAME = "accept";
    
        @Override
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
            return new ConstructorInterceptPoint[] {
                new ConstructorInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getConstructorMatcher() {
                        return any();
                    }
    
                    @Override
                    public String getConstructorInterceptor() {
                        return INIT_METHOD_INTERCEPTOR;
                    }
                }
            };
        }
    
        @Override
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
            return new InstanceMethodsInterceptPoint[] {
                new InstanceMethodsInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named(CALL_METHOD_NAME)
                            .and(takesArguments(0))
                            .or(named(RUN_METHOD_NAME).and(takesArguments(0)))
                            .or(named(GET_METHOD_NAME).and(takesArguments(0)))
                            .or(named(APPLY_METHOD_NAME).and(takesArguments(1)))
                            .or(named(ACCEPT_METHOD_NAME).and(takesArguments(1)));
                    }
    
                    @Override
                    public String getMethodsInterceptor() {
                        return CALL_METHOD_INTERCEPTOR;
                    }
    
                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                }
            };
        }
    
        @Override
        protected ClassMatch enhanceClass() {
            return byClassAnnotationMatch(new String[] {ANNOTATION_NAME});
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    2. 自定义实现 SkyWalking 跨线程包装类

    经过上一节介绍,我们知道 @TraceCrossThread 注解只能增强 CallableOrRunnableActivation 配置中指定的方法,采用 @TraceCrossThread 很难实现比较自由的自定义。不过仿照CallableOrRunnableActivation 中配置的构造方法切面和指定方法切面,实现跨线程传递 trace 的异步任务包装类也没那么困难,一个BiConsumerWrapper 的代码示例如下:

    该包装类实现 trace 信息跨线程传递的核心处理其实只有两步:

    1. 创建包装类异步任务时,通过 ContextManager 将提交任务的线程的上下文中的 trace 信息缓存到包装类对象内部
    2. 工作线程在执行异步任务前首先通过 ContextManager 创建自身的上下文,然后将异步任务缓存的 trace 信息载入到自身上下文,以此实现 trace 的跨线程传递
    public class BiConsumerWrapper<T, U> implements BiConsumer<T, U> {
        private final BiConsumer<T, U> function;
        private ContextSnapshot snapshot;
    
        public BiConsumerWrapper(BiConsumer<T, U> function) {
            this.function = function;
            if (ContextManager.isActive()) {
                snapshot = ContextManager.capture();
            }
        }
    
        @Override
        public void accept(T t, U u) {
            Optional.ofNullable(snapshot).ifPresent(snapshot -> {
                ContextManager.createLocalSpan("BiConsumerWrapper");
                ContextManager.continued(snapshot);
            });
            function.accept(t, u);
            Optional.ofNullable(snapshot).ifPresent(value -> ContextManager.stopSpan());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
  • 相关阅读:
    Vue-router的动态路由:获取传递的值
    Android 多线程并发详解
    EFCore的新东西
    python虚拟环境创建问题:You will need to adjust your conda configuration to proceed.
    coppercam入门手册[6]
    一文讲清楚 JVM Safe Point
    消息队列MQ核心原理全面总结(11大必会原理)
    Ansible playbook自动化运维工具详解
    TRUNK通信过程
    Taro编译警告解决方案:Error: chunk common [mini-css-extract-plugin]
  • 原文地址:https://blog.csdn.net/weixin_45505313/article/details/125416176