• 如何在短期内快速掌握 Dubbo 原理和源码?有哪些经验可以分享?


    服务集群的概述

    概述

    为了避免单点故障,现在的应用通常至少会部署在两台服务器上,这样就组成了集群。集群就是单机的多实例,在多个服务器上部署多个服务,每个服务就是一个节点,部署N个节点,处理业务的能力就提升 N倍(大约),这些节点的集合就叫做集群。

    5a3895a77e9449f0b8d41de7128352fc.png 

    管理控制台

    目前的管理控制台已经发布0.1版本,结构上采取了前后端分离的方式,前端使用Vue和Vuetify分别作为Javascript框架和UI框架,后端采用Spring Boot框架。既可以按照标准的Maven方式进行打包,部署,也可以采用前后端分离的部署方式,方便开发,功能上,目前具备了服务查询,服务治理(包括Dubbo2.7中新增的治理规则)以及服务测试三部分内容。

    Maven方式部署

    安装
    git clone https://github.com/apache/dubbo-admin.git
    cd dubbo-admin
    mvn clean package
    cd dubbo-admin-distribution/target
    java -jar dubbo-admin-0.1.jar
    访问 http://localhost:8080
    前后端分离部署

    前端
    cd dubbo-admin-ui 
    npm install 
    npm run dev
    后端
    cd dubbo-admin-server
    mvn clean package 
    cd target
    java -jar dubbo-admin-server-0.1.jar
    访问 http://localhost:8081
    前后端分离模式下,前端的修改可以实时生效
    环境搭建

    集群调用存在的问题

    1、负载均衡
    2、集群容错
    3、服务治理


    集群的调用过程

    调用过程

    在对集群相关代码进行分析之前,这里有必要先来介绍一下集群容错的所有组件。包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。

    f2d0270fd9b5405e93aea7f3f26331c3.png 

    集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。

    第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。

    Directory 的用途是保存 Invoker,可简单类比为 List。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。

    每次变化后,RegistryDirectory 会动态增删 Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。

    当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。

    最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。

    组件介绍

    Directory:它代表多个Invoker,从methodInvokerMap提取,但是他的值是动态,例如注册中心的变更。
    Router:负责从多个Invoker中按路由规则选出子集,例如应用隔离或读写分离或灰度发布等等
    Cluster:将Directory中的多个Invoker伪装成一个Invoker,来容错,调用失败重试。
    LoadBalance:从多个Invoker选取一个做本次调用,具体包含很多种负载均衡算法。
    Invoker:Provider中的一个可调用接口。例如DemoService
    集群容错

    在分布式系统中,集群某个某些节点出现问题是大概率事件,因此在设计分布式RPC框架的过程中,必须要把失败作为设计的一等公民来对待。一次调用失败之后,应该如何选择对失败的选择策略,这是一个见仁见智的问题,每种策略可能都有自己独特的应用场景。因此,作为框架来说,应当针对不同场景提供多种策略,供用户进行选择。

    在Dubbo设计中,通过Cluster这个接口的抽象,把一组可供调用的Provider信息组合成为一个统一的Invoker供调用方进行调用。经过路由规则过滤,负载均衡选址后,选中一个具体地址进行调用,如果调用失败,则会按照集群配置的容错策略进行容错处理。

    内置集群容错方式

    Dubbo默认内置了若干容错策略,如果不能满足用户需求,则可以通过自定义容错策略进行配置

    Dubbo主要内置了如下几种策略:

    Failover(失败自动切换)
    Failsafe(失败安全)
    Failfast(快速失败)
    Failback(失败自动恢复)
    Forking(并行调用)
    Broadcast(广播调用)
    这些名称比较相似,概念也比较容易混淆,下面逐一进行解释。

    Failover(失败自动切换)

    Failover是高可用系统中的一个常用概念,服务器通常拥有主备两套机器配置,如果主服务器出现故障,则自动切换到备服务器中,从而保证了整体的高可用性。

    Dubbo也借鉴了这个思想,并且把它作为Dubbo默认的容错策略。当调用出现失败的时候,根据配置的重试次数,会自动从其他可用地址中重新选择一个可用的地址进行调用,直到调用成功,或者是达到重试的上限位置。

    Dubbo里默认配置的重试次数是2,也就是说,算上第一次调用,最多会调用3次。

    其配置方法,容错策略既可以在服务提供方配置,也可以服务调用方进行配置。而重试次数的配置则更为灵活,既可以在服务级别进行配置,也可以在方法级别进行配置。具体优先顺序为:

    服务调用方方法级配置 > 服务调用方服务级配置 > 服务提供方方法级配置 > 服务提供方服务级配置
    以XML方式为例,具体配置方法如下:

    服务提供方,服务级配置


    服务提供方,方法级配置


         
     
    服务调用方,服务级配置


    服务调用方,方法级配置:


         
     

    Failover可以自动对失败进行重试,对调用者屏蔽了失败的细节,但是Failover策略也会带来一些副作用:

    1、重试会额外增加一下开销,例如增加资源的使用,在高负载系统下,额外的重试可能让系统雪上加霜。


    2、重试会增加调用的响应时间。


    3、某些情况下,重试甚至会造成资源的浪费。考虑一个调用场景,A->B->C,如果A处设置了超时100ms,再B->C的第一次调用完成时已经超过了100ms,但很不幸B->C失败,这时候会进行重试,但其实这时候重试已经没有意义,因此在A看来这次调用已经超时,A可能已经开始执行其他逻辑。
    Failsafe(失败安全)

    失败安全策略的核心是即使失败了也不会影响整个调用流程。通常情况下用于旁路系统或流程中,它的失败不影响核心业务的正确性。在实现上,当出现调用失败时,会忽略此错误,并记录一条日志,同时返回一个空结果,在上游看来调用是成功的。

    应用场景,可以用于写入审计日志等操作。

    具体配置方法:

    服务提供方,服务级配置


    服务调用方,服务级配置


    其中服务调用方配置优先于服务提供方配置。

    Failfast(快速失败)

    某些业务场景中,某些操作可能是非幂等的,如果重复发起调用,可能会导致出现脏数据等。例如调用某个服务,其中包含一个数据库的写操作,如果写操作完成,但是在发送结果给调用方的过程中出错了,那么在调用发看来这次调用失败了,但其实数据写入已经完成。这种情况下,重试可能并不是一个好策略,这时候就需要使用到Failfast策略,调用失败立即报错。让调用方来决定下一步的操作并保证业务的幂等性。

    具体配置方法:

    服务提供方,服务级配置


    服务调用方,服务级配置


    其中服务调用方配置优先于服务提供方配置。

    Failback(失败自动恢复)

    Failback通常和Failover两个概念联系在一起。在高可用系统中,当主机发生故障,通过Failover进行主备切换后,待故障恢复后,系统应该具备自动恢复原始配置的能力。

    Dubbo中的Failback策略中,如果调用失败,则此次失败相当于Failsafe,将返回一个空结果。而与Failsafe不同的是,Failback策略会将这次调用加入内存中的失败列表中,对于这个列表中的失败调用,会在另一个线程中进行异步重试,重试如果再发生失败,则会忽略,即使重试调用成功,原来的调用方也感知不到了。因此它通常适合于,对于实时性要求不高,且不需要返回值的一些异步操作。

    具体配置方法:

    服务提供方,服务级配置


    服务调用方,服务级配置


    其中服务调用方配置优先于服务提供方配置。

    按照目前的实现,Failback策略还有一些局限,例如内存中的失败调用列表没有上限,可能导致堆积,异步重试的执行间隔无法调整,默认是5秒。

    Forking(并行调用)

    上述几种策略中,主要都是针对调用失败发生后如何进行弥补的角度去考虑的,而Forking策略则跟上述几种策略不同,是一种典型的用成本换时间的思路。即第一次调用的时候就同时发起多个调用,只要其中一个调用成功,就认为成功。在资源充足,且对于失败的容忍度较低的场景下,可以采用此策略。

    具体配置方法:

    服务提供方,服务级配置


    服务调用方,服务级配置


    其中服务调用方配置优先于服务提供方配置。

    Broadcast(广播调用)

    在某些场景下,可能需要对服务的所有提供者进行操作,此时可以使用广播调用策略。此策略会逐个调用所有提供者,只要任意有一个提供者出错,则认为此次调用出错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

    具体配置方法:

    服务提供方,服务级配置


    服务调用方,服务级配置


    其中服务调用方配置优先于服务提供方配置。

    集群容错调优

    下表对各种策略做一个简单对比:

    6fff421ab6e34ba2a5ba035c00606f8b.png 
    综上我们得知,不同的容错策略往往对应不同的业务处理,这里做一个总结如下:

    1、Failover :通常用于对调用rt不敏感的场景,如读操作;但重试会带来更长延迟
    2、Failfast :通常用于非幂等性操作,需要快速感知失败的场景;比如新增记录
    3、Failsafe :通常用于旁路系统,失败不影响核心流程正确性的场景;如日志记录
    4、Failback :通常用于对于实时性要求不高,且不需要返回值的一些异步操作的场景
    5、Forking :通常用于资源充足,且对于失败的容忍度较低,实时性要求高的读操作,但需要浪费更多服务资源
    6、Broadcast:如通知所有提供者更新缓存或日志等本地资源信息


    源码分析

    我们在上一章看到了两个概念,分别是集群接口 Cluster 和 Cluster Invoker,这两者是不同的。Cluster 是接口,而 Cluster Invoker 是一种 Invoker。服务提供者的选择逻辑,以及远程调用失败后的的处理逻辑均是封装在 Cluster Invoker 中。那么 Cluster 接口和相关实现类有什么用呢?用途比较简单,仅用于生成 Cluster Invoker。下面我们来看一下源码。

    public class FailoverCluster implements Cluster {

        public final static String NAME = "failover";

        @Override
        public Invoker join(Directory directory) throws RpcException {
            // 创建并返回 FailoverClusterInvoker 对象
            return new FailoverClusterInvoker(directory);
        }
    }
    如上,FailoverCluster 总共就包含这几行代码,用于创建 FailoverClusterInvoker 对象,很简单。下面再看一个。

    public class FailbackCluster implements Cluster {

        public final static String NAME = "failback";

        @Override
        public Invoker join(Directory directory) throws RpcException {
            // 创建并返回 FailbackClusterInvoker 对象
            return new FailbackClusterInvoker(directory);
        }

    }
    如上,FailbackCluster 的逻辑也是很简单,无需解释了。所以接下来,我们把重点放在各种 Cluster Invoker 上

    Cluster Invoker

    我们首先从各种 Cluster Invoker 的父类 AbstractClusterInvoker 源码开始说起。前面说过,集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,这个在服务引用那篇文章中分析过,就不赘述。第二个阶段是在服务消费者进行远程调用时,此时 AbstractClusterInvoker 的 invoke 方法会被调用。列举 Invoker,负载均衡等操作均会在此阶段被执行。因此下面先来看一下 invoke 方法的逻辑。

    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;

        // 绑定 attachments 到 invocation 中.
        Map contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }

        // 列举 Invoker
        List> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            // 加载 LoadBalance
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        // 调用 doInvoke 进行后续操作
        return doInvoke(invocation, invokers, loadbalance);
    }

    // 抽象方法,由子类实现
    protected abstract Result doInvoke(Invocation invocation, List> invokers,
                                           LoadBalance loadbalance) throws RpcException;
    AbstractClusterInvoker 的 invoke 方法主要用于列举 Invoker,以及加载 LoadBalance。最后再调用模板方法 doInvoke 进行后续操作。下面我们来看一下 Invoker 列举方法 list(Invocation) 的逻辑,如下:

    protected List> list(Invocation invocation) throws RpcException {
        // 调用 Directory 的 list 方法列举 Invoker
        List> invokers = directory.list(invocation);
        return invokers;
    }
    如上,AbstractClusterInvoker 中的 list 方法做的事情很简单,只是简单的调用了 Directory 的 list 方法,没有其他更多的逻辑了。Directory 即相关实现类在前文已经分析过,这里就不多说了。接下来,我们把目光转移到 AbstractClusterInvoker 的各种实现类上,来看一下这些实现类是如何实现 doInvoke 方法逻辑的。

    FailoverClusterInvoker

    FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。默认配置下,Dubbo 会使用这个类作为缺省 Cluster Invoker。下面来看一下该类的逻辑。

    public class FailoverClusterInvoker extends AbstractClusterInvoker {

        // 省略部分代码

        @Override
        public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
            List> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            // 获取重试次数
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            RpcException le = null;
            List> invoked = new ArrayList>(copyinvokers.size());
            Set providers = new HashSet(len);
            // 循环调用,失败重试
            for (int i = 0; i < len; i++) {
                if (i > 0) {
                    checkWhetherDestroyed();
                    // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
                    // 通过调用 list 可得到最新可用的 Invoker 列表
                    copyinvokers = list(invocation);
                    // 对 copyinvokers 进行判空检查
                    checkInvokers(copyinvokers, invocation);
                }

                // 通过负载均衡选择 Invoker
                Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
                // 添加到 invoker 到 invoked 列表中
                invoked.add(invoker);
                // 设置 invoked 到 RPC 上下文中
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    // 调用目标 Invoker 的 invoke 方法
                    Result result = invoker.invoke(invocation);
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) {
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }

            // 若重试失败,则抛出异常
            throw new RpcException(..., "Failed to invoke the method ...");
        }
    }
    如上,FailoverClusterInvoker 的 doInvoke 方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。

    在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list 方法列举 Invoker。

    整个流程大致如此,不是很难理解。下面我们看一下 select 方法的逻辑。

    protected Invoker select(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
        if (invokers == null || invokers.isEmpty())
            return null;
        // 获取调用方法名
        String methodName = invocation == null ? "" : invocation.getMethodName();

        // 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的
        // 调用同一个服务提供者,除非该提供者挂了再进行切换
        boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
        {
            // 检测 invokers 列表是否包含 stickyInvoker,如果不包含,
            // 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空
            if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                stickyInvoker = null;
            }

            // 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含 
            // stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。
            // 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                // availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的 
                // isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。
                if (availablecheck && stickyInvoker.isAvailable()) {
                    return stickyInvoker;
                }
            }
        }

        // 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。
        // 此时继续调用 doSelect 选择 Invoker
        Invoker invoker = doSelect(loadbalance, invocation, invokers, selected);

        // 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker
        if (sticky) {
            stickyInvoker = invoker;
        }
        return invoker;
    }
    如上,select 方法的主要逻辑集中在了对粘滞连接特性的支持上。

    首先是获取 sticky 配置,然后再检测 invokers 列表中是否包含 stickyInvoker,如果不包含,则认为该 stickyInvoker 不可用,此时将其置空。

    这里的 invokers 列表可以看做是存活着的服务提供者列表,如果这个列表不包含 stickyInvoker,那自然而然的认为 stickyInvoker 挂了,所以置空。

    如果 stickyInvoker 存在于 invokers 列表中,此时要进行下一项检测 — 检测 selected 中是否包含 stickyInvoker。

    如果包含的话,说明 stickyInvoker 在此之前没有成功提供服务(但其仍然处于存活状态)。此时我们认为这个服务不可靠,不应该在重试期间内再次被调用,因此这个时候不会返回该 stickyInvoker。

    如果 selected 不包含 stickyInvoker,此时还需要进行可用性检测,比如检测服务提供者网络连通性等。

    当可用性检测通过,才可返回 stickyInvoker,否则调用 doSelect 方法选择 Invoker。如果 sticky 为 true,此时会将 doSelect 方法选出的 Invoker 赋值给 stickyInvoker。

    以上就是 select 方法的逻辑,这段逻辑看起来不是很复杂,但是信息量比较大。不搞懂 invokers 和 selected 两个入参的含义,以及粘滞连接特性,这段代码是不容易看懂的。

    所以大家在阅读这段代码时,不要忽略了对背景知识的理解。关于 select 方法先分析这么多,继续向下分析。

    private Invoker doSelect(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
        if (invokers == null || invokers.isEmpty())
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        if (loadbalance == null) {
            // 如果 loadbalance 为空,这里通过 SPI 加载 Loadbalance,默认为 RandomLoadBalance
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }

        // 通过负载均衡组件选择 Invoker
        Invoker invoker = loadbalance.select(invokers, getUrl(), invocation);

        // 如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                // 进行重选
                Invoker rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {
                    // 如果 rinvoker 不为空,则将其赋值给 invoker
                    invoker = rinvoker;
                } else {
                    // rinvoker 为空,定位 invoker 在 invokers 中的位置
                    int index = invokers.indexOf(invoker);
                    try {
                        // 获取 index + 1 位置处的 Invoker,以下代码等价于:
                        //     invoker = invokers.get((index + 1) % invokers.size());
                        invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                    } catch (Exception e) {
                        logger.warn("... may because invokers list dynamic change, ignore.");
                    }
                }
            } catch (Throwable t) {
                logger.error("cluster reselect fail reason is : ...");
            }
        }
        return invoker;
    }
    doSelect 主要做了两件事,第一是通过负载均衡组件选择 Invoker。第二是,如果选出来的 Invoker 不稳定,或不可用,此时需要调用 reselect 方法进行重选。若 reselect 选出来的 Invoker 为空,此时定位 invoker 在 invokers 列表中的位置 index,然后获取 index + 1 处的 invoker,这也可以看做是重选逻辑的一部分。下面我们来看一下 reselect 方法的逻辑。

    private Invoker reselect(LoadBalance loadbalance, Invocation invocation,
        List> invokers, List> selected, boolean availablecheck) throws RpcException {

        List> reselectInvokers = new ArrayList>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

        // 下面的 if-else 分支逻辑有些冗余,pull request #2826 对这段代码进行了简化,可以参考一下
        // 根据 availablecheck 进行不同的处理
        if (availablecheck) {
            // 遍历 invokers 列表
            for (Invoker invoker : invokers) {
                // 检测可用性
                if (invoker.isAvailable()) {
                    // 如果 selected 列表不包含当前 invoker,则将其添加到 reselectInvokers 中
                    if (selected == null || !selected.contains(invoker)) {
                        reselectInvokers.add(invoker);
                    }
                }
            }

            // reselectInvokers 不为空,此时通过负载均衡组件进行选择
            if (!reselectInvokers.isEmpty()) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }

        // 不检查 Invoker 可用性
        } else {
            for (Invoker invoker : invokers) {
                // 如果 selected 列表不包含当前 invoker,则将其添加到 reselectInvokers 中
                if (selected == null || !selected.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
            if (!reselectInvokers.isEmpty()) {
                // 通过负载均衡组件进行选择
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }

        {
            // 若线程走到此处,说明 reselectInvokers 集合为空,此时不会调用负载均衡组件进行筛选。
            // 这里从 selected 列表中查找可用的 Invoker,并将其添加到 reselectInvokers 集合中
            if (selected != null) {
                for (Invoker invoker : selected) {
                    if ((invoker.isAvailable())
                            && !reselectInvokers.contains(invoker)) {
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if (!reselectInvokers.isEmpty()) {
                // 再次进行选择,并返回选择结果
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        return null;
    }
    reselect 方法总结下来其实只做了两件事情,第一是查找可用的 Invoker,并将其添加到 reselectInvokers 集合中。第二,如果 reselectInvokers 不为空,则通过负载均衡组件再次进行选择。其中第一件事情又可进行细分,一开始,reselect 从 invokers 列表中查找有效可用的 Invoker,若未能找到,此时再到 selected 列表中继续查找。

    关于 reselect 方法就先分析到这,继续分析其他的 Cluster Invoker。

    FailbackClusterInvoker

    FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务消费者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。下面来看一下它的实现逻辑。

    public class FailbackClusterInvoker extends AbstractClusterInvoker {

        private static final long RETRY_FAILED_PERIOD = 5 * 1000;

        private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
                new NamedInternalThreadFactory("failback-cluster-timer", true));

        private final ConcurrentMap> failed = new ConcurrentHashMap>();
        private volatile ScheduledFuture retryFuture;

        @Override
        protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                // 选择 Invoker
                Invoker invoker = select(loadbalance, invocation, invokers, null);
                // 进行调用
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                // 如果调用过程中发生异常,此时仅打印错误日志,不抛出异常
                logger.error("Failback to invoke method ...");

                // 记录调用信息
                addFailed(invocation, this);
                // 返回一个空结果给服务消费者
                return new RpcResult();
            }
        }

        private void addFailed(Invocation invocation, AbstractClusterInvoker router) {
            if (retryFuture == null) {
                synchronized (this) {
                    if (retryFuture == null) {
                        // 创建定时任务,每隔5秒执行一次
                        retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    // 对失败的调用进行重试
                                    retryFailed();
                                } catch (Throwable t) {
                                    // 如果发生异常,仅打印异常日志,不抛出
                                    logger.error("Unexpected error occur at collect statistic", t);
                                }
                            }
                        }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                    }
                }
            }

            // 添加 invocation 和 invoker 到 failed 中
            failed.put(invocation, router);
        }

        void retryFailed() {
            if (failed.size() == 0) {
                return;
            }

            // 遍历 failed,对失败的调用进行重试
            for (Map.Entry> entry : new HashMap>(failed).entrySet()) {
                Invocation invocation = entry.getKey();
                Invoker invoker = entry.getValue();
                try {
                    // 再次进行调用
                    invoker.invoke(invocation);
                    // 调用成功后,从 failed 中移除 invoker
                    failed.remove(invocation);
                } catch (Throwable e) {
                    // 仅打印异常,不抛出
                    logger.error("Failed retry to invoke method ...");
                }
            }
        }
    }
    这个类主要由3个方法组成,首先是 doInvoker,该方法负责初次的远程调用。若远程调用失败,则通过 addFailed 方法将调用信息存入到 failed 中,等待定时重试。addFailed 在开始阶段会根据 retryFuture 为空与否,来决定是否开启定时任务。retryFailed 方法则是包含了失败重试的逻辑,该方法会对 failed 进行遍历,然后依次对 Invoker 进行调用。调用成功则将 Invoker 从 failed 中移除,调用失败则忽略失败原因。

    以上就是 FailbackClusterInvoker 的执行逻辑,不是很复杂,继续往下看。

    FailfastClusterInvoker

    FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。源码如下:

    public class FailfastClusterInvoker extends AbstractClusterInvoker {

        @Override
        public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
            checkInvokers(invokers, invocation);
            // 选择 Invoker
            Invoker invoker = select(loadbalance, invocation, invokers, null);
            try {
                // 调用 Invoker
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                    // 抛出异常
                    throw (RpcException) e;
                }
                // 抛出异常
                throw new RpcException(..., "Failfast invoke providers ...");
            }
        }
    }
    如上,首先是通过 select 方法选择 Invoker,然后进行远程调用。如果调用失败,则立即抛出异常。FailfastClusterInvoker 就先分析到这,下面分析 FailsafeClusterInvoker。

    FailsafeClusterInvoker

    FailsafeClusterInvoker 是一种失败安全的 Cluster Invoker。所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。适用于写入审计日志等操作。下面分析源码。

    public class FailsafeClusterInvoker extends AbstractClusterInvoker {

        @Override
        public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                // 选择 Invoker
                Invoker invoker = select(loadbalance, invocation, invokers, null);
                // 进行远程调用
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                // 打印错误日志,但不抛出
                logger.error("Failsafe ignore exception: " + e.getMessage(), e);
                // 返回空结果忽略错误
                return new RpcResult();
            }
        }
    }
    FailsafeClusterInvoker 的逻辑和 FailfastClusterInvoker 的逻辑一样简单,无需过多说明。继续向下分析。

    ForkingClusterInvoker

    ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。下面来看该类的实现。

    public class ForkingClusterInvoker extends AbstractClusterInvoker {

        private final ExecutorService executor = Executors.newCachedThreadPool(
                new NamedInternalThreadFactory("forking-cluster-timer", true));

        @Override
        public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                final List> selected;
                // 获取 forks 配置
                final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
                // 获取超时配置
                final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                // 如果 forks 配置不合理,则直接将 invokers 赋值给 selected
                if (forks <= 0 || forks >= invokers.size()) {
                    selected = invokers;
                } else {
                    selected = new ArrayList>();
                    // 循环选出 forks 个 Invoker,并添加到 selected 中
                    for (int i = 0; i < forks; i++) {
                        // 选择 Invoker
                        Invoker invoker = select(loadbalance, invocation, invokers, selected);
                        if (!selected.contains(invoker)) {
                            selected.add(invoker);
                        }
                    }
                }

                // ----------------------✨ 分割线1 ✨---------------------- //

                RpcContext.getContext().setInvokers((List) selected);
                final AtomicInteger count = new AtomicInteger();
                final BlockingQueue ref = new LinkedBlockingQueue();
                // 遍历 selected 列表
                for (final Invoker invoker : selected) {
                    // 为每个 Invoker 创建一个执行线程
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // 进行远程调用
                                Result result = invoker.invoke(invocation);
                                // 将结果存到阻塞队列中
                                ref.offer(result);
                            } catch (Throwable e) {
                                int value = count.incrementAndGet();
                                // 仅在 value 大于等于 selected.size() 时,才将异常对象
                                // 放入阻塞队列中,请大家思考一下为什么要这样做。
                                if (value >= selected.size()) {
                                    // 将异常对象存入到阻塞队列中
                                    ref.offer(e);
                                }
                            }
                        }
                    });
                }

                // ----------------------✨ 分割线2 ✨---------------------- //

                try {
                    // 从阻塞队列中取出远程调用结果
                    Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);

                    // 如果结果类型为 Throwable,则抛出异常
                    if (ret instanceof Throwable) {
                        Throwable e = (Throwable) ret;
                        throw new RpcException(..., "Failed to forking invoke provider ...");
                    }

                    // 返回结果
                    return (Result) ret;
                } catch (InterruptedException e) {
                    throw new RpcException("Failed to forking invoke provider ...");
                }
            } finally {
                RpcContext.getContext().clearAttachments();
            }
        }
    }
    ForkingClusterInvoker 的 doInvoker 方法比较长,这里通过两个分割线将整个方法划分为三个逻辑块。从方法开始到分割线1之间的代码主要是用于选出 forks 个 Invoker,为接下来的并发调用提供输入。分割线1和分割线2之间的逻辑通过线程池并发调用多个 Invoker,并将结果存储在阻塞队列中。分割线2到方法结尾之间的逻辑主要用于从阻塞队列中获取返回结果,并对返回结果类型进行判断。如果为异常类型,则直接抛出,否则返回。

    以上就是ForkingClusterInvoker 的 doInvoker 方法大致过程。我们在分割线1和分割线2之间的代码上留了一个问题,问题是这样的:为什么要在value >= selected.size()的情况下,才将异常对象添加到阻塞队列中?这里来解答一下。原因是这样的,在并行调用多个服务提供者的情况下,只要有一个服务提供者能够成功返回结果,而其他全部失败。此时 ForkingClusterInvoker 仍应该返回成功的结果,而非抛出异常。在value >= selected.size()时将异常对象放入阻塞队列中,可以保证异常对象不会出现在正常结果的前面,这样可从阻塞队列中优先取出正常的结果。

    关于 ForkingClusterInvoker 就先分析到这,接下来分析最后一个 Cluster Invoker。

    BroadcastClusterInvoker

    本章的最后,我们再来看一下 BroadcastClusterInvoker。BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。源码如下。

    public class BroadcastClusterInvoker extends AbstractClusterInvoker {

        @Override
        public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
            checkInvokers(invokers, invocation);
            RpcContext.getContext().setInvokers((List) invokers);
            RpcException exception = null;
            Result result = null;
            // 遍历 Invoker 列表,逐个调用
            for (Invoker invoker : invokers) {
                try {
                    // 进行远程调用
                    result = invoker.invoke(invocation);
                } catch (RpcException e) {
                    exception = e;
                    logger.warn(e.getMessage(), e);
                } catch (Throwable e) {
                    exception = new RpcException(e.getMessage(), e);
                    logger.warn(e.getMessage(), e);
                }
            }

            // exception 不为空,则抛出异常
            if (exception != null) {
                throw exception;
            }
            return result;
        }
    }
    以上就是 BroadcastClusterInvoker 的代码,比较简单,就不多说了。

    这里分析了集群容错的几种实现方式。集群容错对于 Dubbo 框架来说,是很重要的逻辑。集群模块处于服务提供者和消费者之间,对于服务消费者来说,集群可向其屏蔽服务提供者集群的情况,使其能够专心进行远程调用。除此之外,通过集群模块,我们还可以对服务之间的调用链路进行编排优化,治理服务。总的来说,对于 Dubbo 而言,集群容错相关逻辑是非常重要的。想要对 Dubbo 有比较深的理解,集群容错是必须要掌握的。

    负载均衡

    在之前章节中,介绍了服务集群的调用方式。我们发现在多服务实例时,负载均衡调用是其中极其重要的一环。在本章节中,我们一起学习Dubbo中的各种负载均衡策略

    负载均衡的主要作用

    6be3a798a90343e3ac5680a54294148e.png
    负载均衡(LoadBalance),它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。 

    在 Dubbo 中,也有负载均衡的概念和相应的实现。Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。

    内置的负载均衡策略

    Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。这几个负载均衡算法代码不是很长,但是想看懂也不是很容易,需要大家对这几个算法的原理有一定了解才行。如果不是很了解,也没不用太担心。我们会在分析每个算法的源码之前,对算法原理进行简单的讲解,帮助大家建立初步的印象。

    RandomLoadBalance

    RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。

    ce8db73a49154ba7a00bba2e8277210d.png 

    以上就是 RandomLoadBalance 背后的算法思想,比较简单。下面开始分析源码。

    public class RandomLoadBalance extends AbstractLoadBalance {

        public static final String NAME = "random";

        private final Random random = new Random();

        @Override
        protected Invoker doSelect(List> invokers, URL url, Invocation invocation) {
            int length = invokers.size();
            int totalWeight = 0;
            boolean sameWeight = true;
            // 下面这个循环有两个作用,第一是计算总权重 totalWeight,
            // 第二是检测每个服务提供者的权重是否相同
            for (int i = 0; i < length; i++) {
                int weight = getWeight(invokers.get(i), invocation);
                // 累加权重
                totalWeight += weight;
                // 检测当前服务提供者的权重与上一个服务提供者的权重是否相同,
                // 不相同的话,则将 sameWeight 置为 false。
                if (sameWeight && i > 0
                        && weight != getWeight(invokers.get(i - 1), invocation)) {
                    sameWeight = false;
                }
            }

            // 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上
            if (totalWeight > 0 && !sameWeight) {
                // 随机获取一个 [0, totalWeight) 区间内的数字
                int offset = random.nextInt(totalWeight);
                // 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。
                // 举例说明一下,我们有 servers = [A, B, C],weights = [5, 3, 2],offset = 7。
                // 第一次循环,offset - 5 = 2 > 0,即 offset > 5,
                // 表明其不会落在服务器 A 对应的区间上。
                // 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8,
                // 表明其会落在服务器 B 对应的区间上
                for (int i = 0; i < length; i++) {
                    // 让随机值 offset 减去权重值
                    offset -= getWeight(invokers.get(i), invocation);
                    if (offset < 0) {
                        // 返回相应的 Invoker
                        return invokers.get(i);
                    }
                }
            }

            // 如果所有服务提供者权重值相同,此时直接随机返回一个即可
            return invokers.get(random.nextInt(length));
        }
    }
    RandomLoadBalance 的算法思想比较简单,在经过多次请求后,能够将调用请求按照权重值进行“均匀”分配。当然 RandomLoadBalance 也存在一定的缺点,当调用次数比较少时,Random 产生的随机数可能会比较集中,此时多数请求会落到同一台服务器上。这个缺点并不是很严重,多数情况下可以忽略。RandomLoadBalance 是一个简单,高效的负载均衡实现,因此 Dubbo 选择它作为缺省实现。

    LeastActiveLoadBalance

    LeastActiveLoadBalance 翻译过来是最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。关于 LeastActiveLoadBalance 的背景知识就先介绍到这里,下面开始分析源码。

    public class LeastActiveLoadBalance extends AbstractLoadBalance {

        public static final String NAME = "leastactive";

        @Override
        protected Invoker doSelect(List> invokers, URL url, Invocation invocation) {
            int length = invokers.size();
            // 最小的活跃数
            int leastActive = -1;
            // 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量
            int leastCount = 0;
            // leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
            int[] leastIndexes = new int[length];
            // 记录每个Invoker的权重
            int[] weights = new int[length];
            int totalWeight = 0;
            // 第一个最小活跃数的 Invoker 权重值,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比,
            // 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等
            int firstWeight = 0;
            boolean sameWeight = true;


            //遍历 invokers 列表
            for (int i = 0; i < length; i++) {
                Invoker invoker = invokers.get(i);
                // 获取 Invoker 对应的活跃数
                int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
                int afterWarmup = getWeight(invoker, invocation);
                //获取权重
                weights[i] = afterWarmup;
                // 发现更小的活跃数,重新开始
                if (leastActive == -1 || active < leastActive) {
                    // 使用当前活跃数 active 更新最小活跃数 leastActive
                    leastActive = active;
                    // 更新 leastCount 为 1
                    leastCount = 1;
                    // 记录当前下标值到 leastIndexs 中
                    leastIndexes[0] = i;
                    totalWeight = afterWarmup;
                    firstWeight = afterWarmup;
                    sameWeight = true;
                    // 当前 Invoker 的活跃数 active 与最小活跃数 leastActive 相同
                } else if (active == leastActive) {
                    // 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标
                    leastIndexes[leastCount++] = i;
                    // 累加权重
                    totalWeight += afterWarmup;
                    // 检测当前 Invoker 的权重与 firstWeight 是否相等,
                    // 不相等则将 sameWeight 置为 false
                    if (sameWeight && i > 0
                            && afterWarmup != firstWeight) {
                        sameWeight = false;
                    }
                }
            }
            // 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可
            if (leastCount == 1) {
                return invokers.get(leastIndexes[0]);
            }
            // 有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
            if (!sameWeight && totalWeight > 0) {
                // 随机生成一个 [0, totalWeight) 之间的数字
                int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
                // 循环让随机数减去具有最小活跃数的 Invoker 的权重值,
                // 当 offset 小于等于0时,返回相应的 Invoker
                for (int i = 0; i < leastCount; i++) {
                    int leastIndex = leastIndexes[i];
                    //获取权重值,并让随机数减去权重值
                    offsetWeight -= weights[leastIndex];
                    if (offsetWeight < 0) {
                        return invokers.get(leastIndex);
                    }
                }
            }
            // 如果权重相同或权重为0时,随机返回一个 Invoker
            return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
        }
    }
    除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。举个例子说明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。如果两个服务提供者权重相同,此时随机选择一个即可。

    ConsistentHashLoadBalance

    一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提出的,算法提出之初是用于大规模缓存系统的负载均衡。它的工作过程是这样的,首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 2^32-1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。大致效果如下图所示,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。比如下面绿色点对应的缓存项将会被存储到 cache-2 节点中。由于 cache-3 挂了,原本应该存到该节点中的缓存项最终会存储到 cache-4 节点中。

    e85de623c6764272a3b671c4298e89a6.png
    下面来看看一致性 hash 在 Dubbo 中的应用。我们把上图的缓存节点替换成 Dubbo 的服务提供者,于是得到了下图: 

    e435c22961d7421bad004fe720ac7218.png
    这里相同颜色的节点均属于同一个服务提供者,比如 Invoker1-1,Invoker1-2,……, Invoker1-160。这样做的目的是通过引入虚拟节点,让 Invoker 在圆环上分散开来,避免数据倾斜问题。所谓数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量请求的情况。比如: 

    d930f05801b4435db27686db91e91c47.png
    如上,由于 Invoker-1 和 Invoker-2 在圆环上分布不均,导致系统中75%的请求都会落到 Invoker-1 上,只有 25% 的请求会落到 Invoker-2 上。解决这个问题办法是引入虚拟节点,通过虚拟节点均衡各个节点的请求量。 

    到这里背景知识就普及完了,接下来开始分析源码。我们先从 ConsistentHashLoadBalance 的 doSelect 方法开始看起,如下:

    public class ConsistentHashLoadBalance extends AbstractLoadBalance {

        private final ConcurrentMap> selectors = 
            new ConcurrentHashMap>();

        @Override
        protected Invoker doSelect(List> invokers, URL url, Invocation invocation) {
            String methodName = RpcUtils.getMethodName(invocation);
            String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;

            // 获取 invokers 原始的 hashcode
            int identityHashCode = System.identityHashCode(invokers);
            ConsistentHashSelector selector = (ConsistentHashSelector) selectors.get(key);
            // 如果 invokers 是一个新的 List 对象,意味着服务提供者数量发生了变化,可能新增也可能减少了。
            // 此时 selector.identityHashCode != identityHashCode 条件成立
            if (selector == null || selector.identityHashCode != identityHashCode) {
                // 创建新的 ConsistentHashSelector
                selectors.put(key, new ConsistentHashSelector(invokers, methodName, identityHashCode));
                selector = (ConsistentHashSelector) selectors.get(key);
            }

            // 调用 ConsistentHashSelector 的 select 方法选择 Invoker
            return selector.select(invocation);
        }

        private static final class ConsistentHashSelector {...}
    }
    如上,doSelect 方法主要做了一些前置工作,比如检测 invokers 列表是不是变动过,以及创建 ConsistentHashSelector。这些工作做完后,接下来开始调用 ConsistentHashSelector 的 select 方法执行负载均衡逻辑。在分析 select 方法之前,我们先来看一下一致性 hash 选择器 ConsistentHashSelector 的初始化过程,如下:

    private static final class ConsistentHashSelector {

        // 使用 TreeMap 存储 Invoker 虚拟节点
        private final TreeMap> virtualInvokers;

        private final int replicaNumber;

        private final int identityHashCode;

        private final int[] argumentIndex;

        ConsistentHashSelector(List> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // 获取虚拟节点数,默认为160
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            // 获取参与 hash 计算的参数下标值,默认对第一个参数进行 hash 运算
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                    // 对 address + i 进行 md5 运算,得到一个长度为16的字节数组
                    byte[] digest = md5(address + i);
                    // 对 digest 部分字节进行4次 hash 运算,得到四个不同的 long 型正整数
                    for (int h = 0; h < 4; h++) {
                        // h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进行位运算
                        // h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进行位运算
                        // h = 2, h = 3 时过程同上
                        long m = hash(digest, h);
                        // 将 hash 到 invoker 的映射关系存储到 virtualInvokers 中,
                        // virtualInvokers 需要提供高效的查询操作,因此选用 TreeMap 作为存储结构
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }
    }
    ConsistentHashSelector 的构造方法执行了一系列的初始化逻辑,比如从配置中获取虚拟节点数以及参与 hash 计算的参数下标,默认情况下只使用第一个参数进行 hash。需要特别说明的是,ConsistentHashLoadBalance 的负载均衡逻辑只受参数值影响,具有相同参数值的请求将会被分配给同一个服务提供者。ConsistentHashLoadBalance 不 关系权重,因此使用时需要注意一下。

    在获取虚拟节点数和参数下标配置后,接下来要做的事情是计算虚拟节点 hash 值,并将虚拟节点存储到 TreeMap 中。到此,ConsistentHashSelector 初始化工作就完成了。接下来,我们来看看 select 方法的逻辑。

    public Invoker select(Invocation invocation) {
        // 将参数转为 key
        String key = toKey(invocation.getArguments());
        // 对参数 key 进行 md5 运算
        byte[] digest = md5(key);
        // 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法,
        // 寻找合适的 Invoker
        return selectForKey(hash(digest, 0));
    }

    private Invoker selectForKey(long hash) {
        // 到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 Invoker
        Map.Entry> entry = virtualInvokers.tailMap(hash, true).firstEntry();
        // 如果 hash 大于 Invoker 在圆环上最大的位置,此时 entry = null,
        // 需要将 TreeMap 的头节点赋值给 entry
        if (entry == null) {
            entry = virtualInvokers.firstEntry();
        }

        // 返回 Invoker
        return entry.getValue();
    }
    如上,选择的过程相对比较简单了。首先是对参数进行 md5 以及 hash 运算,得到一个 hash 值。然后再拿这个值到 TreeMap 中查找目标 Invoker 即可。

    RoundRobinLoadBalance

    LeastActiveLoadBalance 即加权轮询负载均衡,我们先来了解一下什么是加权轮询。这里从最简单的轮询开始讲起,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。

    public class RoundRobinLoadBalance extends AbstractLoadBalance {
        public static final String NAME = "roundrobin";

        private static int RECYCLE_PERIOD = 60000;

        protected static class WeightedRoundRobin {
            // 服务提供者权重
            private int weight;
            // 当前权重
            private AtomicLong current = new AtomicLong(0);
            // 最后一次更新时间
            private long lastUpdate;

            public void setWeight(int weight) {
                this.weight = weight;
                // 初始情况下,current = 0
                current.set(0);
            }
            public long increaseCurrent() {
                // current = current + weight;
                return current.addAndGet(weight);
            }
            public void sel(int total) {
                // current = current - total;
                current.addAndGet(-1 * total);
            }
        }

        // 嵌套 Map 结构,存储的数据结构示例如下:
        // {
        //     "UserService.query": {
        //         "url1": WeightedRoundRobin@123, 
        //         "url2": WeightedRoundRobin@456, 
        //     },
        //     "UserService.update": {
        //         "url1": WeightedRoundRobin@123, 
        //         "url2": WeightedRoundRobin@456,
        //     }
        // }
        // 最外层为服务类名 + 方法名,第二层为 url 到 WeightedRoundRobin 的映射关系。
        // 这里我们可以将 url 看成是服务提供者的 id
        private ConcurrentMap> methodWeightMap = new ConcurrentHashMap>();

        // 原子更新锁
        private AtomicBoolean updateLock = new AtomicBoolean();

        @Override
        protected Invoker doSelect(List> invokers, URL url, Invocation invocation) {
            String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
            // 获取 url 到 WeightedRoundRobin 映射表,如果为空,则创建一个新的
            ConcurrentMap map = methodWeightMap.get(key);
            if (map == null) {
                methodWeightMap.putIfAbsent(key, new ConcurrentHashMap());
                map = methodWeightMap.get(key);
            }
            int totalWeight = 0;
            long maxCurrent = Long.MIN_VALUE;

            // 获取当前时间
            long now = System.currentTimeMillis();
            Invoker selectedInvoker = null;
            WeightedRoundRobin selectedWRR = null;

            // 下面这个循环主要做了这样几件事情:
            //   1. 遍历 Invoker 列表,检测当前 Invoker 是否有
            //      相应的 WeightedRoundRobin,没有则创建
            //   2. 检测 Invoker 权重是否发生了变化,若变化了,
            //      则更新 WeightedRoundRobin 的 weight 字段
            //   3. 让 current 字段加上自身权重,等价于 current += weight
            //   4. 设置 lastUpdate 字段,即 lastUpdate = now
            //   5. 寻找具有最大 current 的 Invoker,以及 Invoker 对应的 WeightedRoundRobin,
            //      暂存起来,留作后用
            //   6. 计算权重总和
            for (Invoker invoker : invokers) {
                String identifyString = invoker.getUrl().toIdentityString();
                WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
                int weight = getWeight(invoker, invocation);
                if (weight < 0) {
                    weight = 0;
                }

                // 检测当前 Invoker 是否有对应的 WeightedRoundRobin,没有则创建
                if (weightedRoundRobin == null) {
                    weightedRoundRobin = new WeightedRoundRobin();
                    // 设置 Invoker 权重
                    weightedRoundRobin.setWeight(weight);
                    // 存储 url 唯一标识 identifyString 到 weightedRoundRobin 的映射关系
                    map.putIfAbsent(identifyString, weightedRoundRobin);
                    weightedRoundRobin = map.get(identifyString);
                }
                // Invoker 权重不等于 WeightedRoundRobin 中保存的权重,说明权重变化了,此时进行更新
                if (weight != weightedRoundRobin.getWeight()) {
                    weightedRoundRobin.setWeight(weight);
                }

                // 让 current 加上自身权重,等价于 current += weight
                long cur = weightedRoundRobin.increaseCurrent();
                // 设置 lastUpdate,表示近期更新过
                weightedRoundRobin.setLastUpdate(now);
                // 找出最大的 current 
                if (cur > maxCurrent) {
                    maxCurrent = cur;
                    // 将具有最大 current 权重的 Invoker 赋值给 selectedInvoker
                    selectedInvoker = invoker;
                    // 将 Invoker 对应的 weightedRoundRobin 赋值给 selectedWRR,留作后用
                    selectedWRR = weightedRoundRobin;
                }

                // 计算权重总和
                totalWeight += weight;
            }

            // 对 进行检查,过滤掉长时间未被更新的节点。
            // 该节点可能挂了,invokers 中不包含该节点,所以该节点的 lastUpdate 长时间无法被更新。
            // 若未更新时长超过阈值后,就会被移除掉,默认阈值为60秒。
            if (!updateLock.get() && invokers.size() != map.size()) {
                if (updateLock.compareAndSet(false, true)) {
                    try {
                        ConcurrentMap newMap = new ConcurrentHashMap();
                        // 拷贝
                        newMap.putAll(map);

                        // 遍历修改,即移除过期记录
                        Iterator> it = newMap.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry item = it.next();
                            if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                                it.remove();
                            }
                        }

                        // 更新引用
                        methodWeightMap.put(key, newMap);
                    } finally {
                        updateLock.set(false);
                    }
                }
            }

            if (selectedInvoker != null) {
                // 让 current 减去权重总和,等价于 current -= totalWeight
                selectedWRR.sel(totalWeight);
                // 返回具有最大 current 的 Invoker
                return selectedInvoker;
            }

            // should not happen here
            return invokers.get(0);
        }
    }
    轮询调用并不是简单的一个接着一个依次调用,它是根据权重的值进行循环的。

    负载均衡总结

    Dubbo 负载均衡策略提供下列四种方式:

    ​ Random LoadBalance 随机,按权重设置随机概率。 Dubbo的默认负载均衡策略

    ​ 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

    ​ RoundRobin LoadBalance 轮循,按公约后的权重设置轮循比率。

    ​ 存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

    ​ LeastActive LoadBalance 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。

    ​ 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

    ​ ConsistentHash LoadBalance 一致性Hash,相同参数的请求总是发到同一提供者。

    ​ 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

    服务治理

    服务治理的概述

    服务治理主要作用是改变运行时服务的行为和选址逻辑,达到限流,权重配置等目的,主要有:标签路由,条件路由,黑白名单,动态配置,权重调节,负载均衡等功能。

    f99d28b567ca4402b9efdb75fd8cc817.png
    执行过程 

    3709459117fb44e492e517761261f661.png
    1、消费者,提供者启动成功,订阅zookeeper节点 

    2、管理平台对服务进行治理处理,向zookeeper写入节点数据

    3、写入成功,通知消费者,提供者

    4、根据不同的业务处理,在invoker调用时做出响应的处理

    相关案例

    服务禁用

    服务禁用:通常用于临时踢除某台提供者机器

    configVersion: v2.7
    scope: application
    key: demo-provider
    enabled: true
    configs:
      - addresses: ["192.168.191.2:20883"]
        side: provider
        parameters:
          disabled: true
    服务降级屏蔽

    服务降级,当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行。

    容错:当系统出现非业务异常(比如并发数太高导致超时,网络异常等)时,不对该接口进行处理。
    屏蔽:在大促,促销活动的可预知情况下,例如双11活动。采用直接屏蔽接口访问
    configVersion: v2.7
    scope: service
    key: org.apache.dubbo.samples.governance.api.DemoService
    enabled: true
    configs:
      - side: consumer
        parameters:
         force: return 12345

  • 相关阅读:
    天软特色因子看板 (2023.10 第03期)
    ElementUI之动态树+数据表格+分页
    『LeetCode|每日一题』---->按摩师
    Flink1.12.7 Standalone版本安装
    数据库系统原理与应用教程:以MySQL为例(2)—— MySQL 软件的卸载(windows 环境)
    玩转 MaxCompute SQL 训练营! 数据分析挖掘迅速出师
    C#:Winfrom 实现DataGridView 自定义分页
    linux命令行与shell脚本大全——学习笔记(1-4章)
    sqlserver刷新全部视图
    js单行代码-----函数
  • 原文地址:https://blog.csdn.net/m0_72088858/article/details/126926217