对Nacos的源码搭建好了后,就可以开始学习啦。
Nacos的数据结构类型为Map
key:是nameSpace的id,起到隔离环境的作用。nameSpace下可以有多个group
value:又是一个Map, Map
key:代表group分组;
value:分组下的某个服务,比如orderService;
下面主要学习服务注册接口
Nacos提供了服务注册的API接口,客户端只需要向该接口发送请求,即可实现服务注册。
请求类型:POST
请求路径: /nacos/v1/ns/instance
错误编码
在nacos中,该接口在如下目录:
该类里面有register方法,即为注册方法
/**
* Register new instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
会发现里面有个serviceMap结构即为开始描述的,为nacos注册表
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
添加service的核心方法
/**
* Add instance to service.
*
* @param namespaceId namespace
* @param serviceName service name
* @param ephemeral whether instance is ephemeral
* @param ips instances
* @throws NacosException nacos exception
*/
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
consistencyService.put(key, instances);
}
}
该方法对修改服务列表的动作加了sync,确保线程安全。
1)先获取要更新的实例列表,addIpAddresses()方法
2)然后将更新后的数据封装到Instances对象中,后面更新到注册表使用
3)调用put方法完成Nacos集群的数据同步,保证集群一致性。
在第三步操作中的底层如下:
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
onPut(key, value):其中的value就算Instances,要更新的服务信息。基于线程池的方式,异步的将service信息写入注册表中。
sync():就是通过Distro协议将数据同步给集群中的其他Nacos节点。
onPut()方法如下:
/**
* Put a new record.
*
* @param key key of record
* @param value record
*/
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.CHANGE);
}
最后的addTask,就是将任务加入到阻塞队列中:
/**
* Add new notify task to queue.
*
* @param datumKey data key
* @param action action for data
*/
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
同时,Notifier异步更新。有run()方法:
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
具体的异步消费逻辑在handle()方法中。