• Dubbo源码(七) - 集群


    前言

    本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo

    集群(cluster)就是一组计算机,它们作为一个总体向用户提供一组网络资源。这些单个的计算机系统就是集群的节点(node)。

    在Dubbo中,为了避免单点故障,同一个服务允许有多个服务提供者,也允许同时连接多个注册中心。那么,服务消费者引用服务时,该请求哪个注册中心的服务提供者以及调用失败之后该如何处理呢?这些就是Dubbo集群所做的事。

    集群容错

    在分析集群源码之前,先看看集群容错的所有组件,下图是官方文档的组件图

    Untitled

    Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker:

    • Cluster 是接口,其只有一个方法,负责生成Cluster Invoker
    • Cluster Invoker继承了Invoker接口,是一个 Invoker,是主要逻辑实现的地方

    将上图从中间切分,可将集群工作过程分为两个阶段,左边为第一阶段。

    1. 第一个阶段是在服务消费者初始化期间。

      集群 Cluster 实现类为服务消费者创建 Cluster Invoker,即图上的 merge 操作,也就是将多个服务提供者合并为一个 Cluster Invoker

    2. 第二个阶段是在服务消费者进行远程调用时。

      步骤大体上就如图所示:list → route → select → invoke

      • list:从服务目录拿到 invoker 集合
      • route:通过路由过滤出符合规则的 invoker 集合
      • select:通过负载均衡从 invoker 集合中选择一个
      • invoke:执行 invoker 的 invoke 方法,进行真正的远程调用

      其中,list、route操作在之前文章讲过了,传送门:《服务目录》《服务路由》

      select 不是本文重点,后续负载均衡时讲解。

    以上就是集群工作的整个流程,这里并没有介绍集群是如何容错的,也就是 invoke 步骤调用失败的处理。Dubbo提供了多种容错方式:集群容错示例

    下面的源码我们以默认的 Failover Cluster - 失败自动切换 进行分析

    源码分析

    Cluster

    首先来看看 Cluster 接口,这是一个自适应拓展类,默认实现为FailoverCluster

    public class FailoverCluster implements Cluster {
    public final static String NAME = "failover";
    @Override
    public Invoker join(Directory directory) throws RpcException {
    return new FailoverClusterInvoker(directory);
    }
    }

    前面讲了,Cluster 的作用就是将多个服务提供者合并为一个 Cluster Invoker

    多个服务提供者合并也就是 服务目录(Directory) 中的 invoker 集合。join 方法返回了一个 Cluster Invoker

    接下来,我们看看调用路径。 Cluster 接口在多个地方被调用,我们看服务消费者初始化期间的调用。

    // 调用路径如下:
    // ReferenceBean#getObject()
    // ReferenceConfig#get()
    // ReferenceConfig#init()
    // ReferenceConfig#createProxy(Map map)
    // RegistryProtocol#refer(Class type, URL url)
    // RegistryProtocol#doRefer(Cluster cluster, Registry registry, Class type, URL url)
    private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {
    // 创建 RegistryDirectory 实例
    RegistryDirectory directory = new RegistryDirectory(type, url);
    // 设置注册中心和协议
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map parameters = new HashMap(directory.getUrl().getParameters());
    // 生成服务消费者链接
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    // 注册服务消费者,在 consumers 目录下新节点
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
    && url.getParameter(Constants.REGISTER_KEY, true)) {
    URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
    registry.register(registeredConsumerUrl);
    directory.setRegisteredConsumerUrl(registeredConsumerUrl);
    }
    // 订阅 providers、configurators、routers 等节点数据
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
    Constants.PROVIDERS_CATEGORY
    + "," + Constants.CONFIGURATORS_CATEGORY
    + "," + Constants.ROUTERS_CATEGORY));
    // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
    }

    调用在倒数第三行代码。

    如果看过我之前写的《服务引用》那篇文章,想必对 doRefer 方法不陌生了。在服务目录订阅完注册中心的数据后,就调用 join 方法生成 Cluster Invoker

    啰嗦多一句:

    可以这么理解,实际负责远程调用的,是服务目录中的 invoker 集合中的 invoker,而 Cluster Invoker 则对服务目录中的 invoker 集合进行处理。

    Cluster Invoker

    默认的 Cluster Invoker 是FailoverClusterInvoker,既然是一个 Invoker,我们就从它的 invoke 方法入手。

    AbstractClusterInvoker

    invoke 方法在它的父类AbstractClusterInvoker

    public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;
    // 绑定 attachments 到 invocation 中.
    Map contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
    ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // 列举 Invoker
    List> invokers = list(invocation);
    if (invokers != null && !invokers.isEmpty()) {
    // 加载 LoadBalance
    loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 调用 doInvoke 进行后续操作
    return doInvoke(invocation, invokers, loadbalance);
    }

    invoke 方法逻辑也很简单:

    1. 列举 invoker
    2. 加载 LoadBalance(自适应拓展类)
    3. 调用 doInvoke 进行后续操作

    其中列举 invoker 如下

    protected List> list(Invocation invocation) throws RpcException {
    List> invokers = directory.list(invocation);
    return invokers;
    }

    list 方法就是调用服务目录的 list 方法,里面做了两件事(结合前面的组件图):

    • list:从服务目录拿到 invoker 集合
    • route:通过路由过滤出符合规则的 invoker 集合

    FailoverClusterInvoker

    doInvoke 方法具体实现在FailoverClusterInvoker中,此 invoker 的容错方式为失败自动切换。

    public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
    List> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 获取重试次数
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
    len = 1;
    }
    // 循环调用,失败重试
    RpcException le = null; // last exception.
    List> invoked = new ArrayList>(copyinvokers.size()); // invoked invokers.
    Set providers = new HashSet(len);
    for (int i = 0; i < len; i++) {
    if (i > 0) {
    checkWhetherDestroyed();
    // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
    // 通过调用 list 可得到最新可用的 Invoker 列表
    copyinvokers = list(invocation);
    // check again
    // 对 copyinvokers 进行判空检查
    checkInvokers(copyinvokers, invocation);
    }
    // 通过负载均衡选择 Invoker
    Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
    // 添加到 invoker 到 invoked 列表中
    invoked.add(invoker);
    // 设置 invoked 到 RPC 上下文中
    RpcContext.getContext().setInvokers((List) invoked);
    try {
    // 调用目标 Invoker 的 invoke 方法
    Result result = invoker.invoke(invocation);
    return result;
    } catch (RpcException e) {
    if (e.isBiz()) { // biz exception.
    throw e;
    }
    le = e;
    } catch (Throwable e) {
    le = new RpcException(e.getMessage(), e);
    } finally {
    providers.add(invoker.getUrl().getAddress());
    }
    }
    // 若重试失败,则抛出异常
    throw new RpcException(xxx);
    }

    doInvoke 方法代码量不少,但是逻辑简化之后也很简单,就是根据重试次数,在 for 循环中进行远程调用,成功则返回,失败就重试。如果重试次数耗尽还无法调用成功,则抛出异常。

    从这里可以知道,Dubbo的默认失败重试次数是3次。

    此方法中我们关注下 select 方法,它负责从 invoker 集合中选出一个 infoker

    protected Invoker select(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
    return null;
    // 获取调用方法名
    String methodName = invocation == null ? "" : invocation.getMethodName();
    // 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的
    // 调用同一个服务提供者,除非该提供者挂了再进行切换
    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
    {
    // 检测 invokers 列表是否包含 stickyInvoker,如果不包含,
    // 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
    stickyInvoker = null;
    }
    // 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含
    // stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。
    // 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
    // availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的
    // isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。
    if (availablecheck && stickyInvoker.isAvailable()) {
    return stickyInvoker;
    }
    }
    }
    // 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。
    // 此时继续调用 doSelect 选择 Invoker
    Invoker invoker = doSelect(loadbalance, invocation, invokers, selected);
    // 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker
    if (sticky) {
    stickyInvoker = invoker;
    }
    return invoker;
    }

    select 方法主要处理对粘滞连接特性的支持。注释写的很清楚了。选择 invoker 的操作在 doSelect 方法

    private Invoker doSelect(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
    return null;
    if (invokers.size() == 1)
    return invokers.get(0);
    if (loadbalance == null) {
    // 如果 loadbalance 为空,这里通过 SPI 加载 Loadbalance,默认为 RandomLoadBalance
    loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    // 通过负载均衡组件选择 Invoker
    Invoker invoker = loadbalance.select(invokers, getUrl(), invocation);
    // 如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选
    if ((selected != null && selected.contains(invoker))
    || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
    try {
    // 进行重选
    Invoker rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
    if (rinvoker != null) {
    invoker = rinvoker;
    } else {
    // rinvoker 为空,定位 invoker 在 invokers 中的位置
    int index = invokers.indexOf(invoker);
    try {
    // 获取 index + 1 位置处的 Invoker,以下代码等价于:
    // invoker = invokers.get((index + 1) % invokers.size());
    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
    } catch (Exception e) {
    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
    }
    }
    } catch (Throwable t) {
    logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
    }
    }
    return invoker;
    }

    这里通过负载均衡选出 invoker,如果 invoker 在 selected 中(就是在doInvoke方法中调用失败的invoker)或者不可用,则调用 reselect 方法进行重选。如果重选还是选不出 invoker,则返回 invoker 集合中的下一个元素。这里的繁琐判断,就是为了尽量保证拿到可用的 invoker

    我们继续看看 reselect 方法

    private Invoker reselect(LoadBalance loadbalance, Invocation invocation,
    List> invokers, List> selected, boolean availablecheck)
    throws RpcException {
    List> reselectInvokers = new ArrayList>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
    // 下面的 if-else 分支逻辑有些冗余,pull request #2826 对这段代码进行了简化,可以参考一下
    // 根据 availablecheck 进行不同的处理
    if (availablecheck) { // invoker.isAvailable() should be checked
    for (Invoker invoker : invokers) {
    if (invoker.isAvailable()) {
    if (selected == null || !selected.contains(invoker)) {
    reselectInvokers.add(invoker);
    }
    }
    }
    if (!reselectInvokers.isEmpty()) {
    return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }
    } else { // do not check invoker.isAvailable()
    for (Invoker invoker : invokers) {
    if (selected == null || !selected.contains(invoker)) {
    reselectInvokers.add(invoker);
    }
    }
    if (!reselectInvokers.isEmpty()) {
    return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }
    }
    {
    // 若线程走到此处,说明 reselectInvokers 集合为空,此时不会调用负载均衡组件进行筛选。
    // 这里从 selected 列表中查找可用的 Invoker,并将其添加到 reselectInvokers 集合中
    if (selected != null) {
    for (Invoker invoker : selected) {
    if ((invoker.isAvailable()) // available first
    && !reselectInvokers.contains(invoker)) {
    reselectInvokers.add(invoker);
    }
    }
    }
    if (!reselectInvokers.isEmpty()) {
    return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }
    }
    return null;
    }

    这个方法可以分成两部分:

    1. 在非 selected 的 invoker 集合中,调用负载均衡选择一个 invoker
    2. 在步骤1无法选出 invoker 时,在 selected 中选出 invoker

    至此,Dubbo的集群就讲完了。负载均衡有空再说。

    再论Cluster

    前面我们提到 Cluster 接口在多个地方被调用,也讲了同一个服务有多个服务提供者时的处理。那么,有多个注册中心呢,该如何处理?

    // 类ReferenceConfig
    private T createProxy(Map map) {
    ......
    // 本地引用
    if (isJvmRefer) {
    ......
    // 远程引用
    } else {
    ......
    // 单个注册中心或服务提供者(服务直连,下同)
    if (urls.size() == 1) {
    // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
    // 多个注册中心或多个服务提供者,或者两者混合
    } else {
    List> invokers = new ArrayList>();
    URL registryURL = null;
    // 获取所有的 Invoker
    for (URL url : urls) {
    // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
    // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
    invokers.add(refprotocol.refer(interfaceClass, url));
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    registryURL = url; // use last registry url
    }
    }
    if (registryURL != null) { // registry url is available
    // use AvailableCluster only when register's cluster is available
    // 如果注册中心链接不为空,则将使用 AvailableCluster
    URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
    // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
    invoker = cluster.join(new StaticDirectory(u, invokers));
    } else { // not a registry url
    invoker = cluster.join(new StaticDirectory(invokers));
    }
    }
    }
    // 生成代理类
    return (T) proxyFactory.getProxy(invoker);
    }

    createProxy 是服务引用时,生成服务代理对象的方法。这里会判断,如果有多个注册中心,会再封装一层集群,也就是先选择注册中心,再选择服务提供者。

    这里一般情况 registryURL 不为空,cluster 使用的是AvailableCluster

    public class AvailableCluster implements Cluster {
    public static final String NAME = "available";
    @Override
    public Invoker join(Directory directory) throws RpcException {
    return new AbstractClusterInvoker(directory) {
    @Override
    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
    for (Invoker invoker : invokers) {
    if (invoker.isAvailable()) {
    return invoker.invoke(invocation);
    }
    }
    throw new RpcException("No provider available in " + invokers);
    }
    };
    }
    }

    AvailableCluster的逻辑很简单,按顺序选择可使用的 invoker (这里的invoker其实就是每个注册中心)

    总结

    本篇文章介绍了Dubbo集群容错的整体工作过程和调用逻辑。Dubbo提供了多种集群实现,本文只介绍了Failover Cluster,其余实现感兴趣的可以自行查看源码。

  • 相关阅读:
    [YOLOV7] Win10+Anaconda+Pytorch 部署YOLOv7(含踩坑解决方案)
    论,韭菜的自我修养
    探索Moonbeam路由流动性的强大功能
    03-GO语言基础基本数据类型
    xml和json互转工具类
    Spring Data JPA @Entity之间的关联关系注解如何正确使用?
    2024年最新通信安全员考试题库
    牛客网Python专项练习重点题整理
    军训场KL
    MethodInterceptor
  • 原文地址:https://www.cnblogs.com/konghuanxi/p/16572167.html