• 从源码全面解析 dubbo 消费端服务调用的来龙去脉


    • 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
    • 📕系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列
    • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
    • 🍂博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人
    • 📝联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

    在这里插入图片描述

    一、引言

    对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

    但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

    本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

    本期源码文章吸收了之前 SpringKakfaJUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

    虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

    废话不多说,发车!

    二、服务调用流程

    1、消费端

    上一篇文章,讲解了我们的消费端如何订阅我们服务端注册到 Zookeeper 的服务接口:从源码全面解析 dubbo 服务订阅的来龙去脉

    既然消费端已经知道了我们的服务信息,那么下一步就要开始正式调用了

    我们先从消费端聊聊服务调用的流程

    1.1 动态代理的回调

    我们聊到消费端订阅服务时,最终创建的代码如下:

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    
    • 1
    • 2
    • 3

    相信看过 动态代理 的小伙伴应该知道,当我们调用 代理 的接口时,实际上走的是 InvokerInvocationHandler 该类的 invoke 方法

    public Object invoke(Object proxy, Method method, Object[] args){
        // 获取方法名=getUserById
        String methodName = method.getName();
        // 获取参数
        Class<?>[] parameterTypes = method.getParameterTypes();
        
        // 组装成 RpcInvocation 进行调用
        RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
        
        // 执行调用方法
        return InvocationUtil.invoke(invoker, rpcInvocation);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    这里我们重点介绍下 RpcInvocation 的几个参数:

    • serviceModel(Consumer):决定了服务的调用方式,包括使用哪种协议、注册中心获取服务列表、负载均衡和容错策略等。
    • method.getNamegetUserById
    • invoker.getInterface().getNamecom.common.service.IUserService
    • protocolServiceKeycom.common.service.IUserService:dubbo
    • method.getParameterTypes:方法的入参类型(Long)
    • args:方法的入参值(2)

    我们继续往下看 InvocationUtil.invoke 做了什么

    public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
        URL url = invoker.getUrl();
        String serviceKey = url.getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
        
        return invoker.invoke(rpcInvocation).recreate();
    }
    
    // 判断当前的是应用注册还是接口注册
    public Result invoke(Invocation invocation) throws RpcException {
        if (currentAvailableInvoker != null) {
            if (step == APPLICATION_FIRST) {
                if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
                    return invoker.invoke(invocation);
                }
                return decideInvoker().invoke(invocation);
            }
            return currentAvailableInvoker.invoke(invocation);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    我们继续往下追源码

    1.2 过滤器
    // 过滤器责任链模式
    // 依次遍历,执行顺序:
    public interface FilterChainBuilder {
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");
            asyncResult = filter.invoke(nextNode, invocation);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这里会依次遍历所有的 filter

    • ConsumerContextFilter:将消费者端的信息(远程地址、应用名、服务名)传递给服务提供者端
    • ConsumerClassLoaderFilter:将消费者端的ClassLoader传递给服务提供者端,以便服务提供者端可以在调用时使用相同的ClassLoader加载类。
    • FutureFilter:异步调用
    • MonitorFilter:统计服务调用信息(调用次数、平均响应时间、失败次数)
    • RouterSnapshotFilter:动态路由,它可以根据路由规则选择服务提供者,并缓存路由结果,以提高性能。

    具体每个过滤器怎么实现的,这里就不展开讲了,后面有机会单独出一章

    1.3 路由逻辑

    当我们的责任链完成之后,下一步会经过我们的 路由 逻辑

    public Result invoke(final Invocation invocation) throws RpcException {
        // 
        List<Invoker<T>> invokers = list(invocation);
        InvocationProfilerUtils.releaseDetailProfiler(invocation);
    
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
        return doInvoke(invocation, invokers, loadbalance);      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    其中 List> invokers = list(invocation) 这里就是我们的路由逻辑:

    List<Invoker<T>> invokers = list(invocation);
    
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> routedResult = doList(availableInvokers, invocation);
    }
    
    public List<Invoker<T>> doList(BitList<Invoker<T>> invokers, Invocation invocation) {
        // 这里就是我们的路由策略!!!
        List<Invoker<T>> result = routerChain.route(getConsumerUrl(), invokers, invocation);
        return result == null ? BitList.emptyList() : result;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这里的路由策略比较多,我举两个比较经典的:

    • simpleRoute(简单路由策略):默认的路由策略

    • routeAndPrint(自定义路由策略):我们可以自定义其路由逻辑

    而对于整体路由的流程:

    • 获取可用的服务提供者列表
    • 过滤出符合条件的服务提供者
    • 对过滤后的服务提供者列表进行排序
    • 得到符合规定的服务提供者信息

    到这里,我们路由会把符合要求的 服务端 给筛选出来,接下来就进入我们的负载均衡环节了

    1.4 重试次数

    这里我们设置 retries 为 5

    @DubboReference(protocol = "dubbo", timeout = 100, retries = 5)
    private IUserService iUserService;
    
    • 1
    • 2

    我们看下源码里面有几次调用:根据源码来看,我们会有 5+1 次调用

    int len = calculateInvokeTimes(methodName);
    for (int i = 0; i < len; i++) {}
    
    private int calculateInvokeTimes(String methodName) {
        // 获取当前的重试次数+1
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        RpcContext rpcContext = RpcContext.getClientAttachment();
        Object retry = rpcContext.getObjectAttachment(RETRIES_KEY);
        if (retry instanceof Number) {
            len = ((Number) retry).intValue() + 1;
            rpcContext.removeAttachment(RETRIES_KEY);
        }
        if (len <= 0) {
            len = 1;
        }
    
        return len;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    我们直接 Debug 一下看看:

    image-20230612235945732

    1.5 负载均衡

    这一行 LoadBalance loadbalance = initLoadBalance(invokers, invocation) 得到我们的负载均衡策略,默认情况下如下:

    image-20230612223930762

    我们可以看到,默认情况下是 RandomLoadBalance 随机负载。

    我们继续往下追源码:

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) {
        
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            // 如果是重新调用的,要去更新下Invoker,防止服务端发生了变化
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // 再次校验
                checkInvokers(copyInvokers, invocation);
            }
            // 负载均衡逻辑!!!
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getServiceContext().setInvokers((List) invoked);
            boolean success = false;
            try {
                Result result = invokeWithContext(invoker, invocation);
                success = true;
                return result;
            } 
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    这里我简单将下负载均衡的逻辑:

    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    
    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected){
        // 如果只有一个服务端,那还负载均衡个屁
        // 直接校验下OK不OK直接返回就好
        if (invokers.size() == 1) {
            Invoker<T> tInvoker = invokers.get(0);
            checkShouldInvalidateInvoker(tInvoker);
            return tInvoker;
        }
        // 如果多个服务端,需要执行负载均衡算法
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
        return invoker;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    Dubbo 里面的负载均衡算法如下:

    image-20230612225319739

    这里也就不一介绍了,正常情况下,我们采用的都是 RandomLoadBalance 负载均衡

    当然这里博主介绍另外一个写法,也是我们业务中使用的

    1.4.1 自定义负载均衡

    上面我们看到,通过 LoadBalance loadbalance = initLoadBalance(invokers, invocation) ,我们可以得到一个负载均衡的实现类

    在我们的生产场景中,不同的集群上含有不同的合作方,我们需要根据合作方去分发不同集群的调用

    这个时候,我们可以重写我们的 LoadBalance ,在里面重写我们 doSelect 的逻辑,而这里的 集群A 也就是我们的 group

    image-20230612232552377

    1.6 调用服务

    当我们完成下面的流程:过滤器 —> 路由 —> 重试 —> 负载均衡,就到了下面这行:

    Result result = invokeWithContext(invoker, invocation)
    
    • 1

    我们继续往下追:

    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 加读写锁
            lock.readLock().lock();
            return invoker.invoke(invocation);
        } finally {
            lock.readLock().unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    我们直接追到 AbstractInvokerinvoke 方法

    public Result invoke(Invocation inv) throws RpcException {
        RpcInvocation invocation = (RpcInvocation) inv;
    
        // 配置RPCinvocation
        prepareInvocation(invocation);
    
        // 调用RPC同时同步返回结果
        AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
    
        // 等待返回结果
        waitForResultIfSync(asyncResult, invocation);
    
        return asyncResult;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    我们可以看到,对于调用服务来说,一共分为一下三步:

    • 配置 RPCinvocation
    • 调用 RPC 同步返回结果
    • 等待返回结果
    1.6.1 配置 RPCinvocation

    这里主要将 Invocation 转变成 RPCInvocation

    • 设置 RpcInvocationInvoker 属性,指明该调用是由哪个 Invoker 发起的
    • 当前线程的一些状态信息
    • 同步调用、异步调用
    • 异步调用生成一个唯一的调用 ID
    • 选择序列化的类型
    private void prepareInvocation(RpcInvocation inv) {
        // 设置 RpcInvocation 的 Invoker 属性,指明该调用是由哪个 Invoker 发起的
        inv.setInvoker(this);
        
    	// 当前线程的一些状态信息
        addInvocationAttachments(inv);
    
        // 同步调用、异步调用
        inv.setInvokeMode(RpcUtils.getInvokeMode(url, inv));
    
        // 异步调用生成一个唯一的调用 ID
        RpcUtils.attachInvocationIdIfAsync(getUrl(), inv);
    
        // 选择序列化的类型
        Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DefaultSerializationSelector.getDefaultRemotingSerialization()));
        if (serializationId != null) {
            inv.put(SERIALIZATION_ID_KEY, serializationId);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    1.6.2 调用 RPC 同步返回结果
    private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {
        asyncResult = (AsyncRpcResult) doInvoke(invocation);
    }
    
    protected Result doInvoke(final Invocation invocation){
        // 获取超时时间
        int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, methodName, DEFAULT_TIMEOUT);
       
        // 设置超时时间
        invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout));
        
        // 从dubbo线程池中拿出一个线程
        ExecutorService executor = getCallbackExecutor(getUrl(), inv);
        // request:进行调用
    	CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
        FutureContext.getContext().setCompatibleFuture(appResponseFuture);
        AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
        result.setExecutor(executor);
        return result;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    这里的 currentClient.request 进行请求的发送:

    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor){
        return client.request(request, timeout, executor);
    }
    
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor){
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        channel.send(req);
        return future;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这里的 channel.send(req)dubbo 自己包装的 channel,我们去看看其实现

    当然,我们这里如果看过博主 Netty 源码文章的话,实际可以猜到,肯定是封装了 Nettychannel

    public void send(Object message, boolean sent) throws RemotingException {
            // 校验当前的Channel是否关闭
            super.send(message, sent);
    
            boolean success = true;
            int timeout = 0;
            try {
                // channel 写入并刷新
                // channel:io.netty.channel.Channel
                ChannelFuture future = channel.writeAndFlush(message);
                if (sent) {
                    // 等待超时的时间
                    // 超过时间会报错
                    timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                // 这里如果报错了,就会走重试的逻辑
                Throwable cause = future.cause();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    1.6.3 等待返回结果
    waitForResultIfSync(asyncResult, invocation);
    
    private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
        // 判断当前的调用是不是同步调用
        // 异步调用直接返回即可
        if (InvokeMode.SYNC != invocation.getInvokeMode()) {
            return;
        }
        
        // 获取超时时间 
        Object timeoutKey = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
        long timeout = RpcUtils.convertToNumber(timeoutKey, Integer.MAX_VALUE);
    
        // 等待timeout时间
        // 获取失败-直接抛出异常
        asyncResult.get(timeout, TimeUnit.MILLISECONDS);
    }
    
    public Result get(long timeout, TimeUnit unit){
        // 获取响应返回的数据-等待timeout时间
        return responseFuture.get(timeout, unit);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    如果没有异常,如下图所示:

    image-20230618160337104

    到这里我们的消费端调用服务的整个流程源码剖析就完毕了~

    三、流程

    高清图片可私聊博主

    在这里插入图片描述

    四、总结

    鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

    其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

    如果你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长

    我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。

    我们下期再见。

    我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。

    往期文章推荐:

  • 相关阅读:
    纯代码实现站点的文章显示百度是否已经收录功能
    Oracle 锁表,如何解锁
    Linux 下孤儿进程与僵尸进程详解
    专栏十二:单细胞空间转录组四种常见的不同整合pipline
    去了家新公司,技术总监不让用 IntelliJ IDEA想离职了
    (续)SSM整合之spring笔记(AOP 动态代理)(P094—P098)
    Redis管理客户端,兼容Windows、Mac、Linux
    力扣(LeetCode)878. 第 N 个神奇数字(C++)
    第二部分:对象之间的关联
    opencv 打开中文路径图报错
  • 原文地址:https://blog.csdn.net/qq_40915439/article/details/131274746