• Nacos服务发现原理分析


    微服务将自己的实例注册到nacos注册中心,nacos服务端存储了注册列表,然后通过ribbon调用服务,具体是如何调用?如果nacos服务挂了,还能正常调用服务吗?调用的服务列表发生变化,调用方是如何感知变化的?带着这些问题,来探索一下服务发现的原理。

    版本 2.1.1

    • Nacos Server:2.1.1
    • spring-cloud-starter-alibaba:2.1.1.RELEASE
    • spring-boot:2.1.1.RELEASE
    • spring-cloud-starter-netflix-ribbon:2.1.1.RELEASE

    客户端和服务端版本号都为2.1.1

    从 Ribbon 讲起

    使用ribbon来调用服务,就添加ribbon依赖:

    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-starter-netflix-ribbonartifactId>
    dependency>
    

    ribbon依赖包含spring-cloud-commons依赖,而在spring-cloud-commons包中spring.factories自动配置LoadBalancerAutoConfiguration类:

    @LoadBalanced
    @Autowired(required = false)
    private List restTemplates = Collections.emptyList();
    
    @Bean
    public LoadBalancerInterceptor ribbonInterceptor(
        LoadBalancerClient loadBalancerClient,
        LoadBalancerRequestFactory requestFactory) {
      return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    }
    

    只要标注了@LoadBalanced注解的restTemplates都会添加负载均衡拦截器LoadBalancerInterceptor

    使用Ribbon组件调用服务:

    restTemplate.getForObject("http://service-name",String.class);
    

    restTemplatehttp请求方法,最终会调用到doExecute方法。doExecute在发起http请求之前,会先执行LoadBalancerInterceptor负载均衡拦截器的intercept方法。 该方法调用execute方法。

    而在execute方法中,主要有两个方法:

    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    

    execute先通过getLoadBalancer获取ILoadBalancer实例,然后再通过getServer获取Server实例。

    getLoadBalancer最终会调用RibbonServerList接口,具体调用流程:

    getLoadBalancer() ->
    ZoneAwareLoadBalancer -> 
    DynamicServerListLoadBalancer -> 
    restOfInit()->
    updateListOfServers()->
    ServerList.getUpdatedListOfServers()->
    

    Nacos实现类NacosServerList实现了ServerList接口。

    总之我们在进行微服务调用的时候,Ribbon最终会调用NacosServerList类中的getUpdatedListOfServers方法。

    Nacos 获取服务

    NacosServerList类的getUpdatedListOfServers方法调用了该类的getServers方法:

    private List getServers() {
      try {
        // 获取分组 
        String group = discoveryProperties.getGroup();
        // 重点,查询实例列表
        List instances = discoveryProperties.namingServiceInstance()
            .selectInstances(serviceId, group, true);
        return instancesToServerList(instances);
      }
      catch (Exception e) {
        throw new IllegalStateException(
            "Can not get service instances from nacos, serviceId=" + serviceId,
            e);
      }
    }
    

    重点看NacosNamingService类的selectInstances方法,会调用以下selectInstances三个重载方法:

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, healthy, true);
    }
        
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
        return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
    }
        
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
    
        ServiceInfo serviceInfo;
        // 默认订阅
        if (subscribe) {
            // 获取服务,这是重点
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
    

    最后一个selectInstances方法里面的hostReactor.getServiceInfo方法是获取服务的核心方法:

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // 先在本地缓存查询
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        // 查询不到 
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
    
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            // 请求Nacos Server实例,并更新服务实例
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
    
        } else if (updatingMap.containsKey(serviceName)) {
    
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // 定时更新本地缓存
        scheduleUpdateIfAbsent(serviceName, clusters);
    
        return serviceInfoMap.get(serviceObj.getKey());
    }
    

    getServiceInfo是服务发现的核心方法,先查询serviceInfoMap集合中查询本地缓存,本地缓存查询不到就请求Nacos Server实例,并更新本地缓存。

    请求Nacos Server实例,实际就是发送http请求Nacos Server

    public void updateServiceNow(String serviceName, String clusters) {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            // 调用 Nacos Server 查询服务
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
            // 结果不为空,更新缓存  
            if (StringUtils.isNotEmpty(result)) {
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
    
    //向 Nacos Server发起 HTTP 列表查询
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {
    
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
    
        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }
    

    queryList方法主要封装号请求参数,然后向Nacos Server服务端发送http请求。

    当服务端实例发生改变时,Nacos Server会推送最新的实例给服务端。

    服务发现是先获取本地缓存,如果没有本地缓存,就请求Nacos Server服务端获取数据,如果Nacos Server挂了,也不会影响服务的调用。

    总结

    • Ribbon
      • 项目启动时,会创建一个负载均衡拦截器。
      • Ribbon发起服务请求开始,最终会调用到拦截器的拦截方法。
      • 拦截方法又调用ServerList获取实例接口,而NacosServerList实现获取实例列表。
    • Nacos调用服务
      • NacosServerList实现了获取服务实例列表。
      • NacosServerListselectInstances方法最终调用了hostReactor.getServiceInfo方法
      • getServiceInfo方法先从serviceInfoMap集合中获取本地缓存,如果本地缓存找不到,就请求Nacos Server获取服务实例,并更新本地缓存。
      • 获取服务之后,定时更新本地缓存。

    参考

  • 相关阅读:
    node笔记_koa框架是什么?
    朴素贝叶斯(基于概率论)
    Python+Selenium做到浏览器所见即所得(全网最简单教程)
    【Spring Boot】常用参数与注解
    在VSCode中自定义文件类型和扩展名关联
    用matlab编写了一个DSP数据处理小软件
    项目成本超支的主要原因以及解决方法
    利用存储过程造测试数据
    JavaScript 进阶 - 第4天
    基于BPM(业务流程管理)的低代码开发平台有哪些优势?
  • 原文地址:https://www.cnblogs.com/jeremylai7/p/17137747.html