前言
本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo
集群(cluster)就是一组计算机,它们作为一个总体向用户提供一组网络资源。这些单个的计算机系统就是集群的节点(node)。
在Dubbo中,为了避免单点故障,同一个服务允许有多个服务提供者,也允许同时连接多个注册中心。那么,服务消费者引用服务时,该请求哪个注册中心的服务提供者以及调用失败之后该如何处理呢?这些就是Dubbo集群所做的事。
集群容错
在分析集群源码之前,先看看集群容错的所有组件,下图是官方文档的组件图
Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker:
- Cluster 是接口,其只有一个方法,负责生成Cluster Invoker
- Cluster Invoker继承了Invoker接口,是一个 Invoker,是主要逻辑实现的地方
将上图从中间切分,可将集群工作过程分为两个阶段,左边为第一阶段。
-
第一个阶段是在服务消费者初始化期间。
集群 Cluster 实现类为服务消费者创建 Cluster Invoker,即图上的 merge 操作,也就是将多个服务提供者合并为一个 Cluster Invoker
-
第二个阶段是在服务消费者进行远程调用时。
步骤大体上就如图所示:list → route → select → invoke
- list:从服务目录拿到 invoker 集合
- route:通过路由过滤出符合规则的 invoker 集合
- select:通过负载均衡从 invoker 集合中选择一个
- invoke:执行 invoker 的 invoke 方法,进行真正的远程调用
其中,list、route操作在之前文章讲过了,传送门:《服务目录》、《服务路由》
select 不是本文重点,后续负载均衡时讲解。
以上就是集群工作的整个流程,这里并没有介绍集群是如何容错的,也就是 invoke 步骤调用失败的处理。Dubbo提供了多种容错方式:集群容错示例
下面的源码我们以默认的 Failover Cluster - 失败自动切换 进行分析
源码分析
Cluster
首先来看看 Cluster 接口,这是一个自适应拓展类,默认实现为FailoverCluster
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override public Invoker join(Directory directory) throws RpcException { return new FailoverClusterInvoker(directory); } }
前面讲了,Cluster 的作用就是将多个服务提供者合并为一个 Cluster Invoker
多个服务提供者合并也就是 服务目录(Directory) 中的 invoker 集合。join 方法返回了一个 Cluster Invoker
接下来,我们看看调用路径。 Cluster 接口在多个地方被调用,我们看服务消费者初始化期间的调用。
// 调用路径如下: // ReferenceBean#getObject() // ReferenceConfig#get() // ReferenceConfig#init() // ReferenceConfig#createProxy(Map map) // RegistryProtocol#refer(Class type, URL url) // RegistryProtocol#doRefer(Cluster cluster, Registry registry, Class type, URL url) private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) { // 创建 RegistryDirectory 实例 RegistryDirectory directory = new RegistryDirectory(type, url); // 设置注册中心和协议 directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map parameters = new HashMap(directory.getUrl().getParameters()); // 生成服务消费者链接 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // 注册服务消费者,在 consumers 目录下新节点 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url); registry.register(registeredConsumerUrl); directory.setRegisteredConsumerUrl(registeredConsumerUrl); } // 订阅 providers、configurators、routers 等节点数据 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个 Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
调用在倒数第三行代码。
如果看过我之前写的《服务引用》那篇文章,想必对 doRefer 方法不陌生了。在服务目录订阅完注册中心的数据后,就调用 join 方法生成 Cluster Invoker
啰嗦多一句:
可以这么理解,实际负责远程调用的,是服务目录中的 invoker 集合中的 invoker,而 Cluster Invoker 则对服务目录中的 invoker 集合进行处理。
Cluster Invoker
默认的 Cluster Invoker 是FailoverClusterInvoker
,既然是一个 Invoker,我们就从它的 invoke 方法入手。
AbstractClusterInvoker
invoke 方法在它的父类AbstractClusterInvoker
中
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; // 绑定 attachments 到 invocation 中. Map contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 列举 Invoker List> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { // 加载 LoadBalance loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // 调用 doInvoke 进行后续操作 return doInvoke(invocation, invokers, loadbalance); }
invoke 方法逻辑也很简单:
- 列举 invoker
- 加载 LoadBalance(自适应拓展类)
- 调用 doInvoke 进行后续操作
其中列举 invoker 如下
protected List> list(Invocation invocation) throws RpcException { List> invokers = directory.list(invocation); return invokers; }
list 方法就是调用服务目录的 list 方法,里面做了两件事(结合前面的组件图):
- list:从服务目录拿到 invoker 集合
- route:通过路由过滤出符合规则的 invoker 集合
FailoverClusterInvoker
doInvoke 方法具体实现在FailoverClusterInvoker
中,此 invoker 的容错方式为失败自动切换。
public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException { List> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); // 获取重试次数 int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // 循环调用,失败重试 RpcException le = null; // last exception. List> invoked = new ArrayList>(copyinvokers.size()); // invoked invokers. Set providers = new HashSet(len); for (int i = 0; i < len; i++) { if (i > 0) { checkWhetherDestroyed(); // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了, // 通过调用 list 可得到最新可用的 Invoker 列表 copyinvokers = list(invocation); // check again // 对 copyinvokers 进行判空检查 checkInvokers(copyinvokers, invocation); } // 通过负载均衡选择 Invoker Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked); // 添加到 invoker 到 invoked 列表中 invoked.add(invoker); // 设置 invoked 到 RPC 上下文中 RpcContext.getContext().setInvokers((List) invoked); try { // 调用目标 Invoker 的 invoke 方法 Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } // 若重试失败,则抛出异常 throw new RpcException(xxx); }
doInvoke 方法代码量不少,但是逻辑简化之后也很简单,就是根据重试次数,在 for 循环中进行远程调用,成功则返回,失败就重试。如果重试次数耗尽还无法调用成功,则抛出异常。
从这里可以知道,Dubbo的默认失败重试次数是3次。
此方法中我们关注下 select 方法,它负责从 invoker 集合中选出一个 infoker
protected Invoker select(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; // 获取调用方法名 String methodName = invocation == null ? "" : invocation.getMethodName(); // 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的 // 调用同一个服务提供者,除非该提供者挂了再进行切换 boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); { // 检测 invokers 列表是否包含 stickyInvoker,如果不包含, // 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空 if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } // 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含 // stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。 // 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。 if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { // availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的 // isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。 if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } // 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。 // 此时继续调用 doSelect 选择 Invoker Invoker invoker = doSelect(loadbalance, invocation, invokers, selected); // 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker if (sticky) { stickyInvoker = invoker; } return invoker; }
select 方法主要处理对粘滞连接特性的支持。注释写的很清楚了。选择 invoker 的操作在 doSelect 方法
private Invoker doSelect(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) return invokers.get(0); if (loadbalance == null) { // 如果 loadbalance 为空,这里通过 SPI 加载 Loadbalance,默认为 RandomLoadBalance loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } // 通过负载均衡组件选择 Invoker Invoker invoker = loadbalance.select(invokers, getUrl(), invocation); // 如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选 if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { // 进行重选 Invoker rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rinvoker != null) { invoker = rinvoker; } else { // rinvoker 为空,定位 invoker 在 invokers 中的位置 int index = invokers.indexOf(invoker); try { // 获取 index + 1 位置处的 Invoker,以下代码等价于: // invoker = invokers.get((index + 1) % invokers.size()); invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; }
这里通过负载均衡选出 invoker,如果 invoker 在 selected 中(就是在doInvoke方法中调用失败的invoker)或者不可用,则调用 reselect 方法进行重选。如果重选还是选不出 invoker,则返回 invoker 集合中的下一个元素。这里的繁琐判断,就是为了尽量保证拿到可用的 invoker
我们继续看看 reselect 方法
private Invoker reselect(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected, boolean availablecheck) throws RpcException { List> reselectInvokers = new ArrayList>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size()); // 下面的 if-else 分支逻辑有些冗余,pull request #2826 对这段代码进行了简化,可以参考一下 // 根据 availablecheck 进行不同的处理 if (availablecheck) { // invoker.isAvailable() should be checked for (Invoker invoker : invokers) { if (invoker.isAvailable()) { if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } else { // do not check invoker.isAvailable() for (Invoker invoker : invokers) { if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } { // 若线程走到此处,说明 reselectInvokers 集合为空,此时不会调用负载均衡组件进行筛选。 // 这里从 selected 列表中查找可用的 Invoker,并将其添加到 reselectInvokers 集合中 if (selected != null) { for (Invoker invoker : selected) { if ((invoker.isAvailable()) // available first && !reselectInvokers.contains(invoker)) { reselectInvokers.add(invoker); } } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } return null; }
这个方法可以分成两部分:
- 在非 selected 的 invoker 集合中,调用负载均衡选择一个 invoker
- 在步骤1无法选出 invoker 时,在 selected 中选出 invoker
至此,Dubbo的集群就讲完了。负载均衡有空再说。
再论Cluster
前面我们提到 Cluster 接口在多个地方被调用,也讲了同一个服务有多个服务提供者时的处理。那么,有多个注册中心呢,该如何处理?
// 类ReferenceConfig private T createProxy(Map map) { ...... // 本地引用 if (isJvmRefer) { ...... // 远程引用 } else { ...... // 单个注册中心或服务提供者(服务直连,下同) if (urls.size() == 1) { // 调用 RegistryProtocol 的 refer 构建 Invoker 实例 invoker = refprotocol.refer(interfaceClass, urls.get(0)); // 多个注册中心或多个服务提供者,或者两者混合 } else { List> invokers = new ArrayList>(); URL registryURL = null; // 获取所有的 Invoker for (URL url : urls) { // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时 // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法 invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // use last registry url } } if (registryURL != null) { // registry url is available // use AvailableCluster only when register's cluster is available // 如果注册中心链接不为空,则将使用 AvailableCluster URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // not a registry url invoker = cluster.join(new StaticDirectory(invokers)); } } } // 生成代理类 return (T) proxyFactory.getProxy(invoker); }
createProxy 是服务引用时,生成服务代理对象的方法。这里会判断,如果有多个注册中心,会再封装一层集群,也就是先选择注册中心,再选择服务提供者。
这里一般情况 registryURL 不为空,cluster 使用的是AvailableCluster
public class AvailableCluster implements Cluster { public static final String NAME = "available"; @Override public Invoker join(Directory directory) throws RpcException { return new AbstractClusterInvoker(directory) { @Override public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { for (Invoker invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers); } }; } }
AvailableCluster的逻辑很简单,按顺序选择可使用的 invoker (这里的invoker其实就是每个注册中心)
总结
本篇文章介绍了Dubbo集群容错的整体工作过程和调用逻辑。Dubbo提供了多种集群实现,本文只介绍了Failover Cluster,其余实现感兴趣的可以自行查看源码。