• Spring cloud负载均衡 @LoadBalanced注解原理


    接上一篇文章,案例代码也在上一篇文章的基础上。

    在上一篇文章的案例中,我们创建了作为Eureka server的Eureka注册中心服务、作为Eureka client的userservice、orderservice。

    orderservice引入RestTemplate,加入了@LoadBalanced注解,代码如下:

    package com;
    
    @SpringBootApplication
    @EnableEurekaClient
    public class OrderServiceApplication {
        public static void main(String[] args) {
            SpringApplication.run(OrderServiceApplication.class);
        }
    
        @Bean
        @LoadBalanced
        public RestTemplate restTemplate(){
            return new RestTemplate();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    从而,我们实现了基于Eureka注册中心的微服务治理框架,在orderservice调用userservice的时候通过加了@LoadBalanced注解的RestTemplate实现了负载均衡。

    今天的目标是:深入研究@LoadBalanced生效的底层原理。

    @LoadBalanced是怎么实现负载均衡的?

    我们要回答的第一个问题是,为什么@LoadBalanced能实现负载均衡?

    我们从代码的源头一路追查下去…

    orderservice通过RestTemplate实现对userservice的调用代码:

    @Service
    public class OrderService {
        @Autowired
        private RestTemplate restTemplate;
        public String getOrder(){
            //通过userService获取user信息
            String url="http://userservice/user/getUser";
            System.out.println("url"+url);
            User user=restTemplate.getForObject(url,User.class);
            System.out.println(user);
            return user.getName();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    很容易的,我们需要有一个认知:这里访问的地址http://userservice/user/getUser只可能在Spring cloud服务治理环境下有意义,最终能访问到我们发布到本机上的userservice的如下服务:

    1. http://localhost:8080
    2. http://localhost:8081
    3. http://localhost:8082

    必定需要借助Spring Cloud的某一机制将http://userservice转换为上述地址之一。这个转换过程,也就是Spring Cloud的负载均衡机制。

    跟踪getForObject:

    	@Override
    	@Nullable
    	public  T getForObject(String url, Class responseType, Object... uriVariables) throws RestClientException {
    		RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
    		HttpMessageConverterExtractor responseExtractor =
    				new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
    		return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    调用到execute方法:

    	@Override
    	@Nullable
    	public  T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
    			@Nullable ResponseExtractor responseExtractor, Object... uriVariables) throws RestClientException {
    
    		URI expanded = getUriTemplateHandler().expand(url, uriVariables);
    		return doExecute(expanded, method, requestCallback, responseExtractor);
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    doExecute方法:

    @Nullable
    	protected  T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
    			@Nullable ResponseExtractor responseExtractor) throws RestClientException {
    
    		Assert.notNull(url, "URI is required");
    		Assert.notNull(method, "HttpMethod is required");
    		ClientHttpResponse response = null;
    		try {
    			ClientHttpRequest request = createRequest(url, method);
    			if (requestCallback != null) {
    				requestCallback.doWithRequest(request);
    			}
    			response = request.execute();
    			handleResponse(url, method, response);
    			return (responseExtractor != null ? responseExtractor.extractData(response) : null);
    		}
    		catch (IOException ex) {
    			String resource = url.toString();
    			String query = url.getRawQuery();
    			resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
    			throw new ResourceAccessException("I/O error on " + method.name() +
    					" request for \"" + resource + "\": " + ex.getMessage(), ex);
    		}
    		finally {
    			if (response != null) {
    				response.close();
    			}
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    createRequest方法:

    	protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
    		ClientHttpRequest request = getRequestFactory().createRequest(url, method);
    		initialize(request);
    		if (logger.isDebugEnabled()) {
    			logger.debug("HTTP " + method.name() + " " + url);
    		}
    		return request;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    最关键的部分来了,就是这个 getRequestFactory()方法,在RestTemplate的父类InterceptingHttpAccessor中定义:

        private final List interceptors = new ArrayList<>();
    
    
        public ClientHttpRequestFactory getRequestFactory() {
    		List interceptors = getInterceptors();
    		if (!CollectionUtils.isEmpty(interceptors)) {
    			ClientHttpRequestFactory factory = this.interceptingRequestFactory;
    			if (factory == null) {
    				factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
    				this.interceptingRequestFactory = factory;
    			}
    			return factory;
    		}
    		else {
    			return super.getRequestFactory();
    		}
    	}
    
    	public List getInterceptors() {
    		return this.interceptors;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    首先通过getInterceptor()方法检查是否有有拦截器,拦截器interceptors是由ClientHttpRequestInterceptor组成的一个list。如果有的话,就会创建InterceptingClientHttpRequestFactory、并且将拦截器interceptors送给InterceptingClientHttpRequestFactory工厂之后,返回工厂InterceptingClientHttpRequestFactory。

    然后,方法调用返回到createRequest方法中,调用InterceptingClientHttpRequestFactory的createRequest方法,最终会调用到:

    	@Override
    	protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
    		return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
    	}
    
    • 1
    • 2
    • 3
    • 4

    方法最终返回的是InterceptingClientHttpRequest,并且,会将工厂InterceptingClientHttpRequestFactory持有的interceptors传递给InterceptingClientHttpRequest对象:

    protected InterceptingClientHttpRequest(ClientHttpRequestFactory requestFactory,
    			List interceptors, URI uri, HttpMethod method) {
    
    		this.requestFactory = requestFactory;
    		this.interceptors = interceptors;
    		this.method = method;
    		this.uri = uri;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    之后返回到doExecute方法中,会调用InterceptingClientHttpRequest的父类AbstractClientHttpRequest类的execute方法、之后又转回到InterceptingClientHttpRequest类的executeInternal方法:

    	@Override
    	protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
    		InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
    		return requestExecution.execute(this, bufferedOutput);
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    executeInternal方法创建了InterceptingClientHttpRequest内部类InterceptingRequestExecution对象之后,调用内部对象的execute方法。

    查看内部类InterceptingRequestExecution,不难发现,他拿到了宿主类InterceptingClientHttpRequest的拦截器interceptors的迭代器:

    private class InterceptingRequestExecution implements ClientHttpRequestExecution {
    
    		private final Iterator iterator;
    
    		public InterceptingRequestExecution() {
    			this.iterator = interceptors.iterator();
    		}
    
    		@Override
    		public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    			if (this.iterator.hasNext()) {
    				ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
    				return nextInterceptor.intercept(request, body, this);
    			}
    			else {
    				HttpMethod method = request.getMethod();
    				Assert.state(method != null, "No standard HTTP method");
    				ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
    				request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
    				if (body.length > 0) {
    					if (delegate instanceof StreamingHttpOutputMessage) {
    						StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
    						streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
    					}
    					else {
    						StreamUtils.copy(body, delegate.getBody());
    					}
    				}
    				return delegate.execute();
    			}
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    然后在execute方法中首先遍历该迭代器iterator并调用迭代器对象ClientHttpRequestInterceptor的intercept方法。

    拦截器ClientHttpRequestInterceptor就是实现负载均衡的关键所在,Spring正是在拦截器ClientHttpRequestInterceptor的intercept方法中,完成了负载均衡的实现:将请求中的服务名称比如本案例中的userservice、替换成了由Eureka注册中心下发下来的具体的userservice服务器(比如http://127.0.0.1:8081)

    我们发现最关键的部分就是RestTemplate对象中的拦截器interceptors。

    接下来的问题就是:interceptors是什么时候、怎么创建出来的?

    拦截器的初始化

    从RestTemplate的父类InterceptingHttpAccessor简单追踪一下,不难发现他的interceptors是通过setInterceptors方法赋值的,然后借助开发工具的帮助:
    在这里插入图片描述
    发现LoadBalanceAutoConfiguration调用了setInterceptors方法。

    这个LoadBalanceAutoConfiguration的命名方式是不是很熟悉啊?我们前面分析过SpringBoot的自动配置,就是各种xxxxAutoConfiguration命名的类负责具体的自动配置任务的。

    简单了解了下,LoadBalanceAutoConfiguration就是SpringBoot的自动配置类。我们找到LoadBalanceAutoConfiguration类在spring-cloud-commons包下,按图索骥,我们找到了包下的/META-INF/spring.factories文件:
    在这里插入图片描述
    因此我们知道,SpringBoot的自动配置机制会通过调用LoadBalanceAutoConfiguration类来完成LoadBalance的相关初始化工作。
    所以我们接下来的工作就是要研究LoadBalanceAutoConfiguration。

    LoadBalanceAutoConfiguration

    第一步,从LoadBalanceAutoConfiguration类的源码知道他要通过restTemplateCustomizer方法加载一个RestTemplateCustomizer对象,方法需要一个参数LoadBalancerInterceptor:

    		@Bean
    		@ConditionalOnMissingBean
    		public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
    			return restTemplate -> {
    				List list = new ArrayList<>(restTemplate.getInterceptors());
    				list.add(loadBalancerInterceptor);
    				restTemplate.setInterceptors(list);
    			};
    		}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    先看一眼RestTemplateCustomizer类,只有一个customize方法,该方法有一个参数RestTemplate:

    public interface RestTemplateCustomizer {
    
    	void customize(RestTemplate restTemplate);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这个customize方法已经在restTemplateCustomizer方法中通过lamda表达式定义出来了,代码逻辑很简单:

    就是要将LoadBalancerInterceptor拦截器对象通过调用RestTemplate的setInterceptors方法加入到RestTemplate的interceptors中!

    似乎快要摸到开关了!!!

    那么我们现在又冒出了以下几个问题:

    1. 这个LoadBalancerInterceptor从哪里来?
    2. RestTemplate从哪里来?
    3. 什么时候调用这个已经注入到Spring Ioc容器中的RestTemplateCustomizer的customize方法?

    第1、第2两个问题其实很简单。看代码:

        @Configuration(proxyBeanMethods = false)
    	@Conditional(RetryMissingOrDisabledCondition.class)
    	static class LoadBalancerInterceptorConfig {
    
    		@Bean
    		public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
    				LoadBalancerRequestFactory requestFactory) {
    			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    		}
    		...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    LoadBalancerAutoConfiguration类中有一个静态配置类LoadBalancerInterceptorConfig,通过loadBalancerInterceptor方法注入了LoadBalancerInterceptor 对象,创建对象的时候通过参数注入了loadBalancerClient和LoadBalancerRequestFactory。

    第2个问题,RestTemplate的注入,其实是我们从应用层通过@Bean注入的,同时加了@LoadBalanced注解。

    现在我们来回答第3个问题:

    什么时候调用这个已经注入到Spring Ioc容器中的RestTemplateCustomizer的customize方法?

    比前两个问题稍稍复杂了一点:

    	@LoadBalanced
    	@Autowired(required = false)
    	private List restTemplates = Collections.emptyList();
    	
    	@Bean
    	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
    			final ObjectProvider> restTemplateCustomizers) {
    		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
    			for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
    				for (RestTemplateCustomizer customizer : customizers) {
    					customizer.customize(restTemplate);
    				}
    			}
    		});
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    首先看到标注了@Autowire和@LoadBalanced注解的RestTemplates列表的注入,意思是: 如果有加了@LoadBalanced注解的RestTemplates bean的话,就自动装配到restTemplates 变量中。

    之后,通过loadBalancedRestTemplateInitializerDeprecated方法注入了一个SmartInitializingSingleton,我们知道SmartInitializingSingleton在装配到Spring Ioc之后会调用他的afterSingletonsInstantiated()方法。这里注入的SmartInitializingSingleton通过lamda实现了afterSingletonsInstantiated()方法,代码逻辑:通过方法参数打包注入到Ioc容器中的RestTemplateCustomizer(前面讲过了,他已经注入到IoC容器中了)到restTemplateCustomizers中,然后遍历restTemplates(这个list在前面说过了,已经把我们通过@Bean和@LoadBalanced注解的RestTemplate对象注入进来了)、针对每一个RestTemplate再遍历restTemplateCustomizers中的RestTemplateCustomizer对象,逐个调用他的customize方法。

    OK!对于@LoadBalanced注解从应用、到初始化、生效机制,我们就分析清楚了。

    最后还遗留两个小问题,初始化和应用两端各一个:初始化过程中的装配到loadBalancerInterceptor对象中的LoadBalancerClient具体是什么对象、什么时候注入的?应用端最终的负载均衡策略、负载均衡实现逻辑,我们还没具体分析。

    下一篇文章解决上述两个问题。

  • 相关阅读:
    [2]线性规划模型
    开源ffmpeg(三)——音频拉流、解码以及重采样
    Android源码设计模式探索与实战【代理模式】
    windows环境下node安装教程(超详细)
    【vector题解】杨辉三角 | 删除有序数组中的重复项 | 只出现一次的数字Ⅱ
    工作几年,如何快速晋升至架构师?
    独立IP主机怎么样?对网站有什么影响
    搞脑筋的日历积木
    ARM---day02
    面试官:React怎么做性能优化
  • 原文地址:https://blog.csdn.net/weixin_44612246/article/details/134190969