
第一次阅读Nacos源码是从源码启动单机运行nacos开始。时隔1年再次Nacos源码,会有更深刻的感受。写下此文章前,笔者阅读了Nacos官方发布的《Nacos 架构与原理》。
- 如从未阅读过Nacos源码,推荐从源码启动单机运行nacos开始阅读,该文章所在的专栏也会进一步介绍。
- 如果阅读过Nacos源码,推荐再阅读Nacos官方发布的《Nacos 架构与原理》,可以从架构层面认识Nacos是怎样设计的。
- 本博客主要记录笔者二次阅读研究到的东西,或者说是理顺了某些逻辑。因此部分地方可能写得比较简略,部分地方写得比较详细。本博客在行文构思处还有待提高,看不懂本文或有疑惑的小伙伴可留言。
- 笔者以思维导图的方式从宏观角度归纳总结了关于Nacos服务注册Distro协议的设计机制思维导图,如果看不懂则可以先看Nacos辅服务注册总结思维导图。强烈建议先看前面2篇的思维导图,再回来看博客的源码级别讲解。
客户端启动后,会发布一个WebServerInitializedEvent。spring cloud的服务发现组件AbstractAutoServiceRegistration会监听该事件,底层会调用将客户端注册到Nacos服务端的逻辑。
底层实际是封装好一个客户端实例的数据发送HTTP请求给Nacos服务端
服务端处理服务注册的核心入口在com.alibaba.nacos.naming.controllers.InstanceController#register,其实还有其他入口,比如服务端集群做一致性,此处仅从最平常的入口切入研究
parseInstance():从请求参数中解析出一个实例instance,其中有一个操作是instance.setLastBeat(System.currentTimeMillis());,服务端做健康检查的时候,会用当前时间戳-instance.getLastBeat判断实例是否健康serviceManager.registerInstance():注册服务的核心操作。ServiceManager更加关心Service服务层面,比如注册服务、注销服务;Service更加关心 Instance实例层面,比如更新实例。createEmptyService():如果当前Nacos服务端没有当前要注册的服务,则创建一个服务。存储服务的容器是Map,他的key-value分别是Map(namespace, Map(groupName@@serviceName, Service)),第一个key对应的value是ConcurrentSkipListMap类型。
1.1 创建完服务后会把Service服务放入serviceMap容器。
1.2 健康检查 。启动一个带延迟事件的调度循环执行(线程池的应用),每5000毫秒检查客户端有无上报心跳。超过15秒没有上报,则标记客户端为不健康,再过15秒(即超过30秒)没有上报,则剔除该客户端。
1.3 服务端一致性监听。把Service放入Map监听器。此处放了2次,临时实例、永久实例各放了一次,由key做区分。此处监听器涉及了观察者设计模式,降低一致性算法的耦合性,将服务端做一致性协议的能力下沉到Nacos内核模块,更加通用,更加低耦合。前面提到服务注册的入口不只在controller有,服务端集群做一致性的地方也会有,这就是一致性算法下沉到内核模块的作用。
addInstance():添加实例到服务里面。
2.1 KeyBuilder.buildInstanceListKey:构建一个Service的全局唯一标识,基本上所有地方都是靠这个key做逻辑处理。
2.2 addIpAddresses():拿到Service下的所有实例,初始化clusterMap
2.3 consistencyService.put():处理数据一致性。这个很核心,下面单独拿出来讲。
put()方法有2个核心操作,一个是onPut(),更新自身服务端节点的实例数据,推送实例数据给Nacos客户端们。一个是distroProtocol.sync(),做Nacos服务端集群的数据一致性。
onPut():有两个核心操作(1)更新当前Nacos服务端的Cluster中的临时实例Set集合的数据;(2)把Service服务最新的Instances实例数据推送给订阅了该服务的Nacos客户端。datastore。notifier.addTask(key, DataOperation.CHANGE)发布change事件。notifier是一个Runable实现类,该类是DistroConsistencyServiceImpl的内部类,而DistroConsistencyServiceImpl有一个带有@PostConstruct注解的方法,说明Nacos服务端启动完成后,会回调该注解标注的方法,该方法会用线程池执行notifier任务。notifier任务实际就是调用监听器listener的onChange方法listener.onChange()。onChange()方法实际上是将实例添加到Cluster中的Set ephemeralInstances 临时实例Set集合。getPushService().serviceChanged(this)广播Service 被改变的消息,实际是封装UDP报文,发给订阅了该服务的Nacos客户端(客户端获取某个服务的所有实例时,该客户端被添加到订阅容器里面)。总结:用了一个队列做解耦,并配合观察者设计模式,底层实际是把instance实例数据都存储到Cluster中的临时实例Set集合中。
distroProtocol.sync():当前Nacos服务端节点上注册了一个服务实例,将最新的数据同步给Nacos服务端集群的其他节点。DistroKey:含有目标Nacos服务端节点的地址。DistroDelayTask:含有DistroKey。distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask):将key和task添加到一个类型是ConcurrentHashMap的Map中。distroTaskEngineHolder.getDelayTaskExecuteEngine():返回一个Engine,该Engine实例化时,会用Executors.newScheduledThreadPool的线程池执行一个ProcessRunnable任务,该任务会拿到类型是ConcurrentHashMap的Map中所有的key,然后根据key获取对应的Processor(此处拿到的时候DistroDelayTaskProcessor,该processor在delayTaskExecuteEngine实例化的时候就被set进去了),然后执行process()方法。此处用了策略模式。processor()方法底层是构建了DistroSyncChangeTask。DistroSyncChangeTask。他是一个Runnable实现类。实际是将数据发送给Nacos服务端集群的其他节点。待发送的Service数据是Datum结构的二进制序列化。
DistroConsistencyServiceImpl实例化的时候,会注入DistroProtocol,在DistroProtocol实例化的时候,会启动一个线程去拉取Nacos服务端集群其他节点上的服务数据。
DistroProtocol构造器会调用一个startDistroTask(),底层会用线程池执行一个DistroLoadDataTask任务,该任务底层是加载Nacos服务端集群的其他节点的数据,核心方法是load()
load():
2.1 如果只有自己一个服务端节点,那么就休眠1s,一直while循环扫描发现其他服务端节点。
2.2 循坏等待distroComponentHolder.getDataStorageTypes()不为空(此处代码给出的日志打印是distro data storage register),果有数据则退出while循坏,否则休眠1s。distroComponentHolder.getDataStorageTypes()该值是在com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroHttpRegistry#doRegister用@PostConstruct注入的。
2.3 loadAllDataSnapshotFromRemote():发送http请求,拿到远程服务器的所有数据(这些数据是二进制的数据,DistroData)
Nacos在服务注册方面采用了遵循AP定理的自研Distro一致性算法。该算法并不是由单独一处逻实现的,而是由8大机制实现的。详情可见Nacos Distro协议的设计机制
笔者总结Distro协议有8大机制: