所谓服务发现就是指客户端从注册中心获取记录在注册中心中的服务信息。
接下来阅读SpringBoot项目启动时Nacos客户端如何实现服务发现。
NacosReactiveDiscoveryClientConfiguration会自动注入NacosReactiveDiscoveryClient,这是服务发现的入口,NacosReactiveDiscoveryClient实现了ReactiveDiscoveryClient接口,这是SpringCloud服务发现的规范。
新版SpringCloud服务发现的入口是NacosReactiveDiscoveryClientConfiguration,旧版是NacosDiscoveryClientConfiguration。
public class NacosReactiveDiscoveryClientConfiguration {
/**
* 服务发现的入口
* @param nacosServiceDiscovery
* @return
*/
@Bean
@ConditionalOnMissingBean
public NacosReactiveDiscoveryClient nacosReactiveDiscoveryClient(
NacosServiceDiscovery nacosServiceDiscovery) {
return new NacosReactiveDiscoveryClient(nacosServiceDiscovery);
}
}
NacosReactiveDiscoveryClient实现了SpringCloud的接口ReactiveDiscoveryClient,SpringCloud中其他组件如OpenFeign会调用ReactiveDiscoveryClient的getInstances()方法来获取实例列表。
com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClient#getInstances
@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
return Mono.justOrEmpty(serviceId).flatMapMany(loadInstancesFromNacos())
.subscribeOn(Schedulers.boundedElastic());
}
private Function<String, Publisher<ServiceInstance>> loadInstancesFromNacos() {
return serviceId -> {
try {
/**
* 返回的是Mono,响应式
* @see NacosServiceDiscovery#getInstances(String)
*/
return Mono.justOrEmpty(serviceDiscovery.getInstances(serviceId))
.flatMapMany(instances -> {
// 放入本地缓存
ServiceCache.setInstances(serviceId, instances);
return Flux.fromIterable(instances);
});
}
catch (NacosException e) {
log.error("get service instance[{}] from nacos error!", serviceId, e);
// 如果失败了可以从本地缓存获取实例
return failureToleranceEnabled
? Flux.fromIterable(ServiceCache.getInstances(serviceId))
: Flux.empty();
}
};
}
NacosServiceDiscovery主要负责调用,并对结果进行转换。
com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery#getInstances
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
String group = discoveryProperties.getGroup();
/**
* @see NacosNamingService#selectInstances(java.lang.String, java.lang.String, boolean)
*/
List<Instance> instances = namingService().selectInstances(serviceId, group,
true);
return hostToServiceInstanceList(instances, serviceId);
}
NacosNamingService会对实例列表进行简单的选择,实际开发过程中实例的选择是由Ribbon或者LoadBalancer实现。
Nacos服务端会返回所有的实例列表,包含不健康状态的实例,具体要不要选择不健康的实例由客户端决定。
com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, java.util.List
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
// subscribe默认为true
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
先从本地缓存中查询服务对应的实例列表,如果本地缓存中没有就会实时的去查询Nacos服务端,最后会开启一个定时任务定时去Nacos服务端查询。
com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 从本地缓存serviceInfoMap中查
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
// 去nacos-server查询
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 定时去nacos-server查询
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
com.alibaba.nacos.client.naming.core.HostReactor#updateServiceNow
private void updateServiceNow(String serviceName, String clusters) {
try {
// 立即查询服务列表
updateService(serviceName, clusters);
} catch (NacosException e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}
com.alibaba.nacos.client.naming.core.HostReactor#updateService
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 发起http调用,这里有个参数给了一个udp端口,当有服务更新时,nacos服务端会通过这个udp端口来通知客户端
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
直接发起http请求Nacos服务端,请求地址为/nacos/v1/ns/service/list。
更多API参考:https://nacos.io/zh-cn/docs/open-api.html
com.alibaba.nacos.client.naming.net.NamingProxy#queryList
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
启动定时任务默认1秒查询一次服务列表。
com.alibaba.nacos.client.naming.core.HostReactor#scheduleUpdateIfAbsent
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
定时任务里面还是会调用updateService()查询服务列表。
com.alibaba.nacos.client.naming.core.HostReactor.UpdateTask#run
public void run() {
long delayTime = DEFAULT_DELAY;
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
// 还是调用updateService
updateService(serviceName, clusters);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateService(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
lastRefTime = serviceObj.getLastRefTime();
if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis();
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
} finally {
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
为了获取实时的服务列表,Nacos客户端不仅有定时任务1秒执行一次去获取服务列表,还开启了一个UDP端口,用于接收Nacos服务端服务实例信息变更的推送。
com.alibaba.nacos.client.naming.core.PushReceiver#PushReceiver
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
String udpPort = getPushReceiverUdpPort();
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
// 开启一个UDP端口,接收服务器端的服务实例信息推送
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
@Override
public void run() {
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// 死循环,一直接受UDP请求
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
// 处理服务列表
hostReactor.processServiceJson(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}