• 【Nacos】Source Code Walkthrough: Client-Side Service Discovery


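    Service discovery means that a client obtains, from the registry, the service information recorded there.

    The following walks through how the Nacos client performs service discovery when a Spring Boot project starts.

    Entry point

    NacosReactiveDiscoveryClientConfiguration registers a NacosReactiveDiscoveryClient bean, which is the entry point for service discovery. NacosReactiveDiscoveryClient implements the ReactiveDiscoveryClient interface, which is Spring Cloud's service discovery contract.

    In newer Spring Cloud versions the entry point is NacosReactiveDiscoveryClientConfiguration; in older versions it is NacosDiscoveryClientConfiguration.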

    public class NacosReactiveDiscoveryClientConfiguration {
    
    	/**
    	 * Entry point for service discovery.
    	 * @param nacosServiceDiscovery
    	 * @return
    	 */
    	@Bean
    	@ConditionalOnMissingBean
    	public NacosReactiveDiscoveryClient nacosReactiveDiscoveryClient(
    			NacosServiceDiscovery nacosServiceDiscovery) {
    		return new NacosReactiveDiscoveryClient(nacosServiceDiscovery);
    	}
    
    }
    

    NacosReactiveDiscoveryClient#getInstances

    NacosReactiveDiscoveryClient implements Spring Cloud's ReactiveDiscoveryClient interface. Other Spring Cloud components, such as OpenFeign, call ReactiveDiscoveryClient's getInstances() method to obtain the instance list.

    com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClient#getInstances

    @Override
    public Flux<ServiceInstance> getInstances(String serviceId) {
    
    	return Mono.justOrEmpty(serviceId).flatMapMany(loadInstancesFromNacos())
    		.subscribeOn(Schedulers.boundedElastic());
    }
    
    private Function<String, Publisher<ServiceInstance>> loadInstancesFromNacos() {
    	return serviceId -> {
    		try {
    			/**
    			 * The returned list is wrapped in a Mono (reactive style).
    			 * @see NacosServiceDiscovery#getInstances(String)
    			 */
    			return Mono.justOrEmpty(serviceDiscovery.getInstances(serviceId))
    				.flatMapMany(instances -> {
    					// Store the instances in the local cache
    					ServiceCache.setInstances(serviceId, instances);
    					return Flux.fromIterable(instances);
    				});
    		}
    		catch (NacosException e) {
    			log.error("get service instance[{}] from nacos error!", serviceId, e);
    			// On failure, the instances can be served from the local cache
    			return failureToleranceEnabled
    				? Flux.fromIterable(ServiceCache.getInstances(serviceId))
    				: Flux.empty();
    		}
    	};
    }
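
The "cache on success, fall back to the cache on failure" idea above can be modeled without Spring or Reactor. The following is only a minimal sketch: `FallbackLookup` and its `Callable`-based remote call are hypothetical stand-ins, not the real ServiceCache or NacosServiceDiscovery.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the failure-tolerance pattern; not the Nacos classes.
final class FallbackLookup {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final boolean failureToleranceEnabled;

    FallbackLookup(boolean failureToleranceEnabled) {
        this.failureToleranceEnabled = failureToleranceEnabled;
    }

    List<String> getInstances(String serviceId, Callable<List<String>> remote) {
        try {
            List<String> instances = remote.call();
            cache.put(serviceId, instances); // remember the last good answer
            return instances;
        } catch (Exception e) {
            // Remote lookup failed: serve the cached list only if tolerance is on.
            return failureToleranceEnabled
                    ? cache.getOrDefault(serviceId, List.of())
                    : List.of();
        }
    }

    public static void main(String[] args) {
        FallbackLookup lookup = new FallbackLookup(true);
        System.out.println(lookup.getInstances("demo", () -> List.of("10.0.0.1:8080")));
        System.out.println(lookup.getInstances("demo", () -> {
            throw new IllegalStateException("registry down");
        }));
    }
}
```

With tolerance enabled, the second call in `main` returns the list cached by the first call; with it disabled, the same failure yields an empty list.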
    

    NacosServiceDiscovery#getInstances

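    NacosServiceDiscovery mainly delegates the call to the Nacos naming service and converts the result into Spring Cloud ServiceInstance objects.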

    com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery#getInstances

    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
    	String group = discoveryProperties.getGroup();
    	/**
    	 * @see NacosNamingService#selectInstances(java.lang.String, java.lang.String, boolean)
    	 */
    	List<Instance> instances = namingService().selectInstances(serviceId, group, true);
    	return hostToServiceInstanceList(instances, serviceId);
    }
    

    NacosNamingService#selectInstances

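    NacosNamingService performs a simple selection over the instance list. In real-world development, choosing which instance to call is handled by Ribbon or LoadBalancer.

    The Nacos server returns the full instance list, including unhealthy instances; whether to use unhealthy instances is up to the client.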

    com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, java.util.List, boolean, boolean)

    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
    									  boolean subscribe) throws NacosException {
    
    	ServiceInfo serviceInfo;
    	if (subscribe) {
    		// subscribe defaults to true
    		serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
    												 StringUtils.join(clusters, ","));
    	} else {
    		serviceInfo = hostReactor
    			.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
    											  StringUtils.join(clusters, ","));
    	}
    	return selectInstances(serviceInfo, healthy);
    }
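
Since the client decides whether unhealthy instances are kept, the final `selectInstances(serviceInfo, healthy)` step amounts to a filter. Here is a rough sketch of such a filter. The `Inst` type is hypothetical, and the exact rule (health flag matches the requested value, instance is enabled, weight is positive) is an assumption made for illustration rather than a copy of the client's code.

```java
import java.util.ArrayList;
import java.util.List;

final class InstanceFilter {
    // Hypothetical, simplified instance model (not the Nacos Instance class).
    static final class Inst {
        final String address;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        Inst(String address, boolean healthy, boolean enabled, double weight) {
            this.address = address;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    // Assumed rule: keep instances whose health flag equals the requested one
    // and that are enabled with a positive weight.
    static List<String> select(List<Inst> all, boolean healthy) {
        List<String> selected = new ArrayList<>();
        for (Inst inst : all) {
            if (inst.healthy == healthy && inst.enabled && inst.weight > 0) {
                selected.add(inst.address);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Inst> all = List.of(
                new Inst("10.0.0.1:8080", true, true, 1.0),
                new Inst("10.0.0.2:8080", false, true, 1.0),
                new Inst("10.0.0.3:8080", true, false, 1.0));
        System.out.println(select(all, true));
    }
}
```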
    

    HostReactor#getServiceInfo

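    The client first looks up the service's instance list in the local cache. If the cache has no entry, it queries the Nacos server immediately. Finally, it starts a scheduled task that queries the Nacos server periodically.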

    com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    
    	NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    	String key = ServiceInfo.getKey(serviceName, clusters);
    	if (failoverReactor.isFailoverSwitch()) {
    		return failoverReactor.getService(key);
    	}
    
    	// Look up the local cache (serviceInfoMap)
    	ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    
    	if (null == serviceObj) {
    		serviceObj = new ServiceInfo(serviceName, clusters);
    
    		serviceInfoMap.put(serviceObj.getKey(), serviceObj);
    
    		updatingMap.put(serviceName, new Object());
    		// Query nacos-server
    		updateServiceNow(serviceName, clusters);
    		updatingMap.remove(serviceName);
    
    	} else if (updatingMap.containsKey(serviceName)) {
    
    		if (UPDATE_HOLD_INTERVAL > 0) {
    			// hold a moment waiting for update finish
    			synchronized (serviceObj) {
    				try {
    					serviceObj.wait(UPDATE_HOLD_INTERVAL);
    				} catch (InterruptedException e) {
    					NAMING_LOGGER
    						.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
    				}
    			}
    		}
    	}
    
    	// Schedule periodic queries to nacos-server
    	scheduleUpdateIfAbsent(serviceName, clusters);
    
    	return serviceInfoMap.get(serviceObj.getKey());
    }
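
The cache-first flow can be reduced to a few lines. The sketch below is a simplified model only: `CacheFirstLookup` and its `remote` function are hypothetical, and it leaves out the failover switch, the wait on `updatingMap`, and the scheduled refresh.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model: consult the local map first, go to the server only on a miss.
final class CacheFirstLookup {
    private final Map<String, String> serviceInfoMap = new ConcurrentHashMap<>();
    private final Function<String, String> remote;
    final AtomicInteger remoteCalls = new AtomicInteger();

    CacheFirstLookup(Function<String, String> remote) {
        this.remote = remote;
    }

    String get(String key) {
        String cached = serviceInfoMap.get(key);
        if (cached == null) {
            // Cache miss: query the (simulated) server right away and store the result.
            remoteCalls.incrementAndGet();
            serviceInfoMap.put(key, remote.apply(key));
        }
        return serviceInfoMap.get(key);
    }

    public static void main(String[] args) {
        CacheFirstLookup lookup = new CacheFirstLookup(key -> "instances-of-" + key);
        System.out.println(lookup.get("demo"));
        System.out.println(lookup.get("demo"));
        System.out.println("remote calls: " + lookup.remoteCalls.get());
    }
}
```

Only the first `get` reaches the simulated server; later calls are answered from the map, which is why the real client needs the periodic task and the UDP push to keep that map fresh.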
    

    HostReactor#updateServiceNow

    com.alibaba.nacos.client.naming.core.HostReactor#updateServiceNow

    private void updateServiceNow(String serviceName, String clusters) {
    	try {
    		// Query the service list immediately
    		updateService(serviceName, clusters);
    	} catch (NacosException e) {
    		NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    	}
    }
    

    HostReactor#updateService

    com.alibaba.nacos.client.naming.core.HostReactor#updateService

    public void updateService(String serviceName, String clusters) throws NacosException {
    	ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    	try {
    
    		// Make an HTTP call. One parameter passes a UDP port; when the service changes, the Nacos server notifies the client through this UDP port
    		String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
    
    		if (StringUtils.isNotEmpty(result)) {
    			processServiceJson(result);
    		}
    	} finally {
    		if (oldService != null) {
    			synchronized (oldService) {
    				oldService.notifyAll();
    			}
    		}
    	}
    }
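
The `notifyAll()` in the `finally` block pairs with the `wait(UPDATE_HOLD_INTERVAL)` in getServiceInfo: a caller that finds an update in progress parks on the cached object until the updater wakes it or the hold interval expires. A minimal sketch of that handshake follows. `HoldForUpdate` is hypothetical, and unlike the original it re-checks a condition in a loop to guard against spurious wakeups.

```java
// Hypothetical model of the wait/notifyAll handshake between a reader and an updater.
final class HoldForUpdate {
    static String awaitUpdate(long holdMillis) {
        final String[] value = {"stale"};
        final Object lock = new Object();
        synchronized (lock) {
            // The updater cannot enter its synchronized block until the
            // reader releases the monitor inside wait().
            Thread updater = new Thread(() -> {
                synchronized (lock) {
                    value[0] = "fresh";
                    lock.notifyAll(); // wake every reader waiting on this object
                }
            });
            updater.start();
            long deadline = System.currentTimeMillis() + holdMillis;
            long remaining = holdMillis;
            try {
                while ("stale".equals(value[0]) && remaining > 0) {
                    lock.wait(remaining); // hold a moment, but never longer than the interval
                    remaining = deadline - System.currentTimeMillis();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value[0];
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitUpdate(5000L));
    }
}
```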
    

    NamingProxy#queryList

    Sends an HTTP request directly to the Nacos server; the request path is /nacos/v1/ns/instance/list.

    For more APIs, see: https://nacos.io/zh-cn/docs/open-api.html

    com.alibaba.nacos.client.naming.net.NamingProxy#queryList

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    	throws NacosException {
    
    	final Map<String, String> params = new HashMap<String, String>(8);
    	params.put(CommonParams.NAMESPACE_ID, namespaceId);
    	params.put(CommonParams.SERVICE_NAME, serviceName);
    	params.put("clusters", clusters);
    	params.put("udpPort", String.valueOf(udpPort));
    	params.put("clientIP", NetUtils.localIP());
    	params.put("healthyOnly", String.valueOf(healthyOnly));
    
    	return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
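
To make the request concrete, here is a sketch that assembles the GET URL from the same kind of parameters. Everything specific in it is an assumption for illustration: the server address `http://127.0.0.1:8848`, the parameter key names `namespaceId` and `serviceName`, and all sample values. `InstanceListUrl` is a hypothetical helper, not part of the Nacos client.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that builds the instance-list query URL.
final class InstanceListUrl {
    static String build(String server, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            // URL-encode each value so characters such as '@' survive the query string.
            query.add(e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return server + "/nacos/v1/ns/instance/list?" + query;
    }

    // Sample values only; a LinkedHashMap keeps the parameters in insertion order.
    static Map<String, String> demoParams() {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", "public");
        params.put("serviceName", "DEFAULT_GROUP@@demo-service");
        params.put("clusters", "");
        params.put("udpPort", "50000");
        params.put("clientIP", "192.168.1.10");
        params.put("healthyOnly", "false");
        return params;
    }

    public static void main(String[] args) {
        System.out.println(build("http://127.0.0.1:8848", demoParams()));
    }
}
```

Note that `healthyOnly` is sent as `false`, which matches the earlier observation that the server returns all instances and the client filters them.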
    

    HostReactor#scheduleUpdateIfAbsent

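    Starts a scheduled task for the service if one is not already registered. The task first runs after DEFAULT_DELAY (1 second by default) and then reschedules itself to keep querying the service list.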

    com.alibaba.nacos.client.naming.core.HostReactor#scheduleUpdateIfAbsent

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    	if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
    		return;
    	}
    
    	synchronized (futureMap) {
    		if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
    			return;
    		}
    
    		ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
    		futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    	}
    }
    
    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    	return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }
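
The method uses a double-checked pattern: a cheap lookup outside the lock, then a second lookup inside `synchronized` before scheduling, so that concurrent callers register at most one task per key. A stripped-down sketch of the same idea is shown below. `ScheduleOnce` and its counter are hypothetical and exist only to make the "at most once" behavior observable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "schedule a task for this key only if none exists".
final class ScheduleOnce {
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true); // do not keep the JVM alive for the poller
                return t;
            });
    final AtomicInteger scheduled = new AtomicInteger();

    void scheduleIfAbsent(String key, Runnable task, long delayMillis) {
        if (futureMap.get(key) != null) {
            return; // fast path without locking
        }
        synchronized (futureMap) {
            if (futureMap.get(key) != null) {
                return; // another thread registered the task in the meantime
            }
            scheduled.incrementAndGet();
            futureMap.put(key, executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        ScheduleOnce once = new ScheduleOnce();
        once.scheduleIfAbsent("demo@@DEFAULT", () -> { }, 1000L);
        once.scheduleIfAbsent("demo@@DEFAULT", () -> { }, 1000L);
        System.out.println("tasks scheduled: " + once.scheduled.get());
        once.shutdown();
    }
}
```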
    

    UpdateTask#run

    The scheduled task still calls updateService() to query the service list.

    com.alibaba.nacos.client.naming.core.HostReactor.UpdateTask#run

    public void run() {
    	long delayTime = DEFAULT_DELAY;
    
    	try {
    		ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
    
    		if (serviceObj == null) {
    			// Still calls updateService
    			updateService(serviceName, clusters);
    			return;
    		}
    
    		if (serviceObj.getLastRefTime() <= lastRefTime) {
    			updateService(serviceName, clusters);
    			serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
    		} else {
    			// if serviceName already updated by push, we should not override it
    			// since the push data may be different from pull through force push
    			refreshOnly(serviceName, clusters);
    		}
    
    		lastRefTime = serviceObj.getLastRefTime();
    
    		if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
    			.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
    			// abort the update task
    			NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
    			return;
    		}
    		if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
    			incFailCount();
    			return;
    		}
    		delayTime = serviceObj.getCacheMillis();
    		resetFailCount();
    	} catch (Throwable e) {
    		incFailCount();
    		NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    	} finally {
    		executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    	}
    }
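
The `finally` block reschedules the task with an exponential backoff: the delay doubles with each consecutive failure and is capped at `DEFAULT_DELAY * 60`. The arithmetic can be checked in isolation. This sketch assumes `DEFAULT_DELAY` is 1000 ms (the one-second default mentioned above); `BackoffDelay` is a hypothetical helper.

```java
final class BackoffDelay {
    // Assumed to match the one-second default described in the text.
    static final long DEFAULT_DELAY = 1000L;

    // Same expression as in the finally block of UpdateTask#run.
    static long nextDelay(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    public static void main(String[] args) {
        for (int failCount = 0; failCount <= 6; failCount++) {
            System.out.println(failCount + " failures -> " + nextDelay(1000L, failCount) + " ms");
        }
        // With delayTime = 1000 this prints 1000, 2000, 4000, 8000, 16000, 32000, 60000:
        // 1000 << 6 is 64000, which the cap reduces to 60000.
    }
}
```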
    

    PushReceiver

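    To keep the service list up to date, the Nacos client not only runs the scheduled task that polls the service list (every second by default), it also opens a UDP port to receive pushes from the Nacos server when service instance information changes.

    com.alibaba.nacos.client.naming.core.PushReceiver#PushReceiver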

    public PushReceiver(HostReactor hostReactor) {
    	try {
    		this.hostReactor = hostReactor;
    		String udpPort = getPushReceiverUdpPort();
    		if (StringUtils.isEmpty(udpPort)) {
    			this.udpSocket = new DatagramSocket();
    		} else {
    			// Open a UDP socket on the configured port to receive service instance pushes from the server
    			this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
    		}
    		this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
    			@Override
    			public Thread newThread(Runnable r) {
    				Thread thread = new Thread(r);
    				thread.setDaemon(true);
    				thread.setName("com.alibaba.nacos.naming.push.receiver");
    				return thread;
    			}
    		});
    
    		this.executorService.execute(this);
    	} catch (Exception e) {
    		NAMING_LOGGER.error("[NA] init udp socket failed", e);
    	}
    }
    
    @Override
    public void run() {
    	while (!closed) {
    		try {
    
    			// byte[] is initialized with 0 full filled by default
    			byte[] buffer = new byte[UDP_MSS];
    			DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
    
    			// Loops until closed, blocking here for the next UDP packet
    			udpSocket.receive(packet);
    
    			String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
    			NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
    
    			PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
    			String ack;
    			if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
    				// Process the pushed service list
    				hostReactor.processServiceJson(pushPacket.data);
    
    				// send ack to server
    				ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
    					+ "\"\"}";
    			} else if ("dump".equals(pushPacket.type)) {
    				// dump data to server
    				ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
    					+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
    					+ "\"}";
    			} else {
    				// do nothing send ack only
    				ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
    					+ "\", \"data\":" + "\"\"}";
    			}
    
    			udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
    											  packet.getSocketAddress()));
    		} catch (Exception e) {
    			if (closed) {
    				return;
    			}
    			NAMING_LOGGER.error("[NA] error while receiving push data", e);
    		}
    	}
    }
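
Every push is answered with a small JSON acknowledgement whose `type` depends on the push type. The mapping can be isolated as below. `PushAck` is a hypothetical helper, and for brevity it always sends an empty `data` field, whereas the real `dump` branch puts the serialized service map there.

```java
// Hypothetical helper that mirrors the ack-type selection in PushReceiver#run.
final class PushAck {
    static String ackType(String pushType) {
        if ("dom".equals(pushType) || "service".equals(pushType)) {
            return "push-ack";
        }
        if ("dump".equals(pushType)) {
            return "dump-ack";
        }
        return "unknown-ack";
    }

    // Simplification: the data field is always empty in this sketch.
    static String ackFor(String pushType, long lastRefTime) {
        return "{\"type\": \"" + ackType(pushType) + "\", \"lastRefTime\":\""
                + lastRefTime + "\", \"data\":\"\"}";
    }

    public static void main(String[] args) {
        System.out.println(ackFor("service", 123L));
        System.out.println(ackFor("something-else", 123L));
    }
}
```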
    
  • Original article: https://blog.csdn.net/u022812849/article/details/127772471