• Spring Cloud源码分析之eureka+feign远程调用


    是什么

    Eureka是一个REST (Representational State Transfer)服务,用于定位服务,以实现中间层服务器的负载平衡和故障转移,我们称此服务为Eureka服务器。Eureka还有一个基于java的客户端组件,Eureka客户端,这使得与服务的交互更加容易,同时客户端也有一个内置的负载平衡器,它执行基本的循环负载均衡。

    Feign 是一种声明式服务调用组件,它在 RestTemplate 的基础上做了进一步的封装。通过 Feign,我们只需要声明一个接口并通过注解进行简单的配置(类似于 Dao 接口上面的 Mapper 注解一样)即可实现对 HTTP 接口的绑定。

    通过 Feign,我们可以像调用本地方法一样来调用远程服务,而完全感觉不到这是在进行远程调用。

    为什么

    eureka解决了什么问题?为什么要用eureka?

    eureka为分布式环境引入了服务注册与发现的能力,如果没有服务注册与发现,我们便需要手动去维护一大堆服务的地址信息,而当服务的状态变化发生变化,比如有新的服务加入或某些现有服务突然不可用时,现存的服务也无法感知到并及时切换到可用的服务上去。

    相对其他提供服务注册与发现的组件(比如说zk),Eureka更偏向于AP。如果分布式系统对可用性的要求更高,那么,Eureka会是一个不错的选择。

    另外,feign屏蔽了请求构建、发送、重试、负载均衡等相关细节,让我们能够从重复的工作中解放出来,更专注于业务本身。

    架构图

    在这里插入图片描述
    在这里插入图片描述

    案例

    eureka server

    引入依赖

    implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-server'
    
    • 1

    启动类

    @EnableEurekaServer
    @SpringBootApplication
    public class EurekaServerApplication {
    
    	public static void main(String[] args) {
    		SpringApplication.run(EurekaServerApplication.class, args);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    配置文件

    server.port=8761
    eureka.client.register-with-eureka=false
    eureka.client.fetch-registry=false
    
    • 1
    • 2
    • 3

    service consumer

    依赖

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.springframework.cloud</groupId>
    	<artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.springframework.cloud</groupId>
    	<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    启动类

    @SpringBootApplication
    @EnableDiscoveryClient
    @RestController
    @EnableFeignClients
    public class HelloClientApplication {
    	@Autowired
    	HelloClient client;
    
    	@RequestMapping("/")
    	public String hello() {
    		return client.hello();
    	}
    
    	public static void main(String[] args) {
    		SpringApplication.run(HelloClientApplication.class, args);
    	}
    
    	@FeignClient("HelloServer")
    	interface HelloClient {
    		@RequestMapping(value = "/", method = GET)
    		String hello();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    配置文件

    spring:
      application:
        name: HelloClient
    
    server:
      port: 7211
    
    eureka:
      password: password
      client:
        serviceUrl:
          defaultZone: http://user:${eureka.password}@localhost:8761/eureka/
      instance:
        leaseRenewalIntervalInSeconds: 10
        metadataMap:
          instanceId: ${vcap.application.instance_id:${spring.application.name}:${spring.application.instance_id:${server.port}}}
    
    endpoints:
      restart:
        enabled: true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    service provider

    依赖

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.springframework.cloud</groupId>
    	<artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.springframework.cloud</groupId>
    	<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    启动类

    @SpringBootApplication
    @EnableDiscoveryClient
    @RestController
    public class HelloServerApplication {
    	@Autowired
    	DiscoveryClient client;
    
    	@RequestMapping("/")
    	public String hello() {
    		List<ServiceInstance> instances = client.getInstances("HelloServer");
    		ServiceInstance selectedInstance = instances
    				.get(new Random().nextInt(instances.size()));
    		return "Hello World: " + selectedInstance.getServiceId() + ":" + selectedInstance
    				.getHost() + ":" + selectedInstance.getPort();
    	}
    
    	public static void main(String[] args) {
    		SpringApplication.run(HelloServerApplication.class, args);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    配置文件

    spring:
      application:
        name: HelloServer
    
    server:
      port: 7111
    
    eureka:
      password: password
      client:
        serviceUrl:
          defaultZone: http://user:${eureka.password}@localhost:8761/eureka/
      instance:
        leaseRenewalIntervalInSeconds: 10
        metadataMap:
          instanceId: ${vcap.application.instance_id:${spring.application.name}:${spring.application.instance_id:${server.port}}}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    运行

    依次启动eureka server、service provider、service consumer。访问http://localhost:7211/

    如果在service provider的hello方法里打上断点,就可以发现请求由从service consumer的hello方法进到了service provider的hello方法。

    而且,这里并没有在service consumer里配置service provider的地址信息,只是在service consumer里调用了HelloClient接口里的hello方法,便发出了一个到service provider的http请求。

    提出问题

    1. 服务消费者怎么调用服务提供者
    2. 服务消费者如何拿到服务提供者的注册信息
    3. 有多个服务提供者的情况下,服务消费者如何判断访问哪一个
    4. 什么时候往eureka server注册
    5. eureka server怎么判断服务是否可用
    6. 有新服务上线或者服务地址发生变化怎么办
    7. 网络超时怎么办

    初步分析

    service consumerHttpClient接口很简单,只有两个注解和一个接口定义,没有任何Http相关的细节。

    而访问HttpClient的方式是通过容器里的bean来实现的,自然而然可以想到这里一定用到了代理。而FeignClient注解大概率就是代理的Pointcut,代理内部就是网络访问的细节。

    像这种开箱即用的组件,按之前文章的分析经验来看,Spring的套路基本都是通过XXXAutoConfiguration以及EnableXXX注解来实现的。

    可以看到在service consumer的启动类上有EnableDiscoveryClientEnableFeignClients两个注解。

    然后搜一搜eurekaClientAutoConfiguration

    源码分析

    Eureka

    先看整体流程图

    在这里插入图片描述

    EurekaClient初始化

    public class EurekaClientAutoConfiguration {
    
    		//默认启用这个配置类,除非主动将eureka.client.refresh.enable设置为false
    		protected static class RefreshableEurekaClientConfiguration {
    	
    			@Autowired
    			private ApplicationContext context;
    	
    			@Autowired
    			private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
    	
    			public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
    					EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
    
    				//创建eurekaclient,注册register,renew,refresh任务到线程池
    				CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
    						this.context);
    				cloudEurekaClient.registerHealthCheck(healthCheckHandler);
    				return cloudEurekaClient;
    			}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    获取注册信息并注册定时任务

    public class DiscoveryClient implements EurekaClient {
    		DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    				//获取注册信息并保存
    				boolean primaryFetchRegistryResult = fetchRegistry(false);
    
    				if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                try {
    								//注册
                    if (!register() ) {
                        throw new IllegalStateException("Registration error at startup. Invalid server response.");
                    }
                } catch (Throwable th) {
                    logger.error("Registration error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
    				//初始化用于refresh跟renew的线程池
    				//线程数为2,保证了refresh跟renew的单线程运行
    				scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    
    				//注册定时任务
    				initScheduledTasks();
    		}
    		private void initScheduledTasks() {
    				//检查eureka.client.fetch-registry配置项的值,默认为true
            if (clientConfig.shouldFetchRegistry()) {
    						//任务执行间隔,默认30s。配置项:eureka.client.registryFetchIntervalSeconds
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    						//创建任务,内部会执行CacheRefreshThread#run
                cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                );
                // 注册refresh任务,负责刷新从eureka server获取的注册信息
                scheduler.schedule(
                        cacheRefreshTask,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
    				//eureka.client.register-with-eurek 默认为true
            if (clientConfig.shouldRegisterWithEureka()) {
    						//间隔,默认10s
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                heartbeatTask = new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                );
    						//注册renew/heartbeatTask
                scheduler.schedule(
                        heartbeatTask,
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // 周期性检查当前服务的状态信息,比如hostname变化,或者改变InstanceStatus或者renew失败。
    						// 一旦状态发生变化,会重新将自己注册到eureka server
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
    						//注册状态监听器,好处是状态发生变更时可能比周期性更早的感知到变化
    						//参考:ApplicationInfoManager#setInstanceStatus
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    refresh任务: CacheRefreshThread

    class CacheRefreshThread implements Runnable {
        void refreshRegistry() {
            try {
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
                boolean remoteRegionsModified = false;
                // 判断region是否发生变化
    						// 比如突然某个机房的服务出问题了,需要切到另一个机房。修改fetchRemoteRegionsRegistry配置项
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                if (null != latestRemoteRegions) {
                    String currentRemoteRegions = remoteRegionsToFetch.get();
                    if (!latestRemoteRegions.equals(currentRemoteRegions)) {
    
                        synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                            if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
    														//更新region
                                String[] remoteRegions = latestRemoteRegions.split(",");
                                remoteRegionsRef.set(remoteRegions);
                                instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
    														//保存region变化状态
                                remoteRegionsModified = true;
                            } else {
                                logger.info("Remote regions to fetch modified concurrently," +
                                        " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                            }
                        }
                    }
                }
    						//根据配置的url从eureka server拉取信息
    						//如果region发生变化,则拉取注册信息后进行全量覆盖。否则根据applicationName更新
    						//参考:DiscoveryClient#updateDelta -> applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                boolean success = fetchRegistry(remoteRegionsModified);
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    renew

    public class DiscoveryClient implements EurekaClient {
    
    		boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
    						//将当前实例的Id等信息发至eureka
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                //如果status时404
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
    								//将dirty设置为true
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
    										//如果成功,则将dirty设置为false。
    										//如果失败,InstanceInfoReplicator在扫描到dirty为false后会重新注册
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    Feign

    核心组件关系概览

    LoadBalancerAutoConfiguration 引入LoadBalancerAutoConfiguration

    LoadBalancerAutoConfiguration引入LoadBalancerClientConfiguration

    LoadBalancerClientConfiguration引入ServiceInstanceListSupplier

    ServiceInstanceListSupplierBeanFactory获取DiscoveryClient

    DiscoveryClient持有注册到eureka server的服务的信息

    ----

    LoadBalancerClientConfiguration引入RoundRobinLoadBalancer

    RoundRobinLoadBalancerBeanFactory获取ServiceInstanceListSupplier

    ----

    DefaultFeignLoadBalancerConfiguration 引入FeignBlockingLoadBalancerClient

    FeignBlockingLoadBalancerClient从容器中获取RoundRobinLoadBalancer

    ----

    EnableFeignClients 引入FeignClientsRegistrar

    FeignClientsRegistrar 为所有标注了FeignClient的类生成BeanDefinition

    FeignClientsRegistrar 为生成的BeanDefinition注册callback用于生成代理对象

    ----

    FeignClientsRegistrar注册的callback:创建SynchronousMethodHandler ,在其中注入FeignBlockingLoadBalancerClient,然后将SynchronousMethodHandler包装成InvocationHandler并注入代理对象

    调用关系概览

    访问代理对象->SynchronousMethodHandler->FeignBlockingLoadBalancerClient->RoundRobinLoadBalancer->DiscoveryClient获取服务提供者信息→RoundRobinLoadBalancer轮询选择服务提供者→调用目标服务

    流程图

    初始化

    在这里插入图片描述
    在这里插入图片描述

    远程调用

    在这里插入图片描述

    EnableFeignClients

    点进EnableFeignClients. 可以看到它import了FeignClientsRegistrar

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    @Import(FeignClientsRegistrar.class)
    public @interface EnableFeignClients {
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    扫描FeignClient

    FeignClientsRegistrar主要做了两件事

    1. 扫描basePackage下带FeignClient注解的类
    2. 注册工厂方法,用于为第一步扫描出来的类生成代理对象
    public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
    
    		LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
    		
    		//指定注解过滤器,过滤的注解为FeignClient
    		scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
    		Set<String> basePackages = getBasePackages(metadata);
    		for (String basePackage : basePackages) {
    			//扫描basePackage下带FeignClient注解的类并包装成beanDefinition
    			candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
    		}
    
    		for (BeanDefinition candidateComponent : candidateComponents) {
    			if (candidateComponent instanceof AnnotatedBeanDefinition) {
    				// verify annotated class is an interface
    				AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
    				AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
    
    				Map<String, Object> attributes = annotationMetadata
    						.getAnnotationAttributes(FeignClient.class.getCanonicalName());
    
    				String name = getClientName(attributes);
    				registerClientConfiguration(registry, name, attributes.get("configuration"));
    
    				registerFeignClient(registry, annotationMetadata, attributes);
    			}
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    注册beanDefinition以及callback

    factoryBean.getObject(); 生成代理对象。通过将SynchronousMethodHandler包装成InvocationHandler植入目标对象来完成。而SynchronousMethodHandler里负责http调用的细节。

    private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
    			Map<String, Object> attributes) {
    		String className = annotationMetadata.getClassName();
    		Class clazz = ClassUtils.resolveClassName(className, null);
    
    		FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
    		factoryBean.setType(clazz);
    
    		//注册一个回调来生成类型为clazz的bean
    		//往beanDefinition里放入一个Supplier的实例,Spring会优先通过Supplier来创建目标bean
    		BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
    			//返回代理对象
    			return factoryBean.getObject();
    		});
    
    		BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, qualifiers);
    		//注册beanDefinition
    		BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    拦截器

    创建SynchronousMethodHandler.Factory

    public abstract class Feign {
    
    	public <T> T target(Target<T> target) {
          return build().newInstance(target);
      }
    
    	public <T> T target(Target<T> target) {
          return build().newInstance(target);
        }
    
      public Feign build() {
    
    		//创建SynchronousMethodHandler的factory
        SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
            new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
                logLevel, decode404, closeAfterDecode, propagationPolicy, forceDecoding);
        ParseHandlersByName handlersByName =
            new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
                errorDecoder, synchronousMethodHandlerFactory);
        return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    生成代理对象

    public class ReflectiveFeign extends Feign {
    		public <T> T newInstance(Target<T> target) {
    			//通过SynchronousMethodHandler.Factory的create方法创建SynchronousMethodHandler
    		  Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
    		  Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
    
    		  methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
    
    			//通过SynchronousMethodHandler创建FeignInvocationHandler
    		  InvocationHandler handler = factory.create(target, methodToHandler);
    
    			//生成代理对象
    		  T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
    		      new Class<?>[] {target.type()}, handler);
    		
    		  return proxy;
    		}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    InvocationHandler

    InvocationHandler的生成与执行

    static class FeignInvocationHandler implements InvocationHandler {
    
        private final Target target;
        private final Map<Method, MethodHandler> dispatch;
    
        FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) {
    			//SynchronousMethodHandler是MethodHandler的子类
    			//method是目标方法
          this.target = checkNotNull(target, "target");
          this.dispatch = checkNotNull(dispatch, "dispatch for %s", target);
        }
    
    		@Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    			//根据目标方法拿到MethodHandler并调用其invoke方法
          return dispatch.get(method).invoke(args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    重试策略

    SynchronousMethodHandler,http调用入口以及重试策略

    final class SynchronousMethodHandler implements MethodHandler {
    	@Override
      public Object invoke(Object[] argv) throws Throwable {
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Retryer retryer = this.retryer.clone();
        while (true) {
          try {
    				//执行http request
            return executeAndDecode(template, options);
          } catch (RetryableException e) {
    				//IO异常
    				//服务提供者返回服务端错误(status>300&status!=404),并且response header里有Retry-After
            try {
    					//检查是否重试,重试次数达到retryer.maxAttempts就会抛出RetryableException
    					//重试间隔:如果是服务端错误,则使用Retry-After指定的值,但是不能超过maxPeriod
    					//如果是IO异常,则每次的间隔延长1.5倍
    					//如果符合重试要求,则不抛出异常
    					//服务端异常参考:ErrorDecoder#decode
    					//IO异常参考:SynchronousMethodHandler#executeAndDecode
              retryer.continueOrPropagate(e);
            } catch (RetryableException th) {
              Throwable cause = th.getCause();
              if (propagationPolicy == UNWRAP && cause != null) {
                throw cause;
              } else {
                throw th;
              }
            }
            continue;
          }
        }
      }
    
    	Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    		...
        Request request = targetRequest(template);
    		try {
    		    response = client.execute(request, options);
    		} catch (IOException e) {
          //抛出RetryableException
          throw errorExecuting(request, e);
        }
    
    		...
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    调用LoadBalancer

    获取服务提供者信息并执行http请求

    public class FeignBlockingLoadBalancerClient implements Client {
    		@Override
    		public Response execute(Request request, Request.Options options) throws IOException {
    			//从eureka client中根据serviceId获取服务提供方的地址信息
    			//serviceId是FeignClient注解的value
    			//通过eurekaClient从eurekaServer获取服务提供方的地址信息
    			ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
    			...
    
    			//生成并执行http请求
    			return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
    					supportedLifecycleProcessors);
    		}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    获取服务提供者信息

    根据FeignClient里设置的value获取服务提供者信息

    public class BlockingLoadBalancerClient implements LoadBalancerClient {
    		public <T> ServiceInstance choose(String serviceId, Request<T> request) {
    				//从容器中获取ReactorServiceInstanceLoadBalancer的实例
    				//默认是RoundRobinLoadBalancer
    				//参考LoadBalancerClientConfiguration
    		   ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
    		   if (loadBalancer == null) {
    		      return null;
    		   }
    				//通过loadBanlancer获取地址信息
    		   Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
    		   if (loadBalancerResponse == null) {
    		      return null;
    		   }
    		   return loadBalancerResponse.getServer();
    		}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    负载均衡处理

    public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {	
    	@Override
    	public Mono<Response<ServiceInstance>> choose(Request request) {
    		//拿到ServiceInstanceListSupplier,持有eurekaClient
    		ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
    				.getIfAvailable(NoopServiceInstanceListSupplier::new);
    
    		//根据请求从eureka拿到所有的服务端实例后选出一个实例
    		return supplier.get(request).next()
    				.map(serviceInstances -> 
    						//从serviceInstances选择一个实例
    						processInstanceResponse(supplier, serviceInstances)
    				);
    	}
    
    	private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
    			List<ServiceInstance> serviceInstances) {
    		//轮询选出一个实例
    		Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
    		if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
    			((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    		}
    		return serviceInstanceResponse;
    	}
    
    	private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    		//每次获取+1
    		int pos = Math.abs(this.position.incrementAndGet());
    		//取余实现轮询
    		ServiceInstance instance = instances.get(pos % instances.size());
    
    		return new DefaultResponse(instance);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    解决问题

    1. 通过代理生成http进行远程调用
    2. 过eureka clienteureka server根据appName(FeignClient注解的value)获取服务提供者信息,并通过refresh保证信息及时性
    3. 默认通过轮询的方式
    4. 启动时,并通过周期性任务以及状态监听器保证数据变更时能够及时通知到eureka server
    5. 通过定时(默认10s)向eureka server发送心跳消息(renew)
    6. 通过定时(默认30s)从eureka server获取服务信息,然后根据appName更新本地缓存(refresh)
    7. 如果没超过最大重试次数,则定时重试,间隔时间每次增加1.5倍,但不超过最大重试时间(默认5s)。如果服务端返回的header里设置了Retry-After,则根据该header指定的值来设置间隔时间,且不超过配置的最大间隔

    参考

    https://github.com/Netflix/eureka/wiki/Eureka-at-a-glance

  • 相关阅读:
    笔记本电脑没有麦克风,声音无法找到输入设备
    0010【Edabit ★☆☆☆☆☆】Maximum Edge of a Triangle
    10月9日,每日信息差
    python_selenium自动化测试框架
    VMWARE安装Ubuntu24.04桌面版的问题
    x64 简介
    NLP:生成熟悉NLP开源工具,如NLTK、 HanLP等,并搜寻、下载和熟悉PKU、 CoreNLP, LTP MSR, AS CITYI 等语料库。
    C语言进阶——字符函数和字符串函数(下)
    react源码分析:深度理解React.Context
    文件防泄密系统如何保障企业文档的安全性?
  • 原文地址:https://blog.csdn.net/scientificCommunity/article/details/127661696