• Nacos源码之AP一致性协议实现


    CAP介绍

    在了解Nacos一致性协议之前先了解一下CAP是什么:

    C:一致性

    A:可用性

    P:分区容错性。针对多节点部署的系统,分区就代表了网络分区,由于网络原因节点之间无法通信进行数据同步。容错指在这种情况下,系统仍然可以对外提供服务。

    分布式系统中,首先需要保证P,然后在C和A之间做权衡:

    在满足P的情况下,如果向node1写入一条数据,因为分区产生则数据无法同步给其他节点,此时就需要在C和A直接做出选择。

    选择C,则需要保证所有节点数据的一致性,则整个分布式系统对外暂时不可用。

    选择A,则会舍弃C,分布性系统仍然可以对外提供服务,但是因为数据没有同步,则通过不同节点查询返回的结果回不一致。

    Nacos中的一致性协议

    作为一个分布式系统,Nacos 的服务管理和配置管理都支持 AP、CP 协议。相比较而言Zookeeper作为服务发现常用的一种实现方式只支持CP协议。本篇还是以注册中心功能为基础进行介绍。

    服务之间感知对方服务可正常提供服务的实例信息,必须从注册中心获取。因此对注册中心的可用性就会有着更高的要求,尽可能保证服务注册功能的可用性。

    在Nacos中,注册中心分为非持久化和持久化两种服务:

    非持久化对应了AP协议:保障服务的可用性,在一定时间内,各节点数据可以达成一致。

    持久化则对应的CP协议:保障了各个节点数据的强一致性。

    Nacos中AP协议的具体实现则是Distro 协议,是Nacos自研的一种协议。而CP协议的实现则是Raft协议。具体代码代码实现都在naming.consistency包中。

    在Distro 协议的设计下

    Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。

    每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据⼀致性。

    对于读请求每个节点可以独立处理,及时从本地发出响应。

    对于写请求,Nacos中会员前置Filter根据请求中包含的 IP 和 port 信息计算其所属的 Distro 责任节点, 并将该请求转发到所属的 Distro 责任节点上。

    Distro协议源码分析

    在之前我们已经知道了,Nacos注册中心实现的大体流程,最终的数据存储在ServiceManager类中的双层Map中,之后就会通知客户端,推送最新的实例信息。

    1. /**
    2. * Map(namespace, Map(group::serviceName, Service)).
    3. */
    4. private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    5. 复制代码

    在往Service模型中添加实例(**ServiceManager.**addInstance)的时候调用了:

    1. consistencyService.put(key, instances);
    2. 复制代码

    这里的ConsistencyService就是我们需要关注的重点了。首先ConsistencyService接口具有多个实现类,那么在调用上面方法的时候到底是哪个实现类?通过注入的name可以发现其实是DelegateConsistencyServiceImpl:

    1. @Resource(name = "consistencyDelegate")
    2. private ConsistencyService consistencyService;
    3. //实现类
    4. @DependsOn("ProtocolManager")
    5. @Service("consistencyDelegate")
    6. public class DelegateConsistencyServiceImpl ...
    7. //在DelegateConsistencyServiceImpl中定义了两种实现类
    8. private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
    9. private final EphemeralConsistencyService ephemeralConsistencyService;
    10. @Override
    11. public void put(String key, Record value) throws NacosException {
    12. mapConsistencyService(key).put(key, value);
    13. }
    14. private ConsistencyService mapConsistencyService(String key) {
    15. return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    16. }
    17. 复制代码

    通过上面的代码可以知道,在非持久化的情况下,真正的实现类是ephemeralConsistencyService。而ephemeralConsistencyService具有一个唯一的实现类就是DistroConsistencyServiceImpl。到此就对应了上面所说的非持久化对应了AP协议,AP协议的具体实现则是Distro 协议。

    Distro协议下完整的的put方法如下:

    1. @Override
    2. public void put(String key, Record value) throws NacosException {
    3. onPut(key, value);
    4. // If upgrade to 2.0.X, do not sync for v1.
    5. if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
    6. return;
    7. }
    8. distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
    9. DistroConfig.getInstance().getSyncDelayMillis());
    10. }
    11. 复制代码

    可以看到在onPut之后,调用了distroProtocol.sync方法,这个方法就是用来像其他节点同步当前节点注册实例数据用的,可以保证每个节点都有全量的注册实例信息,具体实现:

    1. /**
    2. * Start to sync data to all remote server.
    3. *
    4. * @param distroKey distro key of sync data
    5. * @param action the action of data operation
    6. * @param delay delay time for sync
    7. */
    8. public void sync(DistroKey distroKey, DataOperation action, long delay) {
    9. //遍历除当前节点之外所有的集群节点
    10. for (Member each : memberManager.allMembersWithoutSelf()) {
    11. syncToTarget(distroKey, action, each.getAddress(), delay);
    12. }
    13. }
    14. 复制代码

    到这里我们发现执行sync方法的是DistroProtocol,它是个什么东西呢?它其实就是整个Distro协议的入口。其中实现了集群多节点之间同步数据所需要的各种操作:比如初始化时向其他节点加载数据、同步数据到指定节点、获取当前节点的快照数据。

    这里简单看一下新加入节点的的初始化操作:

    1. @Component
    2. public class DistroProtocol {
    3. public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
    4. DistroTaskEngineHolder distroTaskEngineHolder) {
    5. this.memberManager = memberManager;
    6. this.distroComponentHolder = distroComponentHolder;
    7. this.distroTaskEngineHolder = distroTaskEngineHolder;
    8. startDistroTask();
    9. }
    10. ....
    11. )
    12. 复制代码

    通过注解可以知道在服务启动之后就会调用startDistroTask,从集群的其他节点同步注册实例数据到当前节点,来保障集群的的每台机器上都维护了当前的所有注册上来的非持久化实例数 据:

    1. private void load() throws Exception {
    2. while (memberManager.allMembersWithoutSelf().isEmpty()) {
    3. Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
    4. TimeUnit.SECONDS.sleep(1);
    5. }
    6. while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
    7. Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
    8. TimeUnit.SECONDS.sleep(1);
    9. }
    10. for (String each : distroComponentHolder.getDataStorageTypes()) {
    11. if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
    12. loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
    13. }
    14. }
    15. }
    16. private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    17. DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    18. DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    19. if (null == transportAgent || null == dataProcessor) {
    20. Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
    21. resourceType, transportAgent, dataProcessor);
    22. return false;
    23. }
    24. for (Member each : memberManager.allMembersWithoutSelf()) {
    25. long startTime = System.currentTimeMillis();
    26. try {
    27. Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
    28. DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
    29. Loggers.DISTRO.info("[DISTRO-INIT] it took {} ms to load snapshot {} from {} and snapshot size is {}.",
    30. System.currentTimeMillis() - startTime, resourceType, each.getAddress(),
    31. getDistroDataLength(distroData));
    32. boolean result = dataProcessor.processSnapshot(distroData);
    33. Loggers.DISTRO
    34. .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
    35. result);
    36. if (result) {
    37. distroComponentHolder.findDataStorage(resourceType).finishInitial();
    38. return true;
    39. }
    40. } catch (Exception e) {
    41. Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
    42. }
    43. }
    44. return false;
    45. }
    46. 复制代码

    可以看到这里会遍历集群其他节点进行数据的同步,具体操作是轮询所有的 Distro 节点,通过向其他的机器发送请求拉取全量数据。

    DistroProtocol中还有很多其他同步数据相关的方法实现,大家可以自己结合某个点来去查看阅读。

    总结

    到此我们已经基本了解了Naocs中临时节点所使用的一致性协议。作为注册中心功能来说需要保证其可用性,一般会选用AP协议,也就是Distro。在Distro 协议的设计思想下,每个 Distro 节点都可以接收到读写请求,并且会全量或者定期的进行数据同步,保障集群中所有节点数据的最终一致性。

    关于相关的代码,在源码中还有更多实现,大家可以自行去阅读。

  • 相关阅读:
    ASP.NET Core中创建中间件的几种方式
    java学习(常用类)
    如何查看dll文件内导出函数名称
    秦皇岛科学选育新品种 国稻种芯·中国水稻节:河北谱丰收曲
    winform开发经验(1)——调用Invoke更新UI时程序卡死原因以及解决办法
    Word隐藏批注知识分享,快速提升工作效率!
    JAVA泛型及元组
    Python元组详细教程
    多主复制下处理写冲突(1)-同步与异步冲突检测及避免冲突
    Netty编码和解码
  • 原文地址:https://blog.csdn.net/BASK2311/article/details/128182383