前言:
前面的系列文章让我们对 Nacos 有了一个基本了解,并知道了如何去试用 Nacos 作为注册中心和配置中心,并且知道了是何时触发 Nacos 的服务注册流程,本篇我们继续从源码层面去分析 Nacos 的服务注册流程。
Nacos 系列文章传送门:
Nacos 配置管理模型 – 命名空间(Namespace)、配置分组(Group)和配置集ID(Data ID)
NacosServiceRegistry#register 方法源码分析
前文我们分析了服务注册到 Nacos 最终是调用了 NacosServiceRegistry#register 方法,该方法中我们看到了 Namespace、Group 等概念,最终又调用了 NacosNamingService#registerInstance 方法完成服务注册。
//com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
public void register(Registration registration) {
//判断注册实例信息中的 service ID 是否为空
if (StringUtils.isEmpty(registration.getServiceId())) {
//为空 记录日志
log.warn("No service to register for nacos client...");
} else {
//不为空 创建名称空间 service
NamingService namingService = this.namingService();
//获取 serviceid
String serviceId = registration.getServiceId();
//获取名称空间下的组 就是 nacos 中的组 默认是 DEFAULT_GROUP 其实就是读取配置文件中配置的组信息
String group = this.nacosDiscoveryProperties.getGroup();
//构建 Instance 实例
Instance instance = this.getNacosInstanceFromRegistration(registration);
try {
//注册服务实例
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
} catch (Exception var7) {
log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
ReflectionUtils.rethrowRuntimeException(var7);
}
}
}
//com.alibaba.cloud.nacos.registry.NacosServiceRegistry#getNacosInstanceFromRegistration
private Instance getNacosInstanceFromRegistration(Registration registration) {
//创建实例对象
Instance instance = new Instance();
//设置ip
instance.setIp(registration.getHost());
//设置端口
instance.setPort(registration.getPort());
//设置权重
instance.setWeight((double)this.nacosDiscoveryProperties.getWeight());
//设置集群名称
instance.setClusterName(this.nacosDiscoveryProperties.getClusterName());
//设置元数据
instance.setMetadata(registration.getMetadata());
return instance;
}
NamingProxy#reqAPI 方法源码解析
NamingProxy#reqAPI 方法会获取所有的 Nacos Server 的地址,然后区分 Nacos Server 是配置的服务列表还是域名,根据不同配置,区分情况发送请求到 Nacos Server 完成服务注册,服务注册的地址是:/nacos/v1/ns/instance。
//com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI(java.lang.String, java.util.Map, java.lang.String)
public String reqAPI(String api, Map<String, String> params, String method) throws NacosException {
//Nacos 服务地址快照
List<String> snapshot = this.serversFromEndpoint;
if (!CollectionUtils.isEmpty(this.serverList)) {
//serverList 不为空 再次赋值
snapshot = this.serverList;
}
//调用 reqAPI 方法
return this.reqAPI(api, params, snapshot, method);
}
//com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI(java.lang.String, java.util.Map, java.util.List, java.lang.String)
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
//获取 namespaceId id
params.put("namespaceId", this.getNamespaceId());
//Nacos 服务地址是否为控股
//nacosDomain nacos 域名是否为空
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
throw new IllegalArgumentException("no server available");
} else {
//异常
Exception exception = new Exception();
//Nacos Server 为空判断
if (servers != null && !servers.isEmpty()) {
//获取 Nacos 服务数的随机数
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for(int i = 0; i < servers.size(); ++i) {
//获取 Nacos Server
String server = (String)servers.get(index);
try {
//发送请求到 Nacos Server 完成注册
return this.callServer(api, params, server, method);
} catch (NacosException var11) {
exception = var11;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
} catch (Exception var12) {
exception = var12;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
}
index = (index + 1) % servers.size();
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
} else {
//配置的 Nacos 域名的情况
int i = 0;
//最多重试 三次
while(i < 3) {
try {
//发送请求到 Nacos Server 完成注册
return this.callServer(api, params, this.nacosDomain);
} catch (Exception var13) {
exception = var13;
LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
++i;
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
}
}
}
static {
VERSION = "Nacos-Java-Client:v" + VersionUtils.VERSION;
WEB_CONTEXT = "/nacos";
NACOS_URL_BASE = WEB_CONTEXT + "/v1/ns";
NACOS_URL_INSTANCE = NACOS_URL_BASE + "/instance";
NACOS_URL_SERVICE = NACOS_URL_BASE + "/service";
DEFAULT_CLIENT_BEAT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1;
DEFAULT_POLLING_THREAD_COUNT = Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1;
}
NamingProxy#callServer 方法源码分析
NamingProxy#callServer 方法逻辑十分简单,校验签名,构建 Header,发送 Http 请求到 Nacos Server,完成服务注册。
//com.alibaba.nacos.client.naming.net.NamingProxy#callServer(java.lang.String, java.util.Map, java.lang.String, java.lang.String)
public String callServer(String api, Map<String, String> params, String curServer, String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0L;
//检查签名
this.checkSignature(params);
//构建 请求 header
List<String> headers = this.builderHeaders();
String url;
//构建请求地址
if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
if (!curServer.contains(":")) {
curServer = curServer + ":" + this.serverPort;
}
url = HttpClient.getPrefix() + curServer + api;
} else {
url = curServer + api;
}
//通过 HttpClient 发送请求
HttpResult result = HttpClient.request(url, headers, params, "UTF-8", method);
end = System.currentTimeMillis();
//当前请求的指标监控
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double)(end - start));
//响应结果处理
if (200 == result.code) {
return result.content;
} else if (304 == result.code) {
return "";
} else {
throw new NacosException(500, "failed to req API:" + curServer + api + ". code:" + result.code + " msg: " + result.content);
}
}
上面分析 Nacos Client 服务注册的时候提到了心跳发送,这里我们分析一下 Nacos Client 的心跳发生机制和服务续约。
BeatInfo 类
BeatInfo 类是 Nacot 的心跳类,其中的属性也很简单,都是和服务实例相关,如下:
public class BeatInfo {
//端口
private int port;
//ip
private String ip;
//权重
private double weight;
//服务名称
private String serviceName;
//集群
private String cluster;
//元数据信息
private Map<String, String> metadata;
//调度
private volatile boolean scheduled;
//周期
private volatile long period;
private volatile boolean stopped;
public BeatInfo() {
}
}
BeatTask 心跳任务类
BeatTask 作为服务续约的心跳线程对象,run方法中 通过 BeatReactor.this.serverProxy.sendBeat 发送心跳。
//com.alibaba.nacos.client.naming.beat.BeatReactor.BeatTask
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
public void run() {
//判断是否有在进行中的心跳
if (!this.beatInfo.isStopped()) {
//发送心跳
long result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo);
//判断发送心跳的耗时 得到下一次心跳的时间
long nextTime = result > 0L ? result : this.beatInfo.getPeriod();
//定时心跳发送
BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
BeatReactor#addBeatInfo 发送心跳
BeatReactor#addBeatInfo 方法使用线程池 ScheduledExecutorService 来实现心跳发送,默认频率是5秒一次。
//com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
//构建 心跳key
String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//防止并发
if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
//加入到 dom2Beat 中记录
this.dom2Beat.put(key, beatInfo);
//使用线程池 定期执行心跳任务 周期为 period 默认 5 秒
this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
//指标监控
MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
至此,Nacos Client 的服务注册流程分析完毕,后面我们继续分析,服务端的注册机制。
欢迎提出建议及对错误的地方指出纠正。