• Nacos服务发现与注册中心之服务消费(发现)者(客户端源码)


    Nacos服务发现与注册中心之服务消费(发现)者(客户端源码)

    入口类

    SpringCloud提供了一层抽象,方便用户自定义自己的服务发现客户端。这个接口就是:DiscoveryClient,想要实现这个类,只需要是实现两个核心方法:List getInstances(String serviceId);List getServices(),当我们进行服务调用时,会先通过调用服务的名称,调用getInstances方法,到对应的服务中心获取到服务的IP等信息的列表,最后通过负载均衡算法调用具体的ip地址。

    代码解析

    对于Nacos,它的DiscoveryClient的实现类为:NacosDiscoveryClient,先来看其核心代码:

    
        private NacosServiceDiscovery serviceDiscovery;
    
        @Override
    	public List<ServiceInstance> getInstances(String serviceId) {
    		try {
    			return serviceDiscovery.getInstances(serviceId);
    		}
    		catch (Exception e) {
    			throw new RuntimeException(
    					"Can not get hosts from nacos server. serviceId: " + serviceId, e);
    		}
    	}
    
    	@Override
    	public List<String> getServices() {
    		try {
    			return serviceDiscovery.getServices();
    		}
    		catch (Exception e) {
    			log.error("get service name from nacos server fail,", e);
    			return Collections.emptyList();
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    可以看到主要逻辑是转发到了ServiceDiscovery执行,我们再来看NacosServiceDiscovery的代码

        public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
    		String group = discoveryProperties.getGroup();
    		// 1
    		List<Instance> instances = namingService().selectInstances(serviceId, group,
    				true);
    		return hostToServiceInstanceList(instances, serviceId);
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    我们来看标注为1的代码,通过这一行到调用代码去获取对应服务的IP信息等列表。我们一路点进去,最终会来到这个方法com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, java.util.List, boolean, boolean),我们来看这个方法的代码:

        @Override
        public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
                boolean subscribe) throws NacosException {
            
            ServiceInfo serviceInfo;
            if (subscribe) {
                // 2
                serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                        StringUtils.join(clusters, ","));
            } else {
                serviceInfo = hostReactor
                        .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                                StringUtils.join(clusters, ","));
            }
            return selectInstances(serviceInfo, healthy);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    其中,healthy(默认为true) 表示获取的是否状态为健康的节点,我们在之前的文章中讲解过,Nacos与服务提供者之间维持5秒一次的心跳,如果某个服务节点在15秒内没有接收到心跳,那么Nacos会将该节点的状态的健康值设置为false。而如果超过30秒没有接收到心跳,那么就会下线该节点。subscribe参数表示是否订阅该服务,默认为true
    所以我们的代码会走到标注为2那一行,代码方法为:com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo,我们来看该方法的代码

       public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
            
           // ... 不关注的代码
            
            // 3
            ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
            
            if (null == serviceObj) {
                serviceObj = new ServiceInfo(serviceName, clusters);
                
                serviceInfoMap.put(serviceObj.getKey(), serviceObj);
                
                updatingMap.put(serviceName, new Object());
                // 5
                updateServiceNow(serviceName, clusters);
                updatingMap.remove(serviceName);
                
            } else if (updatingMap.containsKey(serviceName)) {
                
               // ... 不关注的代码
            }
            
            scheduleUpdateIfAbsent(serviceName, clusters);
            
            return serviceInfoMap.get(serviceObj.getKey());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    我们看到标注3的代码, 程序会尝试先从本地缓存中获取服务的服务提供者列表,我们这次讨论的是首次访问服务的情况,那么获取到的ServiceInfo为NULL,此时会走null == serviceObj分支,那么我们重点来看标注为5的代码,这里终于真实的访问Nacos去拿服务提供者的信息了,我们顺着该方法一路DEBUG会来到这个方法:com.alibaba.nacos.client.naming.core.HostReactor#updateService

        public void updateService(String serviceName, String clusters) throws NacosException {
            ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
            try {
                
    			// 6
                String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
                
                if (StringUtils.isNotEmpty(result)) {
                    processServiceJson(result);
                }
            } finally {
                if (oldService != null) {
                    synchronized (oldService) {
                        oldService.notifyAll();
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    其中标注6的为发起HTTP请求到Nacos服务器,获取服务提供者信息,该方法为:com.alibaba.nacos.client.naming.net.NamingProxy#queryList,代码为:

        public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
                throws NacosException {
            
            final Map<String, String> params = new HashMap<String, String>(8);
            params.put(CommonParams.NAMESPACE_ID, namespaceId);
            params.put(CommonParams.SERVICE_NAME, serviceName);
            params.put("clusters", clusters);
            // 7
            params.put("udpPort", String.valueOf(udpPort));
            params.put("clientIP", NetUtils.localIP());
            params.put("healthyOnly", String.valueOf(healthyOnly));
            
            return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    我们看到标注7,请求时还发送了一个UDP的端口号,这个是用来做什么的呢?,这个后续会有说明,现在先知道这里会传这个数据。
    我们回到方法:com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo,当我们获取到了服务提供者的信息后

       public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
            
           // ... 不关注的代码
            
            // 3
            ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
            
            if (null == serviceObj) {
                serviceObj = new ServiceInfo(serviceName, clusters);
                
                serviceInfoMap.put(serviceObj.getKey(), serviceObj);
                
                updatingMap.put(serviceName, new Object());
                // 5
                updateServiceNow(serviceName, clusters);
                updatingMap.remove(serviceName);
                
            } else if (updatingMap.containsKey(serviceName)) {
                
               // ... 不关注的代码
            }
            
            // 8
            scheduleUpdateIfAbsent(serviceName, clusters);
            
            return serviceInfoMap.get(serviceObj.getKey());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    我们看到标注为8的代码,执行了标注5的代码后,执行scheduleUpdateIfAbsent(serviceName, clusters)方法,顾名思义,它的作用是定时更新,因为我们在获取了服务提供者的信息后会将其保存到本地缓存,那么此时为了保证本地缓存与Nacos服务端数据的一致性,就需要有个定时任务去更新这个服务提供者的信息了,我们来看scheduleUpdateIfAbsent(serviceName, clusters)方法的源码

    	
    	long lastRefTime = Long.MAX_VALUE;
    
        public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            
            synchronized (futureMap) {
                if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                    return;
                }
                
                ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
                futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    其中定时任务的逻辑在类:UpdateTask中,它实现了Runnable接口,我们来看这个类的逻辑代码:

            @Override
            public void run() {
                long delayTime = DEFAULT_DELAY;
                
                try {
                    ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                    
                    if (serviceObj == null) {
                        updateService(serviceName, clusters);
                        return;
                    }
                    
                    // 9
                    if (serviceObj.getLastRefTime() <= lastRefTime) {
                        updateService(serviceName, clusters);
                        serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                    } else {
                        // if serviceName already updated by push, we should not override it
                        // since the push data may be different from pull through force push
                        refreshOnly(serviceName, clusters);
                    }
                    
                    // 10
                    lastRefTime = serviceObj.getLastRefTime();
                    
                    if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                            .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                        // abort the update task
                        NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                        return;
                    }
                    if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                        incFailCount();
                        return;
                    }
                    // 11
                    delayTime = serviceObj.getCacheMillis();
                    resetFailCount();
                } catch (Throwable e) {
                    incFailCount();
                    NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
                } finally {
                    executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    我们看到标注9处,从缓存中拿到serviceObj后,在此处判断serviceObj的最后更新时间是否小于等于变量:lastRefTime的值,而该值的初始值为:Long.MAX_VALUE,并且后面会在标注10处,将serviceObj.getLastRefTime()赋值给到lastRefTime,所以是可以基本保证每次标注9处都为true的,什么时候不为true呢?这里留个坑,就先叫这个坑为坑1。那么当标注9处为true时,就会调用updateService(serviceName, clusters);,这个方法正是上面提到的到Nacos获取服务提供者地址的方法,并且这个方法在获取到服务提供者的数据后,会更新本地缓存。
    接下来我们来看标注11,这里会更新定时任务的间隔,看到finally代码块,这里有个判断:会拿到delayTime << failCountDEFAULT_DELAY * 60之间的最小值来做下一次定时任务的时间间隔,那么这两个值分别是多少呢?我们先来看DEFAULT_DELAY ,它的值为:1000L, 那么很明显DEFAULT_DELAY * 60表示的就是一分钟了,而delayTime的值是从ServiceInfo中拿到的,我们DEBUG这个字段可以看到它的默认值也为1000L,但是我们知道ServiceInfo是从Nacos中获取到的,那么它的具体值,应该由Nacos返回的数据决定。我们看到上文的com.alibaba.nacos.client.naming.net.NamingProxy#queryList的源码,请求服务提供者信息的接口地址为:/instance/list,因为,我们到Nacos的源码来看这个方法,方法名为:com.alibaba.nacos.naming.controllers.InstanceController#list

        @RequestMapping(value = "/list", method = RequestMethod.GET)
        public JSONObject list(HttpServletRequest request) throws Exception {
    
            String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
                Constants.DEFAULT_NAMESPACE_ID);
    
            String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            String agent = request.getHeader("Client-Version");
            if (StringUtils.isBlank(agent)) {
                agent = request.getHeader("User-Agent");
            }
            String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
            String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
            Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
            String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
            boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
            String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    
            String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    
            boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
            return doSrvIPXT(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    可以看到最终返回的数据在方法:doSrvIPXT中,那么我们来看这个方法:

        public JSONObject doSrvIPXT(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort,
                                    String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
    
    	// ... 省略不关注的代码
    
            long cacheMillis = switchDomain.getDefaultCacheMillis();
    
            // now try to enable the push
            try {
                if (udpPort > 0 && pushService.canEnablePush(agent)) {
                    pushService.addClient(namespaceId, serviceName,
                        clusters,
                        agent,
                        new InetSocketAddress(clientIP, udpPort),
                        pushDataSource,
                        tid,
                        app);
                    cacheMillis = switchDomain.getPushCacheMillis(serviceName);
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-API] failed to added push client", e);
                cacheMillis = switchDomain.getDefaultCacheMillis();
            }
    
            // ... 省略不关注的代码
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    由于方法太长,我们省略掉不关注的代码,只看cacheMillis ,这个放回会影响到我们上文讨论的定时任务间隔时间(delayTime),首先这个值会被赋值为switchDomain.getDefaultCacheMillis();,DEBUG这个方法可以发现这个值为:TimeUnit.SECONDS.toMillis(3);,即3秒,那么后续会有个判断udpPort > 0 && pushService.canEnablePush(agent),我们还记得之前有说过调用获取服务提供者列表的接口会传一个UDP的端口号,这个判断就是说如果服务发现者客户端有传UDP端口号,并且这个端口号是可用的,那么会走这个逻辑,这里我们看cacheMillis = switchDomain.getPushCacheMillis(serviceName);,而switchDomain.getPushCacheMillis(serviceName);的值为:TimeUnit.SECONDS.toMillis(10);即10秒,所以我们可以得出结论:正常情况下,如果服务发现在获取服务提供者列表时有传UDP端口号,并且这个端口号是可以进行推送的,那么服务发现者更新服务提供者信息的时间间隔为:10秒,否则为3秒,我想其中的原因是:当服务提供者状态发生变化时,如果Nacos服务端可以通过UDP端口主动通知服务发现者,那么服务发现者是可以相对即使更新本地缓存的,此时服务发现者就不需要太频繁的调用接口来获取服务提供者的最新数据了,否则的话,为了保证数据的实施有效性,就需要3秒的间隔请求Nacos来获取服务提供者的最新数据了。

    填坑:

    • 坑1:因为Nacos可以主动推送服务提供者信息变更的消息到服务消费端,因此此时定时任务中的本地字段lastRefTime就可能小于本地缓存中的serviceObj.getLastRefTime()了。

    总结

    • Nacos服务消费者获取到服务提供者信息后会在本地保存一份缓存。
    • Nacos服务消费者为了保持本地缓存数据与Nacos的一致性,会定时的获取最新数据。
    • 获取最新数据的时间间隔为:如果服务消费者开启了UDP推送,并且端口可用,则每10秒拉取一次,否则3秒拉去一次。
    • Nacos服务端,可通过服务消费者开启的UDP端口,主动发送服务提供者状态变更信息(这样可以减少服务消费者请求Nacos服务端的次数,减少压力,并且还可以提高双方数据的一致性)。
  • 相关阅读:
    技术分享| anyRTC之RTN网络
    Vue学习之认识到应用(三)
    C++语法基础(8)——指针与引用
    HTML5期末大作业:美妆网页主题网站设计——清新的手工肥皂网站展示(4页)HTML+CSS+JavaScript
    Electron安装问题
    php实现选择排序法
    i5 12600HX比i5 12600H选哪个好
    css之复合选择器
    acwing 5283. 牛棚入住
    【Vue3 源码解析】computed
  • 原文地址:https://blog.csdn.net/zmjmvp/article/details/126028247