• Spring Cloud series: how Spring Cloud LoadBalancer uses @LoadBalanced to call microservices registered in Nacos


    Preface

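    Spring Cloud, an offshoot of the Spring family, plays an important role as a microservice framework. This article looks at how the @LoadBalanced annotation takes effect in the Spring Cloud source code. In the development environment used here the registry is Nacos and load balancing is done by spring-cloud-loadbalancer rather than Ribbon, so the article also touches on how Nacos supplies service data to LoadBalancer, but the focus remains the workflow behind the @LoadBalanced annotation.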

    Spring Cloud dependencies

        <dependencies>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
                <version>2021.0.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>2.6.3</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-loadbalancer</artifactId>
                <version>3.1.1</version>
            </dependency>
        </dependencies>
    

    Depending on spring-cloud-starter-loadbalancer also pulls in the spring-cloud-commons jar.

    Flow chart

    [Figure: flow chart (image missing)]

    Groundwork for LoadBalancer

    The LoadBalancerAutoConfiguration class

    The spring-cloud-commons jar contains a LoadBalancerAutoConfiguration class. Because it is annotated with @Configuration, it is initialized as a configuration bean. Attentive readers will notice that it also carries @ConditionalOnBean(LoadBalancerClient.class), which checks whether the application context contains a bean implementing the LoadBalancerClient interface. Both spring-cloud-starter-loadbalancer and Ribbon provide such an implementation, so LoadBalancerAutoConfiguration only takes effect once one of those libraries is added to pom.xml.

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RestTemplate.class)
    @ConditionalOnBean(LoadBalancerClient.class)
    @EnableConfigurationProperties(LoadBalancerClientsProperties.class)
    public class LoadBalancerAutoConfiguration {
    }
    

    The LoadBalancerClient interface and the BlockingLoadBalancerClient class

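    Next comes BlockingLoadBalancerClient, which implements the LoadBalancerClient interface. LoadBalancerClient declares three methods and its parent interface ServiceInstanceChooser declares two. Together they choose a service instance from the registry, use that instance to process the request, and return the response.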

    public interface ServiceInstanceChooser {
    	ServiceInstance choose(String serviceId);
    	<T> ServiceInstance choose(String serviceId, Request<T> request);
    }
    
    
    public interface LoadBalancerClient extends ServiceInstanceChooser {
    	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
    	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
    	URI reconstructURI(ServiceInstance instance, URI original);
    
    }
    

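    Below is the implementation of BlockingLoadBalancerClient, with some code omitted. The choose method uses the serviceId to obtain the matching microservice instance, a ServiceInstance; how that instance is actually found is covered later. The execute method then applies the request to that instance and returns the response.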

    public class BlockingLoadBalancerClient implements LoadBalancerClient {
    
    	private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
    
    	public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
    		this.loadBalancerClientFactory = loadBalancerClientFactory;
    	}
    
    	@Override
    	public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
    		...
    		// Use the serviceId to pick a ServiceInstance from the registry
    		ServiceInstance serviceInstance = choose(serviceId, lbRequest);
    		...
    		return execute(serviceId, serviceInstance, lbRequest);
    	}
    
    	@Override
    	public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
    			throws IOException {
    			...
    			// Apply the request to the chosen serviceInstance to obtain the response
    			T response = request.apply(serviceInstance);
    			...
    	}
    
    	@Override
    	public <T> ServiceInstance choose(String serviceId, Request<T> request) {
    		// Look up the load balancer for this serviceId, then let it choose a ServiceInstance
    		ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
    		...
    		Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
    		...
    		return loadBalancerResponse.getServer();
    	}
    }
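
    The two-step flow in this listing (choose an instance, then apply the request to it) can be sketched in plain Java without any Spring dependency. Everything here is an assumption made for illustration: the class name ChooseThenExecuteSketch, the in-memory map standing in for the registry, the String addresses standing in for ServiceInstance, and the "take the first instance" rule.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified client; the names and types are not the Spring Cloud API.
final class ChooseThenExecuteSketch {

    // serviceId -> known instance addresses; stands in for the registry.
    private final Map<String, List<String>> registry;

    ChooseThenExecuteSketch(Map<String, List<String>> registry) {
        this.registry = registry;
    }

    // Step 1: resolve a service id to one instance (this sketch always takes the first).
    String choose(String serviceId) {
        List<String> instances = registry.getOrDefault(serviceId, Collections.<String>emptyList());
        return instances.isEmpty() ? null : instances.get(0);
    }

    // Step 2: hand the chosen instance to the request callback and return its result.
    <T> T execute(String serviceId, Function<String, T> request) {
        String instance = choose(serviceId);
        if (instance == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        return request.apply(instance);
    }

    public static void main(String[] args) {
        Map<String, List<String>> registry = new HashMap<>();
        registry.put("order-service", Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        ChooseThenExecuteSketch client = new ChooseThenExecuteSketch(registry);
        String response = client.execute("order-service", instance -> "called " + instance);
        System.out.println(response); // called 10.0.0.1:8080
    }
}
```

    The point of the split is that the caller only knows the logical service name; the address is decided inside execute, immediately before the request callback runs.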
    

    How LoadBalancer obtains a ServiceInstance

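    As mentioned above, the choose method can obtain a service instance from the registry by serviceId. It first calls
    getInstance(serviceId) on the ReactiveLoadBalancer.Factory interface to get the load balancer for that service.
    LoadBalancerClientFactory implements ReactiveLoadBalancer.Factory; its getInstance method calls getInstance on the parent class NamedContextFactory to fetch the bean that implements the ReactorServiceInstanceLoadBalancer interface.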

    public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
    		implements ReactiveLoadBalancer.Factory<ServiceInstance> {
    	@Override
    	public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
    		return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
    	}
    }
    

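    RoundRobinLoadBalancer implements the ReactorServiceInstanceLoadBalancer interface, and LoadBalancerClientConfiguration registers it when no other load balancer bean has been defined.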

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnDiscoveryEnabled
    public class LoadBalancerClientConfiguration {
    
    	private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
    
    	@Bean
    	@ConditionalOnMissingBean
    	public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
    			LoadBalancerClientFactory loadBalancerClientFactory) {
    		String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
    		return new RoundRobinLoadBalancer(
    				loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
    	}
    }
    

    RoundRobinLoadBalancer

    The RoundRobinLoadBalancer.choose method

    The BlockingLoadBalancerClient.choose method shown above ends up calling the choose method of RoundRobinLoadBalancer.

    public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    
    	@SuppressWarnings("rawtypes")
    	@Override
    	public Mono<Response<ServiceInstance>> choose(Request request) {
    		ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
    				.getIfAvailable(NoopServiceInstanceListSupplier::new);
    		return supplier.get(request).next()
    				.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    	}
    
    	private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
    			List<ServiceInstance> serviceInstances) {
    		Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
    		if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
    			((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    		}
    		return serviceInstanceResponse;
    	}
    
    	private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    		if (instances.isEmpty()) {
    			if (log.isWarnEnabled()) {
    				log.warn("No servers available for service: " + serviceId);
    			}
    			return new EmptyResponse();
    		}
    		// TODO: enforce order?
    		int pos = Math.abs(this.position.incrementAndGet());
    
    		ServiceInstance instance = instances.get(pos % instances.size());
    
    		return new DefaultResponse(instance);
    	}
    
    }
    
    ServiceInstanceListSupplier and ServiceInstanceListSupplierBuilder

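    The choose method of RoundRobinLoadBalancer first obtains a ServiceInstanceListSupplier, then passes it, together with the instance list it supplies, to processInstanceResponse. Instantiating the ServiceInstanceListSupplier bean is somewhat involved; it is registered as follows.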

    @Bean
    @ConditionalOnBean(DiscoveryClient.class)
    @ConditionalOnMissingBean
    @Conditional(DefaultConfigurationCondition.class)
    public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
    		ConfigurableApplicationContext context) {
    	return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
    }
    

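    ServiceInstanceListSupplier uses the builder pattern: builder() creates a ServiceInstanceListSupplierBuilder, which then defines two ServiceInstanceListSupplier creators, one for CachingServiceInstanceListSupplier (caching) and one for DiscoveryClientServiceInstanceListSupplier (registry lookup). Because both are lambda expressions, they only run at build time, when they use the Spring context (ConfigurableApplicationContext) to look up the beans they need. Note that CachingServiceInstanceListSupplier holds a delegate that points to DiscoveryClientServiceInstanceListSupplier: when the cache has no result, the data is fetched from the DiscoveryClient.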

    public final class ServiceInstanceListSupplierBuilder {
    
    	public ServiceInstanceListSupplierBuilder withBlockingDiscoveryClient() {
    		if (baseCreator != null && LOG.isWarnEnabled()) {
    			LOG.warn("Overriding a previously set baseCreator with a blocking DiscoveryClient baseCreator.");
    		}
    		this.baseCreator = context -> {
    			DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class);
    
    			return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
    		};
    		return this;
    	}
    	
    	public ServiceInstanceListSupplierBuilder withCaching() {
    		if (cachingCreator != null && LOG.isWarnEnabled()) {
    			LOG.warn(
    					"Overriding a previously set cachingCreator with a CachingServiceInstanceListSupplier-based cachingCreator.");
    		}
    		this.cachingCreator = (context, delegate) -> {
    			ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
    					.getBeanProvider(LoadBalancerCacheManager.class);
    			if (cacheManagerProvider.getIfAvailable() != null) {
    				return new CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.getIfAvailable());
    			}
    			if (LOG.isWarnEnabled()) {
    				LOG.warn("LoadBalancerCacheManager not available, returning delegate without caching.");
    			}
    			return delegate;
    		};
    		return this;
    	}
    	
    	public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
    		Assert.notNull(baseCreator, "A baseCreator must not be null");
    
    		// Create the base DiscoveryClientServiceInstanceListSupplier
    		ServiceInstanceListSupplier supplier = baseCreator.apply(context);
    
    		for (DelegateCreator creator : creators) {
    			supplier = creator.apply(context, supplier);
    		}
    
    		// Wrap it in a CachingServiceInstanceListSupplier; on a cache miss the wrapper falls back to the DiscoveryClient-backed delegate
    		if (this.cachingCreator != null) {
    			supplier = this.cachingCreator.apply(context, supplier);
    		}
    		return supplier;
    	}
    }
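
    The "cache first, delegate on a miss" arrangement described above is a plain decorator. The sketch below shows that shape in ordinary Java, as an illustration only: InstanceListSupplierSketch and CachingSupplierSketch are hypothetical names, instances are reduced to address strings, and the caching policy (no expiry, empty results not cached) is an assumption of this sketch rather than a statement about CachingServiceInstanceListSupplier.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical, simplified supplier: maps a service id to a list of instance addresses.
interface InstanceListSupplierSketch extends Function<String, List<String>> {
}

// Decorator that answers from its cache and asks the delegate only on a miss.
final class CachingSupplierSketch implements InstanceListSupplierSketch {

    private final InstanceListSupplierSketch delegate;
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    CachingSupplierSketch(InstanceListSupplierSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<String> apply(String serviceId) {
        List<String> cached = cache.get(serviceId);
        if (cached != null) {
            return cached;
        }
        List<String> fresh = delegate.apply(serviceId);
        if (!fresh.isEmpty()) {
            // Assumption of this sketch: empty results are not cached and entries never expire.
            cache.put(serviceId, fresh);
        }
        return fresh;
    }

    public static void main(String[] args) {
        AtomicInteger registryCalls = new AtomicInteger();
        // The base supplier stands in for the DiscoveryClient-backed one.
        InstanceListSupplierSketch base = serviceId -> {
            registryCalls.incrementAndGet();
            return Arrays.asList("10.0.0.1:8080");
        };
        InstanceListSupplierSketch supplier = new CachingSupplierSketch(base);
        supplier.apply("order-service");
        supplier.apply("order-service");
        System.out.println(registryCalls.get()); // 1
    }
}
```

    Wrapping the base supplier last, as build does with cachingCreator, means every lookup passes through the cache before any other layer is consulted.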
    
    CompositeDiscoveryClient and NacosDiscoveryClient

    This is where the CompositeDiscoveryClient bean is declared; the declaration receives the list of all DiscoveryClient beans.

    @Configuration(proxyBeanMethods = false)
    @AutoConfigureBefore(SimpleDiscoveryClientAutoConfiguration.class)
    public class CompositeDiscoveryClientAutoConfiguration {
    
    	@Bean
    	@Primary
    	public CompositeDiscoveryClient compositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
    		return new CompositeDiscoveryClient(discoveryClients);
    	}
    
    }
    

    When getInstances is called on CompositeDiscoveryClient, it calls getInstances on each client in the list in order and returns the first non-empty result.

    public class CompositeDiscoveryClient implements DiscoveryClient {
    
    	private final List<DiscoveryClient> discoveryClients;
    
    	public CompositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
    		AnnotationAwareOrderComparator.sort(discoveryClients);
    		this.discoveryClients = discoveryClients;
    	}
    
    	@Override
    	public List<ServiceInstance> getInstances(String serviceId) {
    		if (this.discoveryClients != null) {
    			for (DiscoveryClient discoveryClient : this.discoveryClients) {
    				List<ServiceInstance> instances = discoveryClient.getInstances(serviceId);
    				if (instances != null && !instances.isEmpty()) {
    					return instances;
    				}
    			}
    		}
    		return Collections.emptyList();
    	}
    }
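
    To make the "first non-empty answer wins" rule concrete, here is a small stand-alone sketch. It is only an illustration: CompositeLookupSketch is a hypothetical name, each client is modelled as a function from service id to address strings, and the host names are made up. Unlike the listing, the sketch does not sort the clients and simply relies on the order of the list it is given.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified composite; each "client" maps a service id to instance addresses.
final class CompositeLookupSketch {

    private final List<Function<String, List<String>>> clients;

    CompositeLookupSketch(List<Function<String, List<String>>> clients) {
        this.clients = clients;
    }

    // Asks each client in list order and returns the first non-empty answer.
    List<String> getInstances(String serviceId) {
        for (Function<String, List<String>> client : clients) {
            List<String> instances = client.apply(serviceId);
            if (instances != null && !instances.isEmpty()) {
                return instances;
            }
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        List<Function<String, List<String>>> clients = new ArrayList<>();
        clients.add(serviceId -> Collections.<String>emptyList());   // knows nothing
        clients.add(serviceId -> Arrays.asList("nacos-host:8080"));  // first useful answer
        clients.add(serviceId -> Arrays.asList("static-host:9090")); // never consulted
        List<String> found = new CompositeLookupSketch(clients).getInstances("order-service");
        System.out.println(found); // [nacos-host:8080]
    }
}
```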
    

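    NacosDiscoveryClient implements the DiscoveryClient interface, so it appears in the DiscoveryClient list above and its getInstances method gets called. From there the Nacos client uses the settings in the YAML file to call the Nacos server API and obtain the ServiceInstance list for the serviceId. For reasons of space the Nacos source code is not discussed here.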

    
    public class NacosDiscoveryClient implements DiscoveryClient {
    
    	@Override
    	public List<ServiceInstance> getInstances(String serviceId) {
    		try {
    			return Optional.of(serviceDiscovery.getInstances(serviceId)).map(instances -> {
    						ServiceCache.setInstances(serviceId, instances);
    						return instances;
    					}).get();
    		}
    		catch (Exception e) {
    			if (failureToleranceEnabled) {
    				return ServiceCache.getInstances(serviceId);
    			}
    			throw new RuntimeException(
    					"Can not get hosts from nacos server. serviceId: " + serviceId, e);
    		}
    	}
    
    }
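
    The try/catch in this listing follows a common pattern: remember the last successful answer and serve it when the registry call fails and failure tolerance is switched on. A minimal sketch of that pattern, under stated assumptions: FailureTolerantLookupSketch is a hypothetical class, the remote call is modelled as a function, and returning an empty list when nothing has been remembered yet is a choice made for this sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified lookup with a "last known good" fallback.
final class FailureTolerantLookupSketch {

    private final Function<String, List<String>> remote;
    private final boolean failureToleranceEnabled;
    private final Map<String, List<String>> lastKnown = new ConcurrentHashMap<>();

    FailureTolerantLookupSketch(Function<String, List<String>> remote, boolean failureToleranceEnabled) {
        this.remote = remote;
        this.failureToleranceEnabled = failureToleranceEnabled;
    }

    List<String> getInstances(String serviceId) {
        try {
            List<String> instances = remote.apply(serviceId);
            lastKnown.put(serviceId, instances); // remember the latest successful answer
            return instances;
        } catch (RuntimeException e) {
            if (failureToleranceEnabled) {
                // Serve the remembered list; empty if nothing was ever fetched (sketch assumption).
                return lastKnown.getOrDefault(serviceId, Collections.<String>emptyList());
            }
            throw new IllegalStateException("Cannot get hosts for serviceId: " + serviceId, e);
        }
    }

    public static void main(String[] args) {
        boolean[] registryUp = {true};
        Function<String, List<String>> remote = serviceId -> {
            if (!registryUp[0]) {
                throw new IllegalStateException("registry unreachable");
            }
            return Arrays.asList("10.0.0.1:8080");
        };
        FailureTolerantLookupSketch lookup = new FailureTolerantLookupSketch(remote, true);
        lookup.getInstances("order-service"); // succeeds and is remembered
        registryUp[0] = false;
        System.out.println(lookup.getInstances("order-service")); // [10.0.0.1:8080]
    }
}
```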
    

    The RoundRobinLoadBalancer.processInstanceResponse method

    Once RoundRobinLoadBalancer.choose has the ServiceInstance list, it calls processInstanceResponse. The position field counts the calls, and taking it modulo the list size rotates through the service list.

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
    			List<ServiceInstance> serviceInstances) {
    		Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
    		if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
    			((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    		}
    		return serviceInstanceResponse;
    	}
    
    	private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    		if (instances.isEmpty()) {
    			if (log.isWarnEnabled()) {
    				log.warn("No servers available for service: " + serviceId);
    			}
    			return new EmptyResponse();
    		}
    		// TODO: enforce order?
    		int pos = Math.abs(this.position.incrementAndGet());
    
    		ServiceInstance instance = instances.get(pos % instances.size());
    
    		return new DefaultResponse(instance);
    	}
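
    The counter-plus-modulo idea can be tried in isolation. The sketch below is an illustration, not the Spring Cloud class: RoundRobinSketch and its seed parameter are hypothetical. One deliberate difference from the listing is the overflow handling. In Java, Math.abs(Integer.MIN_VALUE) is still negative, so once the counter wraps around the modulo could yield a negative index; the sketch masks the sign bit instead, which is one possible way to keep the index non-negative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified selector; not the Spring Cloud class.
final class RoundRobinSketch {

    private final AtomicInteger position;

    RoundRobinSketch(int seed) {
        this.position = new AtomicInteger(seed);
    }

    // Returns the next element in rotation, or null when the list is empty.
    <T> T next(List<T> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Clearing the sign bit keeps the value non-negative even after the counter overflows.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("a", "b", "c");
        RoundRobinSketch selector = new RoundRobinSketch(0);
        for (int i = 0; i < 4; i++) {
            String chosen = selector.next(instances);
            System.out.println(chosen); // b, c, a, b
        }
    }
}
```

    Because the counter is incremented before it is used, a seed of 0 starts the rotation at index 1 rather than index 0.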
    

    RestTemplate

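    When developing microservices we usually annotate the RestTemplate bean with @LoadBalanced. The reason is that the RestTemplate list injected during auto-configuration is also annotated with @LoadBalanced, and @LoadBalanced is itself meta-annotated with @Qualifier, so Spring only injects the RestTemplate beans that carry @LoadBalanced.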

    public class LoadBalancerAutoConfiguration {
    	@LoadBalanced
    	@Autowired(required = false)
    	private List<RestTemplate> restTemplates = Collections.emptyList();
    
    	@Bean
    	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
    			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
    		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
    			for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
    				for (RestTemplateCustomizer customizer : customizers) {
    					customizer.customize(restTemplate);
    				}
    			}
    		});
    	}
    }
    

    When RestTemplate calls execute to reach a microservice in the registry, it uses getRequestFactory from its parent class InterceptingHttpAccessor and gets back an InterceptingClientHttpRequestFactory that holds the list of interceptors.

    public abstract class InterceptingHttpAccessor extends HttpAccessor {
    	@Override
    	public ClientHttpRequestFactory getRequestFactory() {
    		List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
    		if (!CollectionUtils.isEmpty(interceptors)) {
    			ClientHttpRequestFactory factory = this.interceptingRequestFactory;
    			if (factory == null) {
    				factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
    				this.interceptingRequestFactory = factory;
    			}
    			return factory;
    		}
    		else {
    			return super.getRequestFactory();
    		}
    	}
    
    }
    

    The LoadBalancerInterceptor interceptor

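    The source below comes from LoadBalancerAutoConfiguration.java. The loadBalancedRestTemplateInitializerDeprecated method returns a SmartInitializingSingleton, which runs once the singleton beans have been instantiated: it obtains the restTemplateCustomizers, iterates over restTemplates, and for each one iterates over the customizers, calling the customize method of the RestTemplateCustomizer lambda. That is how the LoadBalancerInterceptor instance ends up in the RestTemplate's interceptors. The interceptor itself is constructed with a LoadBalancerRequestFactory and the LoadBalancerClient (here BlockingLoadBalancerClient).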

    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();
    
    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
    		final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
    	return () -> restTemplateCustomizers.ifAvailable(customizers -> {
    		for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
    			for (RestTemplateCustomizer customizer : customizers) {
    				customizer.customize(restTemplate);
    			}
    		}
    	});
    }
    
    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
    	return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }
    
    @Bean
    public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
    		LoadBalancerRequestFactory requestFactory) {
    	return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    }
    
    @Bean
    @ConditionalOnMissingBean
    public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
    	return restTemplate -> {
    		List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
    		list.add(loadBalancerInterceptor);
    		restTemplate.setInterceptors(list);
    	};
    }
    

    RestTemplate issues the HTTP request

    Every HTTP method called on RestTemplate ends up in doExecute, which creates a ClientHttpRequest through createRequest.

    public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
    	@Nullable
    	protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
    			@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
    			...
    			ClientHttpRequest request = createRequest(url, method);
    			...
    			response = request.execute();
    			...
    	}
    
    }
    

    The InterceptingClientHttpRequestFactory is the one produced by the bean setup above; its createRequest method instantiates an InterceptingClientHttpRequest.

    public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {
    	@Override
    	protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
    		return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
    	}
    
    }
    

    When InterceptingClientHttpRequest, acting as the ClientHttpRequest, runs executeInternal, it walks through the interceptors and calls intercept on each, which leads into the intercept method of LoadBalancerInterceptor.

    class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
    
    	private final List<ClientHttpRequestInterceptor> interceptors;
    
    	@Override
    	protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
    		InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
    		return requestExecution.execute(this, bufferedOutput);
    	}
    
    
    	private class InterceptingRequestExecution implements ClientHttpRequestExecution {
    
    		private final Iterator<ClientHttpRequestInterceptor> iterator;
    
    		@Override
    		public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
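    			// While interceptors remain, hand the request to the next one; a request addressed by service id is intercepted here and resolved to a real URL via the registry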
    			if (this.iterator.hasNext()) {
    				ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
    				return nextInterceptor.intercept(request, body, this);
    			}
    			// Once the real URL is known and no interceptors remain, the else branch sends the actual HTTP request
    			else {
    				HttpMethod method = request.getMethod();
    				Assert.state(method != null, "No standard HTTP method");
    				ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
    				request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
    				if (body.length > 0) {
    					if (delegate instanceof StreamingHttpOutputMessage) {
    						StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
    						streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
    					}
    					else {
    						StreamUtils.copy(body, delegate.getBody());
    					}
    				}
    				return delegate.execute();
    			}
    		}
    	}
    
    }
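
    The re-entrant shape of this chain is easier to see in a stripped-down form. In the sketch below, offered purely as an illustration, InterceptorSketch and ChainSketch are hypothetical types, a request is reduced to its URI, and "sending" just returns a string. The interceptor in main plays the role of a load-balancing interceptor by swapping the service name for a made-up address before continuing the chain.

```java
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified interceptor contract; not Spring's ClientHttpRequestInterceptor.
interface InterceptorSketch {
    String intercept(URI uri, ChainSketch chain);
}

// Walks the interceptor list; once it is exhausted, "sends" the request.
final class ChainSketch {

    private final Iterator<InterceptorSketch> iterator;

    ChainSketch(List<InterceptorSketch> interceptors) {
        this.iterator = interceptors.iterator();
    }

    String execute(URI uri) {
        if (iterator.hasNext()) {
            // The interceptor may rewrite the URI and calls chain.execute again to continue.
            return iterator.next().intercept(uri, this);
        }
        // No interceptors left: a real client would open the connection here.
        return "GET " + uri;
    }

    public static void main(String[] args) {
        // Stands in for the load-balancing interceptor: replaces the service name with an address.
        InterceptorSketch resolver = (uri, chain) ->
                chain.execute(URI.create("http://10.0.0.1:8080" + uri.getPath()));
        ChainSketch chain = new ChainSketch(Collections.singletonList(resolver));
        String sent = chain.execute(URI.create("http://order-service/orders/1"));
        System.out.println(sent); // GET http://10.0.0.1:8080/orders/1
    }
}
```

    Because the iterator is shared across calls, the second call to execute sees an exhausted list and falls through to the sending branch, which mirrors the if/else in the listing.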
    

    LoadBalancerInterceptor.intercept calls the BlockingLoadBalancerClient.execute method discussed above.

    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
    	@Override
    	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
    			final ClientHttpRequestExecution execution) throws IOException {
    		final URI originalUri = request.getURI();
    		String serviceName = originalUri.getHost();
    		Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    		return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    	}
    }
    

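    BlockingLoadBalancerClient.execute invokes the lambda produced by LoadBalancerRequestFactory, passing in the instance. The instance carries the real address of the microservice instance from the registry. The lambda wraps the HttpRequest so that it resolves to that address and calls execution.execute again; once no interceptors remain, this reaches the else branch of InterceptingRequestExecution.execute shown above. The rest is not repeated here.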

    public class LoadBalancerRequestFactory {
    
    	public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
    			final ClientHttpRequestExecution execution) {
    		return instance -> {
    			HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
    			if (this.transformers != null) {
    				for (LoadBalancerRequestTransformer transformer : this.transformers) {
    					serviceRequest = transformer.transformRequest(serviceRequest, instance);
    				}
    			}
    			return execution.execute(serviceRequest, body);
    		};
    	}
    }
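
    The listing wraps the request in a ServiceRequestWrapper together with the chosen instance, and the LoadBalancerClient interface shown earlier declares reconstructURI(instance, original) for turning a service-name URI into a concrete one. What such a rewrite amounts to can be sketched with java.net.URI alone. This is an assumption-laden illustration, not the library's implementation: UriReconstructionSketch is a hypothetical class, and the host and port are passed in directly instead of being read from a ServiceInstance.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper; shows the idea of swapping a service name for a real host and port.
final class UriReconstructionSketch {

    // Replaces host and port, keeping scheme, user info, path, query and fragment.
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild URI from " + original, e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://order-service/orders/1?verbose=true");
        URI rebuilt = reconstruct(original, "10.0.0.1", 8080);
        System.out.println(rebuilt); // http://10.0.0.1:8080/orders/1?verbose=true
    }
}
```

    Only the authority changes; the path and query the caller wrote against the service name arrive at the instance untouched.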
    
  • Original article: https://blog.csdn.net/camel84/article/details/126650303