目录
1.1 ReactiveLoadBalancerClientFilter源码分析
1.2 LoadBalancerClientFactory源码分析
2.1 扩展原生RoundRobinLoadBalancer轮询策略
2.1.1 自定义实现RoundRobinLoadBalancer
2.1.2 配置自定义的RoundRobinLoadBalancer
2.2 扩展原生RandomLoadBalancer随机策略
2.2.2 配置自定义的RandomLoadBalancer
2.3 动态绑定client和loadBalancer并注册到LoadBalancerClientFactory
工作和兴趣的使然,由于需要对各种开源的项目做一些自定义的插件以及扩展,所以会经常研究一些开源组件的源码。正好前段阵子公司内部计划进行产品依赖版本升级,springcloud升级到2021.0.6,spring boot升级到2.7.11了,自然spring cloud gateway就随之升级到了3.1.6,带来的问题就是gateway内部组件的大调整和更新,比如scg在Hoxton.M2 RELEASED版本之前,内部的负载均衡组件一直用的都是ribbon,现在新版本的cloud包括boot内部的负载均衡组件统一用了spring自己的loadbalancer,那么我们开发的针对于老版本的一些插件和扩展就不能用了,需要对新的组件进行适配了,本人正好负责这次的版本升级工作,所以想空闲时间就把这些升级的东西和过程写下来。都是本人手搓的一手代码,创作不易,望诸君高台贵手,点赞支持。
直接进入主题,本文主要核心有两点:
查看scg的源码我们不难发现,scg内部负载均衡的核心逻辑都是由一个GlobalFilter来实现的,这个全局filter叫:ReactiveLoadBalancerClientFilter,查看这个filter的代码我们可以看到它有个核心方法choose(),这个方法的作用就是根据当前所需要路由的serviceId来选择对应的loadBalancer,然后通过loadBalancer来选择出最终要路由到的serviceInstance,那么通过这个方法我们能抓到两个核心要素:
1.拿到路由目标service对应的loadBalancer;
2.通过loadBalancer来choose出最终要路由的实例serviceInstance。
经过上面分析,我们就能明白了,实现自定义loadBalancer的核心逻辑就是:
1.自定义自己的ReactorLoadBalancer;
2.在自定义的ReactorLoadBalancer里面实现自己的choose逻辑;
3.动态将serviceId和自己的loadBalancer绑定并注册到LoadBalancerClientFactory(由代码this.clientFactory.getInstance()可知)中去。
spring cloud loadBalancer是怎么实现每个client绑定自己的配置的呢,官方提供了两个注解:@LoadBalancerClient和@LoadBalancerClients,首先我们来看@LoadBalancerClient的源码,可以看到@LoadBalancerClient提供了三个参数:name、value、configuration[],看注解就知道name、value就是配置客户端的名称(客户端名称我们就可以理解为一个应用的applicationName,因为在实际负载中都是用服务名作为clientId、serviceId),configuration就是客户端对应的自定义的配置,包括负载策略的配置、健康检查策略的配置等等,这里我们只需要关注负载策略的自定义就好,其他的用默认就行,如果有需求也可以都自定义。
接下来我们看@LoadBalancerClients源码
可以发现有个@Import注解,我们打开Import里面的LoadBalancerClientConfigurationRegister类,发现它实现了ImportBeanDefinitionRegister接口并重写了registerBeanDefinitions方法,我们重点看下这个方法,发现它里面干了3件事:
1.获取所有的@LoadBalancerClients注解的元数据,拿到代码里LoadBalancerClients注解的配置,然后根据配置进行loadBalancerClientConfiguration的绑定;
2.如果第一步里面@LoadBalancerClients里面如果配置的是defaultConfiguration,那么就用默认的配置进行绑定;
3.获取所有的@LoadBalancerClient注解的元数据,然后同样的拿到代码LoadBalancerClient注解的配置,然后根据配置进行loadBalancerClientConfiguration的绑定
那我们看下具体是怎么绑定的,看源码:
本质就是构建一个LoadBalancerClientSpecification bean,这个spec就是每个客户端自定义负载策略的核心bean,我们看下它的源码,不难发现,它有两个属性:name、configuration[],是不是发现似曾相识,就是@LoadBalancerClient注解的两个属性,所以最终就是为了装配这个bean,构建完ben之后,那俩注解的作用就完成了,接下来我们在继续看下spring是如何实现前面我们所讲的:在路由时,是如何通过serviceId获取到这个客户端配置的。
接下来我们继续,所有的配置动作已经解析完成了,那么就是将配置交给spring容器了,spring是如何加载这个LoadBalancerClientSpecification的呢,我们跟随配置:spring.cloud.loadbalancer.ebabled会发现有个自动装配类:LoadBalancerAutoConfiguration,查看这个配置类源码我们可以发现,有参构造的参数就是LoadBalancerClientSpecification,然后再进行初始化bean:LoadBalancerClientFactory这个loadBalancer的核心factory。
查看LoadBalancerClientFactory的源码我们可以看到它继承了:NamedContextFactory,那么这个作用是什么呢:子容器之间的数据隔离。NamedContextFactory的作用是创建一个子容器(子上下文context),然后每个子容器通过LoadBalancerClientSpecification来定义客户端容器name以及数据配置。我们回到开头所讲的ReactiveLoadBalancerClientFilter这个filter,在请求进来的时候通过LoadBalancerClientFactory拿serviceId去获取这个客户端对应的loadBalancer,跟进源码,我们发现它最终调用的是NamedContextFactory的getInstance()方法,然后调用getContext(),主要逻辑就是通过serviceId去获取子容器,如果没有那么就创建一个新的子容器(子上下文),查看源码我们不难发现,新的子容器name就是用的serviceId,然后再拿到对应的LoadBalancerClientSpecification来注册到子容器中去;所以到这里,再回到上面,我们看LoadBalancerAutoConfiguration中的LoadBalanceClientrFactory的初始化就干了一件事:将所有的客户端的配置LoadBalancerClientSpecification注册到NamedContextFactory中去;然后随着请求过来时,拿到已经注册好的LoadBalancerClientSpecification对当前请求的客户端进行子上下文的初始化。
createContext()源码截图:
到此,经过上述简单的源码分析,那么原理和实现方案我们就已经大致明白了,接下来就直接上代码来验证。
想要实现自定义负载策略,首先需要实现官方接口:ReactorServiceInstanceLoadBalancer ,查看代码我们会发现这个接口是spring loadBalancer官方提供的接口,所有的策略都需要实现它,所以我们自定义的策略也不例外,具体代码如下:
- package com.primeton.gateway.core.lb;
-
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.beans.factory.ObjectProvider;
- import org.springframework.cloud.client.ServiceInstance;
- import org.springframework.cloud.client.loadbalancer.DefaultResponse;
- import org.springframework.cloud.client.loadbalancer.EmptyResponse;
- import org.springframework.cloud.client.loadbalancer.Request;
- import org.springframework.cloud.client.loadbalancer.Response;
- import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
- import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
- import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
- import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
- import reactor.core.publisher.Mono;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Random;
- import java.util.concurrent.atomic.AtomicInteger;
-
- /**
- * @Description 轮询
- * @Author wx
- * @Date 2023/5/26
- */
- public class GatewayRoundLoadBalancer implements ReactorServiceInstanceLoadBalancer {
- private static final Log log = LogFactory.getLog(GatewayRoundLoadBalancer.class);
-
- final AtomicInteger position;
-
- private final String serviceId;
-
- private ObjectProvider
serviceInstanceListSupplierProvider; -
- public GatewayRoundLoadBalancer(ObjectProvider
serviceInstanceListSupplierProvider, - String serviceId) {
- this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
- }
-
- public GatewayRoundLoadBalancer(ObjectProvider
serviceInstanceListSupplierProvider, - String serviceId,
- int seedPosition) {
- this.serviceId = serviceId;
- this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
- this.position = new AtomicInteger(seedPosition);
- }
-
- @Override
- public Mono
> choose(Request request) { - ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
- .getIfAvailable(NoopServiceInstanceListSupplier::new);
- return supplier.get(request).next()
- .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, request));
- }
-
- private Response
processInstanceResponse(ServiceInstanceListSupplier supplier, - List
serviceInstances, - Request request) {
- Response
serviceInstanceResponse = getInstanceResponse(serviceInstances, request); - if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
- ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
- }
- return serviceInstanceResponse;
- }
-
- private Response
getInstanceResponse(List instances, - Request request) {
- if (instances.isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn("No servers available for service: " + serviceId);
- }
- return new EmptyResponse();
- }
-
- // Do not move position when there is only 1 instance, especially some suppliers
- // have already filtered instances
- if (instances.size() == 1) {
- return new DefaultResponse(instances.get(0));
- }
-
- List
useInstances = customChoose(instances, request); -
- int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
-
- ServiceInstance instance = useInstances.get(pos % useInstances.size());
-
- return new DefaultResponse(instance);
- }
-
- /**
- * 自定义instances choose出满足业务请求的实例,然后按照轮询策略来从
- * 剩下的满足业务需求的实例列表选出最终的实例
- */
- private List
customChoose(List instances, Request request) { - //todo 比如根据request中的参数来筛选、 或者筛选出实例元数据中含有某些符合参数的实例 等等
- List
use = new ArrayList<>(); - for (ServiceInstance instance : instances) {
- Map
metadata = instance.getMetadata(); - if (metadata.containsKey("xxx")) use.add(instance);
- }
- return use;
- }
-
- }
- package com.primeton.gateway.core.lb;
-
- import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
- import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.core.env.Environment;
-
- /**
- * @Description TODO
- * @Author wx
- * @Date 2024/6/13
- */
- public class GatewayRoundLoadBalancerConfiguration {
-
- @Bean
- public GatewayRoundLoadBalancer gatewayRoundLoadBalancer(Environment environment,
- LoadBalancerClientFactory loadBalancerClientFactory) {
- String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
- return new GatewayRoundLoadBalancer(
- loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
- }
-
- }
- package com.primeton.gateway.core.lb;
-
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.beans.factory.ObjectProvider;
- import org.springframework.cloud.client.ServiceInstance;
- import org.springframework.cloud.client.loadbalancer.DefaultResponse;
- import org.springframework.cloud.client.loadbalancer.EmptyResponse;
- import org.springframework.cloud.client.loadbalancer.Request;
- import org.springframework.cloud.client.loadbalancer.Response;
- import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
- import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
- import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
- import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
- import reactor.core.publisher.Mono;
-
- import java.util.List;
- import java.util.concurrent.ThreadLocalRandom;
-
- /**
- * @Description 随机
- * @Author wx
- * @Date 2023/5/26
- */
- public class GatewayRandomLoadBalancer implements ReactorServiceInstanceLoadBalancer {
- private static final Log log = LogFactory.getLog(GatewayRandomLoadBalancer.class);
-
- private final String serviceId;
-
- private ObjectProvider
serviceInstanceListSupplierProvider; -
- public GatewayRandomLoadBalancer(ObjectProvider
serviceInstanceListSupplierProvider, - String serviceId) {
- this.serviceId = serviceId;
- this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
- }
-
- @Override
- public Mono
> choose(Request request) { - ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
- .getIfAvailable(NoopServiceInstanceListSupplier::new);
- return supplier.get(request).next()
- .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, request));
- }
-
- private Response
processInstanceResponse(ServiceInstanceListSupplier supplier, - List
serviceInstances, - Request request) {
- Response
serviceInstanceResponse = getInstanceResponse(serviceInstances, request); - if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
- ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
- }
- return serviceInstanceResponse;
- }
-
- private Response
getInstanceResponse(List instances, - Request request) {
- if (instances.isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn("No servers available for service: " + serviceId);
- }
- return new EmptyResponse();
- }
-
- List
useInstances = customChoose(instances, request); -
- int index = ThreadLocalRandom.current().nextInt(useInstances.size());
-
- ServiceInstance instance = useInstances.get(index);
-
- return new DefaultResponse(instance);
- }
-
- //todo 节合实际业务来筛选
- private List
customChoose(List instances, Request request) { - return instances;
- }
-
- }
- package com.primeton.gateway.core.lb;
-
- import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
- import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.core.env.Environment;
-
- /**
- * @Description TODO
- * @Author wx
- * @Date 2024/6/13
- */
- public class GatewayRandomLoadBalancerConfiguration {
-
- @Bean
- public GatewayRandomLoadBalancer gatewayRandomLoadBalancer(Environment environment,
- LoadBalancerClientFactory loadBalancerClientFactory) {
- String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
- return new GatewayRandomLoadBalancer(
- loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
- }
-
- }
看完前面的原理分析,那我们就明白要实现client动态绑定loadBalancer并注册到LoadBalancerClientFactory去,要做的就是两件事:1.根据业务配置对每个客户端进行LoadBalancerClientSpecification组装;2.组装好LoadBalancerClientSpecification注册到LoadBalancerClientFactory也就是NamedContextFactory上下文里面去。具体实现如下:
- package com.primeton.gateway.core.lb;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientSpecification;
- import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.core.env.ConfigurableEnvironment;
- import org.springframework.core.env.EnumerablePropertySource;
- import org.springframework.core.env.PropertySource;
-
- import javax.annotation.PostConstruct;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
-
- /**
- * @Description TODO
- * @Author wx
- * @Date 2023/5/26
- */
- @Configuration
- public class GatewayLoadBalancerConfiguration {
-
- private static final String SUFFIX = ".loadbalancer.LoadBalancer-configuration-class-name";
-
- @Autowired
- private ConfigurableEnvironment env;
-
- @Autowired
- private LoadBalancerClientFactory loadBalancerClientFactory;
-
- @PostConstruct
- public void postConstruct() {
- //第一步:解析业务配置,从而解析出来每个客户端对应的负载策略配置
- HashMap
configs = new HashMap<>(); - for (PropertySource> propertySource : env.getPropertySources()) {
- if (propertySource instanceof EnumerablePropertySource) {
- for (String name : ((EnumerablePropertySource) propertySource).getPropertyNames()) {
- if (name != null && name.endsWith(SUFFIX)) {
- configs.put(name, env.getProperty(name));
- }
- }
- }
- }
-
- //第二步:组装Specification并绑定到spring上下文中去
- List
configurations = new ArrayList<>(); -
- for (String clientId : configs.keySet()) {
- String id = clientId.substring(0, clientId.length() - SUFFIX.length());
- try {
- Class>[] classes = {Class.forName(configs.get(clientId))};
-
- LoadBalancerClientSpecification specification = new LoadBalancerClientSpecification();
- specification.setName(id);
- specification.setConfiguration(classes);
- configurations.add(specification);
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
-
- }
- loadBalancerClientFactory.setConfigurations(configurations);
- }
-
-
- }
上面代码我做一下简单的思路描述:我们可以把上面代码分为两部分:
1.第一部分我是自定义了一个配置规则:serviceId..loadbalancer.LoadBalancer-configuration-class-name=xxxxx (全路径);比如:DEMO01..loadbalancer.LoadBalancer-configuration-class-name=com.primeton.gateway.core.lb.GatewayRoundLoadBalancerConfiguration(这个配置类看2.1.2章节),这就表示客户端应用DEMO01的负载均衡策略就是我在2.1.2自定义的轮询策略,所有需要路由到DEMO01的请求都要走我们2.1.1里面的逻辑进行choose()筛选出最终要路由的实例,这个配置可以放在配置文件,也可以放在其他地方;
2.第二部分就是针对所有的客户端负载策略配置进行组装spring loadBalancer需要的LoadBalancerClientSpecification,然后最终模拟源码里面的绑定动作将我们自己组装好的数据设置到NamedContextFactory上下文中去。
到此我们自定义客户端负载均衡策略方案就实现了,但是还差最后一步:怎么动态更新呢,比如现在DEMO01我们配置的是我们写的GatewayRoundLoadBalancerConfiguration,我们想要将它换成GatewayRandomLoadBalancerConfiguration 随机策略,由于时间问题,我这里就给大家出个方案,具体实现我就不写了,有时间再给大家写:
方案1:结合配置中心:nacos、Apollo等做配置热更新,监听nacos、Apollo配置,当配置有变化时,nacos、Apollo服务端都会发送通知,你只需要在代码里创建一个listener,然后针对我们上面定义的key,然后把上面代码的步骤再走一遍就行了。
方案2:将配置更新做成接口化,gateway写个controller专门用来管理配置,然后配置有变化通过调用gateway对应接口来通知gateway进行更新,最后再走一遍上面代码即可
方案3:结合redis,配置数据存在redis里面,利用redis的键空间监听通知监听这个配置,当配置有改动的时候redis会发出通知,我们在gateway里面监听好,然后进行上面的代码即可
当然还有很多其他方案,核心实现已经分享给大家了,剩下的就看诸位结合自身业务采取什么样的配置方案了。