• dubbo 服务跟踪


    本文的目标是改进dubbo,在各个dubbo服务之间透传traceId,实现服务跟踪

    一、关于RPC


    在大型系统中,一个对外http服务的背后往往隐匿了多个内部服务之间的相互调用。因为性能、开发成本层面的考量,http协议并不适合内部服务之间的调用,为此产生了thrift、dubbo 等优秀RPC框架。

    thrift 的由facebook 开发,跨语言支持丰富是其最大的亮点,thrift定义了一种接口定于语言,通过自动化工具,生成client 端和server端代码。

    dubbo 是由一个由阿里开发的开源分布式服务框架,相对于于thrift,dubbo实现了完善的服务治理功能,包括:服务发现、路由规则、配置规则、服务降级、负载均衡等。

    二、关于服务跟踪


    在微服务的趋势下,一次调用产生的日志分布在不同的机器上,虽然可以使用ELK的技术,将所有服务的日志灌入es中,但是如何将这写日志“穿起来”是一个关键问题。

    一般的做法是在系统的边界生成一个traceId,向调用链上的后继服务传递traceId,后继服务使用traceId 打印相应日志,并再向后继服务传递traceId。简称“traceId透传”。

    在使用http协议作为服务协议的系统里,可以统一使用一个封装好的http client做traceId透传。但是dubbo实现traceId透传就稍微复杂些了。

    三、dubbo traceId 透传测试


    首先抛开实现,看一下实现透传成功的测试case

    评价指标:

    (1) 调用链上的所有dubbo服务都有一个同样的traceId
    (2) 对业务代码无侵入,来的业务代码无需修改升级
    (3) 对性能无太大的影响

    测试用例:

    /**
     * 测试Service
     * Created by WuMingzhi on 2017/3/19.
     */
    public interface GiftService {
    
        int getPrice(int giftId);
    }
    
    /**
     * provider端
     * Created by WuMingzhi on 2017/3/19.
     */
    public class GiftServiceImpl implements GiftService{
    
        private static Random random = new Random();
    
        @Override
        public int getPrice(int i) {
            System.out.println("gift provider traceId:" + TraceIdUtil.getTraceId());
            int price = random.nextInt(i + 100);
            System.out.println("set gift price: " + price);
            return price;
        }
    }
    
    /**
     * consumer 端
     * Created by WuMingzhi on 2017/3/19.
     */
    public class TestGiftTraceId {
    
        @Resource
        private GiftService giftService;
    
        @Test
        public void TestGiftTraceId() throws InterruptedException {
    
            while (true){
                // consumer 端设置一个 traceId
                TraceIdUtil.setTraceId(UUID.randomUUID().toString());
                System.out.println("gift consumer traceId:" + TraceIdUtil.getTraceId());
    
                int price = giftService.getPrice(100);
                System.out.println("get gift price: " + price);
    
                Thread.sleep(1000);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    测试结果:

    consumer端

    gift consumer traceId:53bf37ca-6ce9-401a-b33f-87d6f3c96cfa
    get gift price: 183
    gift consumer traceId:a79a2a0a-29fa-48d4-b4f6-14bdd3504603
    get gift price: 144
    gift consumer traceId:cd058847-8683-452b-ac4c-43bd94557a06
    get gift price: 178
    gift consumer traceId:fd5a72a1-f060-4aef-8864-baf1d7ee1fac
    get gift price: 187

    provider端

    gift provider traceId:53bf37ca-6ce9-401a-b33f-87d6f3c96cfa
    set gift price: 183
    gift provider traceId:a79a2a0a-29fa-48d4-b4f6-14bdd3504603
    set gift price: 144
    gift provider traceId:cd058847-8683-452b-ac4c-43bd94557a06
    set gift price: 178
    gift provider traceId:fd5a72a1-f060-4aef-8864-baf1d7ee1fac
    set gift price: 187

    结论 consumer 和provider 具有相同的traceId, 透传成功

    四、dubbo 源码分析


    下图为dubbo的线程派发模型:

    Proxy 是dubbo 使用javassist为consumer 端service生成的动态代理instance。
    Implement 是provider端的service实现instance。

    traceId透传即要求Proxy 和 Implement具有相同的traceId。dubbo具有良好的分层特征,transport的对象是RPCInvocation。所以Proxy将traceId放入RPCInvocation,交由Client进行序列化和TCP传输,Server反序列化得到RPCInvocation,取出traceId,交由Implement即可。

    这里写图片描述

    下图为consumer端 JavassistProxyFactory 的代码分析

    这里写图片描述

    下图为consumer端 InvokerInvocationHandler 的代码分析

    这里写图片描述

    下图为provider端 DubboProtocol 的代码分析

    这里写图片描述

    五、dubbo 透传traceId的实现


    修改了dubbo 的两个类,添加了一个类,只列出关键代码。

    package com.alibaba.dubbo.rpc.proxy;
    
    /**
     * traceId工具类这个类是新添加的
     * Created by WuMingzhi on 17/3/18.
     */
    public class TraceIdUtil {
    
        private static final ThreadLocal TRACE_ID = new ThreadLocal();
    
        public static String getTraceId() {
            return TRACE_ID.get();
        }
    
        public static void setTraceId(String traceId) {
            TRACE_ID.set(traceId);
        }
    
    }
    
    package com.alibaba.dubbo.rpc.proxy;
    
    /**
     * InvokerHandler 这个类 是修改的
     * @author william.liangf
     */
    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker invoker;
    
        public InvokerInvocationHandler(Invoker handler){
            this.invoker = handler;
        }
    
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            // 这里将cosumer 端的traceId放入RpcInvocation
            RpcInvocation rpcInvocation = new RpcInvocation(method, args);
            rpcInvocation.setAttachment("traceId", TraceIdUtil.getTraceId());
            return invoker.invoke(rpcInvocation).recreate();
        }
    
    }
    
    
    package com.alibaba.dubbo.rpc.protocol.dubbo;
    
    /**
     * dubbo protocol support.
     *
     * @author qian.lei
     * @author william.liangf
     * @author chao.liuc
     */
    public class DubboProtocol extends AbstractProtocol {
    
    
        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker invoker = getInvoker(channel, inv);
                    //如果是callback 需要处理高版本调用低版本的问题
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || methodsStr.indexOf(",") == -1){
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods){
                                if (inv.getMethodName().equals(method)){
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod){
                            logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    // 这里将收到的consumer端的traceId放入provider端的thread local
                    TraceIdUtil.setTraceId(inv.getAttachment("traceId"));
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106

    六、更多思考

    本文只介绍了如何在dubbo系统之间透传traceId,但没有介绍使用traceId进行日志跟踪的case。建议读者实现两个层次的日志跟踪:
    (1)线程日志跟踪:在调用的入口处设置线程的traceId(来着上游线程或者自动生成);在业务代码里使用统一的LogFactory 获取自动打印traceId的 logger 打印函数的调用信息。通过traceId 即可grep 出一次调用的所有日志。
    (2)dubbo日志跟踪:类似上文,dubbo日志跟踪连接了不同服务之间的线程日志,使得在dubbo下实现服务跟踪成为可能。

  • 相关阅读:
    LeetCode952三部曲之三:再次优化(122ms -> 96ms,超51% -> 超91%)
    基于matlab的SVR回归模型
    麒麟v10 安装jenkins
    CUDA 安装
    Zip Slip漏洞审计实战
    JavaScript 笔记: 函数
    8.strtok函数
    windows共享文件夹,免密访问
    36 WEB漏洞-逻辑越权之验证码与Token及接口
    Innodb之索引与算法
  • 原文地址:https://blog.csdn.net/m0_67401660/article/details/126327782