Spring Cloud (5): Spring Cloud Alibaba Nacos 1.4.x Registry — AP & CP Architecture and Raft Source Code Analysis


    Spring Cloud (2): Spring Cloud Alibaba Nacos

    https://blog.csdn.net/menxu_work/article/details/126721803

    Preparation

    Download the source code

    git clone git@github.com:alibaba/nacos.git

    git checkout -b 1.4.1 1.4.1

    Locate the startup class

    Open a Nacos startup script to see which jar is launched: vim bin/startup.sh

    The launched jar is named by export SERVER="nacos-server"

    Method 1: search the whole repository for the packaging name nacos-server to find the module whose pom.xml produces it

    Method 2: unpack the jar and read nacos-server/META-INF/MANIFEST.MF

    Unpack nacos-server.jar to find the entry class (a jar is a zip archive): unzip nacos-server.jar -d nacos-server

    View the manifest: vim nacos-server/META-INF/MANIFEST.MF

    Manifest-Version: 1.0
    Implementation-Title: nacos-console 2.1.0
    Implementation-Version: 2.1.0
    Archiver-Version: Plexus Archiver
    Built-By: xiweng.yy
    Spring-Boot-Layers-Index: BOOT-INF/layers.idx
    Specification-Vendor: Alibaba Group
    Specification-Title: nacos-console 2.1.0
    Implementation-Vendor-Id: com.alibaba.nacos
    Spring-Boot-Version: 2.6.7
    Implementation-Vendor: Alibaba Group
    Main-Class: org.springframework.boot.loader.PropertiesLauncher
    Spring-Boot-Classpath-Index: BOOT-INF/classpath.idx
    Start-Class: com.alibaba.nacos.Nacos
    Spring-Boot-Classes: BOOT-INF/classes/
    Spring-Boot-Lib: BOOT-INF/lib/
    Created-By: Apache Maven 3.6.3
    Build-Jdk: 1.8.0_231
    Specification-Version: 2.1.0
    

    The entry class is there: Start-Class: com.alibaba.nacos.Nacos

    Problem: the generated entity classes in the nacos consistency module do not exist

    Nacos 1.4.1 uses protobuf, so these classes need to be generated before the project compiles

    protobuf is Google's toolkit for efficiently serializing and reading structured data

    Analysis:

    This package is generated automatically by protobuf at compile time

    Method 1: in IDEA, run mvn compile on the consistency module; the classes are generated under target

    Method 2: install the protoc tool and run it in the proto directory: protoc --java_out=../java/ ./xxx.proto

    `protoc --java_out=../java/ ./consistency.proto`
    `protoc --java_out=../java/ ./Data.proto`
    

    Fixing the red (unresolved-symbol) marks on the generated files


    Start Nacos with the VM option -Dnacos.standalone=true

    Source Code

    Service Registration

    Client-side registration source

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    

    The registration auto-configuration class is NacosServiceRegistryAutoConfiguration; it declares:

     @Bean
     @ConditionalOnBean({AutoServiceRegistrationProperties.class})
     public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
         return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
     }
    

    NacosAutoServiceRegistration class diagram


    • AbstractAutoServiceRegistration#onApplicationEvent
    • AbstractAutoServiceRegistration#start
    • AbstractAutoServiceRegistration#register
       protected void register() {
       	this.serviceRegistry.register(this.getRegistration());
       }
      
    • ServiceRegistry#register
    • NacosServiceRegistry#register
    • NacosNamingService.registerInstance(serviceId, group, instance);
    • NamingProxy.registerService(groupedServiceName, groupName, instance);
    • JdkHttpClientRequest sends the request with the JDK's java.net.HttpURLConnection (Feign also uses this JDK class by default)

    Nacos Open API

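    Registration ultimately boils down to an HTTP call against the Open API (the POST /nacos/v1/ns/instance endpoint used above). Below is a minimal, hand-rolled sketch of that call using java.net.HttpURLConnection, the same JDK client that JdkHttpClientRequest wraps; the server address, service name and instance values are made-up examples, and the real NamingProxy adds namespace, cluster and security parameters.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    // Minimal sketch: register one instance through the Nacos 1.x Open API
    // with the JDK's HttpURLConnection (illustrative values, no error handling).
    public class OpenApiRegisterDemo {

        public static void main(String[] args) throws Exception {
            String serverAddr = "http://127.0.0.1:8848";
            String body = "serviceName=demo-service&groupName=DEFAULT_GROUP"
                    + "&ip=192.168.0.10&port=8080&ephemeral=true";

            URL url = new URL(serverAddr + "/nacos/v1/ns/instance");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");

            try (OutputStream out = conn.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }
            // InstanceController#register answers "ok" on success
            System.out.println(conn.getResponseCode());
        }
    }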

    Server-side registration source

    NacosNamingService → NamingProxy → /nacos/v1/ns/instance

    Finding the entry point

    naming module → controllers package → InstanceController → POST mapping

    This leads to InstanceController#register, a RESTful endpoint:

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
    

    Data Structures

    ServiceManager#serviceMap

    Map(namespace, Map(group::serviceName, Service))

    Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    Service#clusterMap

    Map<String, Cluster> clusterMap = new HashMap<>();

    Cluster#persistentInstances / Cluster#ephemeralInstances

    Set<Instance> persistentInstances = new HashSet<>();
    Set<Instance> ephemeralInstances = new HashSet<>();
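
    Putting the three levels together, the in-memory registry is just nested maps that end in two instance sets. A simplified, self-contained model of that nesting follows (class and field names are abbreviated for illustration; the real classes carry much more state):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Simplified model of the Nacos 1.4.x registry nesting:
    // namespace -> group::serviceName -> cluster -> {persistent, ephemeral} instance sets
    class RegistrySketch {

        static class Instance { String ip; int port; boolean ephemeral; }

        static class Cluster {
            Set<Instance> persistentInstances = new HashSet<>();
            Set<Instance> ephemeralInstances = new HashSet<>();
        }

        static class Service {
            Map<String, Cluster> clusterMap = new HashMap<>();
        }

        // mirrors ServiceManager#serviceMap: Map(namespace, Map(group::serviceName, Service))
        Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    }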


    Service Discovery

    Client-side discovery source

    Called under the hood by Ribbon

    • NacosNamingService#getAllInstances
    • HostReactor#getServiceInfo
    • HostReactor#updateServiceNow(serviceName, clusters);
    • NamingProxy#queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);

    Server-side discovery source

    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {
    	...
    	return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                    healthyOnly);
    }
    

    doSrvIpxt

    ...
    Service service = serviceManager.getService(namespaceId, serviceName);
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    ...
    

    srvIPs ⇒ Service#allIPs(clusters) ⇒ clusterObj.allIPs()

    /**
     * Get all instances.
     *
     * @return list of instance
     */
    public List<Instance> allIPs() {
        List<Instance> allInstances = new ArrayList<>();
        allInstances.addAll(persistentInstances);
        allInstances.addAll(ephemeralInstances);
        return allInstances;
    }
    

    Nacos Cluster CP Mode: Raft Protocol Source

    • InstanceController#register
    • serviceManager#registerInstance
    • serviceManager#addInstance
    • consistencyService.put(key, instances);
    • DelegateConsistencyServiceImpl#put mapConsistencyService(key).put(key, value);
      • AP DistroConsistencyServiceImpl#put
      • CP RaftConsistencyServiceImpl#put

    CP RaftConsistencyServiceImpl#put

    • Check whether the current node is the leader; if not, forward the request to the leader
    • onPublish -> raftStore.write(datum) -> ValueChangeEvent (updates the in-memory cache) -> replicate to the other nodes (no two-phase commit: the /raft/datum/commit endpoint is called directly)
    • Because it commits directly, an exception at this point still leaves the already-updated in-memory data behind

    For the registry, Nacos AP mode (ephemeral instances) keeps data in memory only
    Nacos CP mode (persistent instances) writes data to files on disk
    The Nacos config center stores data in MySQL

    /**
      * Signal publish new record. If not leader, signal to leader. If leader, try to commit publish.
      *
      * @param key   key
      * @param value value
      * @throws Exception any exception during publish
      */
     public void signalPublish(String key, Record value) throws Exception {
         if (stopWork) {
             throw new IllegalStateException("old raft protocol already stop work");
         }
         if (!isLeader()) { // check whether this node is the leader
             ObjectNode params = JacksonUtils.createEmptyJsonNode();
             params.put("key", key);
             params.replace("value", JacksonUtils.transferToJsonNode(value));
             Map<String, String> parameters = new HashMap<>(1);
             parameters.put("key", key);
             
             final RaftPeer leader = getLeader();
             // not the leader: forward the publish request to the leader
             raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); 
             return;
         }
         
         OPERATE_LOCK.lock();
         try {
             final long start = System.currentTimeMillis();
             final Datum datum = new Datum();
             datum.key = key;
             datum.value = value;
             if (getDatum(key) == null) {
                 datum.timestamp.set(1L);
             } else {
                 datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
             }
             
             ObjectNode json = JacksonUtils.createEmptyJsonNode();
             json.replace("datum", JacksonUtils.transferToJsonNode(datum));
             json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
             // write the datum to file and update the in-memory cache
             onPublish(datum, peers.local());
             
             final String content = json.toString();
             // latch: wait until a majority of peers acknowledge
             final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
             for (final String server : peers.allServersIncludeMyself()) {
                 if (isLeader(server)) {
                     latch.countDown();
                     continue;
                 }
                 final String url = buildUrl(server, API_ON_PUB); // calls the peers' commit endpoint directly (not strict Raft; two-phase commit would be safer)
                 HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                     @Override
                     public void onReceive(RestResult<String> result) {
                         if (!result.ok()) {
                             Loggers.RAFT
                                     .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                             datum.key, server, result.getCode());
                             return;
                         }
                         latch.countDown();
                     }
                     
                     @Override
                     public void onError(Throwable throwable) {
                         Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                     }
                     
                     @Override
                     public void onCancel() {
                     
                     }
                 });
                 
             }
             // without a two-phase commit, the timeout below may throw even though the in-memory data has already been changed (only the heartbeat can later reconcile it)
             if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                 // only majority servers return success can we consider this update success
                 Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                 throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
             }
             
             long end = System.currentTimeMillis();
             Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
         } finally {
             OPERATE_LOCK.unlock();
         }
     }
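
    The CountDownLatch above is created with peers.majorityCount(), i.e. the publish is only treated as successful once more than half of the peers (leader included) have acknowledged it. A tiny sketch of that quorum arithmetic, assuming majorityCount is simply "half plus one":

    // Quorum arithmetic behind peers.majorityCount() (assumed to be "half plus one"):
    // more than half of all peers must acknowledge the publish before the latch opens.
    public class MajoritySketch {

        static int majorityCount(int peerCount) {
            return peerCount / 2 + 1;
        }

        public static void main(String[] args) {
            System.out.println(majorityCount(3)); // 2
            System.out.println(majorityCount(5)); // 3
        }
    }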
    

    The actuator Endpoint for seeing which Nacos services Ribbon has discovered

    NacosDiscoveryEndpoint

    @Endpoint(id = "nacosdiscovery")
    public class NacosDiscoveryEndpoint {
    	...
    	@ReadOperation
    	public Map<String, Object> nacosDiscovery() {
    		Map<String, Object> result = new HashMap<>();
    		result.put("NacosDiscoveryProperties", nacosDiscoveryProperties);
    		
    		// instances are pulled via namingService.getAllInstances(serviceInfo.getName(), serviceInfo.getGroupName())
    		NamingService namingService = nacosServiceManager
    				.getNamingService(nacosDiscoveryProperties.getNacosProperties());
    		List<ServiceInfo> subscribe = Collections.emptyList();
    
    		try {
    			subscribe = namingService.getSubscribeServices();
    			for (ServiceInfo serviceInfo : subscribe) {
    				List<Instance> instances = namingService.getAllInstances(
    						serviceInfo.getName(), serviceInfo.getGroupName());
    				serviceInfo.setHosts(instances);
    			}
    		}
    		catch (Exception e) {
    			log.error("get subscribe services from nacos fail,", e);
    		}
    		result.put("subscribe", subscribe);
    		return result;
    	}
    }
    

    Enable it in application.yml:

    management:
      endpoints:
        web:
          exposure:
            include:
              - "*"
    

    Visit http://localhost:9002/actuator/nacosdiscovery


    Nacos source walkthrough: service registration and discovery (ephemeral instances, AP mode)


    Nacos source walkthrough: cluster data consistency (persistent instances, CP mode, Raft implementation)


    Nacos source walkthrough: config center


    CAP

    CAP and BASE explained

    The CAP theorem

    • C — Consistency
    • A — Availability
    • P — Partition tolerance

    The BASE principle

    • BA — Basically Available
    • S — Soft State
    • E — Eventual Consistency

    CAP in brief

    • P, partition tolerance: usually discussed for multi-node deployments; a partition means a network partition (nodes cannot communicate because of network or node failures), and tolerance means the service keeps working after a partition appears

    Assuming P holds: a client writes a piece of data to node 1, but because of a partition the data temporarily cannot reach node 2.

    • CP: to guarantee consistency (C) across the whole system, availability (A) must be sacrificed, i.e. the whole system (or at least node 2) becomes temporarily unavailable; otherwise clients querying node 1 and node 2 would read different data
    • AP: to guarantee availability (A), consistency (C) must be sacrificed; while the data is still being synchronized, clients querying node 1 and node 2 will read different data

    Making the choice

    • CAP means you can only fully have two of the three
    • BASE is a compromise on CAP: keep all three, but do not guarantee each one 100%
    • A distributed system must guarantee P (partition tolerance), and then trade off between A and C

    Nacos ephemeral instances: AP mode (Nacos/Eureka/Redis)

    A system that satisfies AP also, to some degree, follows the BASE theory.

    Take a Eureka cluster: if 2 of its 3 nodes go down, the system stays basically available (BA). If a service registers at that moment, the data across the nodes is inconsistent; once the 2 failed nodes recover, the data is synchronized to them (E, eventual consistency). The temporary inconsistent state in between is the soft state (S).

    Nacos persistent instances: CP mode (Nacos/Zookeeper)

    Split brain: in a master-slave cluster it usually happens when nodes cannot reach each other (a network partition).

    The cluster splits into several sub-clusters, each electing its own master, so the original cluster ends up with multiple masters.

    How do Nacos and Zookeeper avoid split brain?

    Leader election requires votes from more than half of all nodes; this rule guarantees that, after a partition, at most one sub-cluster can still elect a leader.

    Why is an odd number of nodes usually recommended for master-slave clusters? (an even number also works, but causes problems)

    1. With an even number of nodes the cluster can split exactly in half (e.g. 4 nodes splitting into 2 and 2); neither half can elect a leader and the cluster cannot serve (so the P in CAP is lost)
    2. For the same fault tolerance an odd number of nodes saves resources: a 5-node cluster can still elect a leader with 2 nodes down, while a 6-node cluster can likewise only tolerate 2 failures and still elect a leader

    CAP comparison: Nacos vs Zookeeper vs Eureka

    Understanding the Raft protocol

    Nacos (CP) uses a protocol similar to Raft
    Zookeeper uses the ZAB protocol

    Raft and ZAB are both simplifications of the Paxos distributed consensus protocol. They are similar and consist of two main parts:

    1. Leader election (a candidate needs votes from more than half of the nodes)
    2. Replicating writes across the cluster (two-phase commit; more than half of the nodes must acknowledge the write)

    1. Main difference: election

    Raft

    • Raft uses a randomized election timeout: whichever node wakes up first starts requesting votes and is likely to become leader
    • Whoever requests votes first tends to win; if nodes request votes at the same time and no one wins, a new round is started

    Zab

    • In ZAB every node casts a vote; ballots are compared by zxid and then myid
    • Each node first votes for itself regardless of what it has received, broadcasts its ballot, and then the ballots are compared by zxid and myid

    2. Main difference: data sync and heartbeat

    Raft

    • In Raft, after data is replicated to the followers, the leader keeps sending heartbeats

    Zab

    • ZAB relies on long-lived TCP connections

    http://thesecretlivesofdata.com/raft/

    https://raft.github.io/

    Issues with Nacos's Raft implementation

    1. Cluster data synchronization writes to memory first and then syncs to the other nodes, so it is not a strict two-phase commit
    2. Cluster heartbeats carry all keys, which are then compared and fetched in batches (a problem with very large data sets, e.g. hundreds of thousands of entries)

    Designing an ultra-large-scale registry on cloud SaaS

    Similar to the sharded storage of a Redis cluster

    Concepts

    • Cluster: the same service deployed on several servers
    • Distributed:
      • Distributed deployment: one service is split into sub-services, each deployed on different servers
      • Distributed storage: data that used to live on one machine is split into parts stored on different machines
    • Microservices: one kind of distributed deployment architecture

    Key points

    • Nacos service registration and discovery source analysis
    • How Nacos supports high concurrency with async tasks and an in-memory queue: Notifier (tasks.offer/take)
    • How the Nacos registry avoids concurrent read/write conflicts: run → handle → updateIps (copy-on-write)
    • Nacos heartbeat mechanism, health checks and instance removal
    • Publishing service-change events
    • Cluster heartbeat (health-check ownership) design
    • Cluster status synchronization
    • Synchronizing newly written data across the cluster
    • Synchronizing state changes across the cluster

    1. High concurrency with async tasks and an in-memory queue: Notifier (tasks.offer/take)

    @DependsOn("ProtocolManager")
    @org.springframework.stereotype.Service("distroConsistencyService")
    public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
        ...
        private volatile Notifier notifier = new Notifier();
        ...
        @PostConstruct
        public void init() {
            GlobalExecutor.submitDistroNotifyTask(notifier);
        }
        ...
    }
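
    Notifier itself is a single background thread draining an in-memory blocking queue: the registration request only has to offer the change key and return, while the loop takes entries and applies them to the registry one at a time. A stripped-down sketch of that producer/consumer pattern (not the actual Nacos class, just the idea):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Stripped-down sketch of the Notifier pattern: producers offer change keys and
    // return immediately; a single consumer thread takes them and applies the changes.
    public class NotifierSketch implements Runnable {

        private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        public void addTask(String datumKey) {
            tasks.offer(datumKey);              // non-blocking hand-off keeps registration fast
        }

        @Override
        public void run() {
            for (; ; ) {
                try {
                    String datumKey = tasks.take();   // blocks until there is work
                    handle(datumKey);                 // apply the change to the registry
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        private void handle(String datumKey) {
            System.out.println("apply change for " + datumKey);
        }
    }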
    

    2. How the registry avoids concurrent read/write conflicts: Notifier.run (copy-on-write)

    Cluster#updateIps

    /**
     * Update instance list.
     *
     * @param ips       instance list
     * @param ephemeral whether these instances are ephemeral
     */
    public void updateIps(List<Instance> ips, boolean ephemeral) {
        
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        
        List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
        if (updatedIPs.size() > 0) {
            for (Instance ip : updatedIPs) {
                Instance oldIP = oldIpMap.get(ip.getDatumKey());
                
                // do not update the ip validation status of updated ips
                // because the checker has the most precise result
                // Only when ip is not marked, don't we update the health status of IP:
                if (!ip.isMarked()) {
                    ip.setHealthy(oldIP.isHealthy());
                }
                
                if (ip.isHealthy() != oldIP.isHealthy()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                            (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
                }
                
                if (ip.getWeight() != oldIP.getWeight()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                            ip.toString());
                }
            }
        }
        
        List<Instance> newIPs = subtract(ips, oldIpMap.values());
        if (newIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                            getName(), newIPs.size(), newIPs.toString());
            
            for (Instance ip : newIPs) {
                HealthCheckStatus.reset(ip);
            }
        }
        
        List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
        
        if (deadIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                            getName(), deadIPs.size(), deadIPs.toString());
            
            for (Instance ip : deadIPs) {
                HealthCheckStatus.remv(ip);
            }
        }
        
        toUpdateInstances = new HashSet<>(ips);
        
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }
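
    The important part is the tail of the method: all changes are applied to a brand-new HashSet, and only at the very end is the field reference (ephemeralInstances or persistentInstances) swapped to the new set, so readers calling allIPs() always see either the complete old set or the complete new set. A minimal sketch of this copy-on-write idiom:

    import java.util.HashSet;
    import java.util.Set;

    // Minimal copy-on-write sketch: the writer builds a new set and swaps the reference,
    // so readers never observe a half-updated collection and need no locking.
    public class CopyOnWriteSketch {

        private volatile Set<String> instances = new HashSet<>();

        public void update(Set<String> latest) {
            Set<String> copy = new HashSet<>(latest); // apply changes to a fresh copy
            instances = copy;                          // one reference swap publishes it
        }

        public Set<String> all() {
            return instances;
        }
    }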
    

    3. Heartbeats and health checks

    Client-side heartbeat: NacosNamingService#registerInstance

     public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
         NamingUtils.checkInstanceIsLegal(instance);
         String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
         if (instance.isEphemeral()) { // ephemeral instances get a client-side heartbeat task
             BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
             this.beatReactor.addBeatInfo(groupedServiceName, beatInfo); // the first beat is scheduled 5 seconds later
         }
    
         this.serverProxy.registerService(groupedServiceName, groupName, instance);
     }
    

    Nested rescheduling with a 5-second delay

    class BeatTask implements Runnable {
    	...
    	public void run() {
    		...
    		// send the heartbeat request
    		JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
    		...
    		// reschedule the next beat
    		BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
    	}
    	...
    }
    
    DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L);
    DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
    DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L);
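
    With those defaults the client sends a beat every 5 seconds, and every run schedules the next one, exactly as BeatTask does above. A self-contained sketch of that self-rescheduling pattern (class and method names here are illustrative, not the real BeatReactor API):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Sketch of the self-rescheduling heartbeat: each run sends one beat and then
    // schedules the next run, mirroring BeatTask's tail call into executorService.schedule.
    public class HeartbeatSketch {

        private static final long BEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);

        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();

        public void start(String serviceName) {
            executor.schedule(() -> beat(serviceName), BEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }

        private void beat(String serviceName) {
            System.out.println("send beat for " + serviceName);  // stands in for serverProxy.sendBeat
            executor.schedule(() -> beat(serviceName), BEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }
    }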
    

    Server-side health check

    • serviceManager.registerInstance(namespaceId, serviceName, instance);
    • createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    • putServiceAndInit(service);
    • service.init();

    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

    futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));

    public class ClientBeatCheckTask implements Runnable {
    	...
    	public void run() {
    		...
                
                List<Instance> instances = service.allIPs(true);
                
                // first set health status of instances:
                for (Instance instance : instances) {
                	// no beat for more than 15 seconds: mark the instance unhealthy
                    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                        if (!instance.isMarked()) {
                            if (instance.isHealthy()) {
                                instance.setHealthy(false);
                                Loggers.EVT_LOG
                                        .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                                getPushService().serviceChanged(service);
                                ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                            }
                        }
                    }
                }
                
                if (!getGlobalConfig().isExpireInstance()) {
                    return;
                }
                
                // then remove obsolete instances:
                for (Instance instance : instances) {
                    
                    if (instance.isMarked()) {
                        continue;
                    }
                    // no beat for more than 30 seconds: call the delete API to remove the instance
                    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                        // delete instance
                        Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                                JacksonUtils.toJson(instance));
                        deleteIp(instance);
                    }
                }
    		...
    	}
    	...
    }
    

    4. Publishing service-change events: real-time updates via active UDP push

    • Notifier.run
    • handle(pair);
    • listener.onChange(datumKey, dataStore.get(datumKey).value);
    • Service#updateIPs
    • getPushService().serviceChanged(this);

    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));

    public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
    	...
        @Override
        public void onApplicationEvent(ServiceChangeEvent event) {
        	...
        	Future future = GlobalExecutor.scheduleUdpSender(() -> {
        		...
        		udpPush(ackEntry); // push the change to the client over UDP
        	}
        	...
        }
        ...
    }
    

    The UDP port is the one the client reports to the server when it pulls the service list:

    • NacosNamingService#getAllInstances
    • hostReactor.getServiceInfo
    • hostReactor.updateServiceNow
      String result = this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);

    Zookeeper vs Nacos

    Zookeeper pushes notifications over long-lived TCP connections.
    Nacos's UDP push uses fewer resources than Zookeeper's approach, and it does not hit a performance bottleneck even with a large number of clients.
    When a Nacos client receives a UDP push it replies with an ACK; if the Nacos server does not receive the ACK within a certain time it retries, and it stops retrying after a maximum resend window. UDP cannot guarantee delivery to the subscribers, but Nacos also falls back on periodic polling.
    Together these two mechanisms give Nacos both timeliness and the guarantee that updates are not lost.
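
    On the client side, PushReceiver is essentially a loop on a DatagramSocket that reads each push and answers with an ACK packet so the server stops retrying. A bare-bones sketch of one receive-and-ACK round follows (the real client also gunzips the payload, parses the JSON and updates its local service cache):

    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.nio.charset.StandardCharsets;

    // Bare-bones UDP push receiver sketch: listen on a local port, read one push and
    // reply with an ACK to the sender. The port number is what the client reports to
    // the server so the server knows where to push.
    public class UdpPushReceiverSketch {

        public static void main(String[] args) throws Exception {
            try (DatagramSocket socket = new DatagramSocket(0)) {     // ephemeral local port
                System.out.println("listening on UDP port " + socket.getLocalPort());

                byte[] buffer = new byte[64 * 1024];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                socket.receive(packet);                               // blocks until a push arrives

                String data = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
                System.out.println("push received: " + data);

                byte[] ack = "{\"type\":\"push-ack\"}".getBytes(StandardCharsets.UTF_8);
                socket.send(new DatagramPacket(ack, ack.length, packet.getAddress(), packet.getPort()));
            }
        }
    }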

    Eureka

    Under the hood Eureka has:
    ReadOnlyCache
    ReadWriteCache
    the registry: Map<String, Map<String, Lease<InstanceInfo>>>

    Registration writes directly to the registry, which is periodically synced to the ReadWriteCache and then periodically synced to the ReadOnlyCache.
    Discovery reads from the ReadOnlyCache.

    5. Cluster heartbeat (health-check ownership) design: (serviceName.hashCode % Integer.MAX_VALUE) % servers.size()

    • InstanceController#register
    • serviceManager#registerInstance
    • serviceManager#createEmptyService
    • serviceManager#createServiceIfAbsent
    • serviceManager#putServiceAndInit
    • service#init
    • HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    public class ClientBeatCheckTask implements Runnable {
    	...
    	public void run() {
    	     if (!getDistroMapper().responsible(service.getName())) {
                 return;
             }
    	}    
    	...
    }
    

    DistroMapper#responsible ⇒ (serviceName.hashCode % Integer.MAX_VALUE) % servers.size()

     public boolean responsible(String serviceName) {
         final List<String> servers = healthyList;
         
         if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
             return true;
         }
         
         if (CollectionUtils.isEmpty(servers)) {
             // means distro config is not ready yet
             return false;
         }
         
         int index = servers.indexOf(EnvUtil.getLocalAddress());
         int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
         if (lastIndex < 0 || index < 0) {
             return true;
         }
         
         int target = distroHash(serviceName) % servers.size();
         return target >= index && target <= lastIndex;
     }
     private int distroHash(String serviceName) {
         return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
     }
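
    A quick worked example of the routing formula: with three healthy servers, the node whose index equals distroHash(serviceName) % servers.size() owns the health checks for that service, so each server only checks roughly a third of the services. A standalone sketch (the server list and service name are made up):

    import java.util.Arrays;
    import java.util.List;

    // Worked example of the Distro responsibility formula:
    // (serviceName.hashCode % Integer.MAX_VALUE) % servers.size() picks the owning node.
    public class DistroHashSketch {

        static int distroHash(String serviceName) {
            return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
        }

        public static void main(String[] args) {
            List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
            String serviceName = "DEFAULT_GROUP@@demo-service";

            int target = distroHash(serviceName) % servers.size();
            System.out.println("health checks for " + serviceName
                    + " are handled by " + servers.get(target));
        }
    }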
    

    6. Cluster status synchronization: ServerListManager#init → ServerStatusReporter

    @Component("serverListManager")
    public class ServerListManager extends MemberChangeListener {
        @PostConstruct
        public void init() {
        	// periodically report this node's status to the rest of the cluster
            GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
            GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());
        }
    }
    
     private class ServerStatusReporter implements Runnable {
         
         @Override
         public void run() {
         	...
         	synchronizer.send(server.getAddress(), msg);
         	//String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator/server/status";
         }
     }
    

    7. Synchronizing service state changes across the cluster

    @Component
    public class ServiceManager implements RecordListener<Service> {
        @PostConstruct
        public void init() {
            GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
            
            GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
            
            if (emptyServiceAutoClean) {
                
                Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",
                        cleanEmptyServiceDelay, cleanEmptyServicePeriod);
                
                // delay 60s, period 20s;
                
                // This task is not recommended to be performed frequently in order to avoid
                // the possibility that the service cache information may just be deleted
                // and then created due to the heartbeat mechanism
                
                GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,
                        cleanEmptyServicePeriod);
            }
            
            try {
                Loggers.SRV_LOG.info("listen for service meta change");
                consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
            } catch (NacosException e) {
                Loggers.SRV_LOG.error("listen for service meta change failed!");
            }
        }
    }
    

    ServiceReporter

    private class ServiceReporter implements Runnable {
        
        @Override
        public void run() {
        	...
                        Message msg = new Message();
                        
                        msg.setData(JacksonUtils.toJson(checksum));
                        
                        Collection<Member> sameSiteServers = memberManager.allMembers();
                        
                        if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                            return;
                        }
                        
                        for (Member server : sameSiteServers) {
                            if (server.getAddress().equals(NetUtils.localServer())) {
                                continue;
                            }
                            synchronizer.send(server.getAddress(), msg);
                            //String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator/server/status";
                        }
        	...
        }
    }
    

    8. Synchronizing newly written data across the cluster

    • InstanceController#register
    • serviceManager#registerInstance
    • serviceManager#addInstance
    • consistencyService.put(key, instances);
    • DelegateConsistencyServiceImpl#mapConsistencyService(key).put(key, value);
    • DistroConsistencyServiceImpl#put

    DistroProtocol#sync

    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }
    

    The write path is rather convoluted; in short, the new data is put into a task map and synchronized to the other nodes asynchronously (see the sketch at the end of this step): distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);

    @Component
    public class DistroTaskEngineHolder {
        private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
    
        private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
    
    	....
    }
    

    DistroDelayTaskExecuteEngine

    public class DistroDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
        
        public DistroDelayTaskExecuteEngine() {
            super(DistroDelayTaskExecuteEngine.class.getName(), Loggers.DISTRO);
        }
        ...
    }
    

    NacosDelayTaskExecuteEngine

    public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
    	...
        public NacosDelayTaskExecuteEngine(String name, Logger logger) {
            this(name, 32, logger, 100L);
        }
        ...
        public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
            super(logger);
            tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
            processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
            processingExecutor
                    .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
        }   
        ... 
    }    
    

    ProcessRunnable

        private class ProcessRunnable implements Runnable {
            @Override
            public void run() {
                try {
                    processTasks();
                } catch (Throwable e) {
                    getEngineLog().error(e.toString(), e);
                }
            }
        }
    
    • processor.process(task)
    • distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
    • worker.process(task);
    • queue.put(task);
    • InnerWorker.run
    • task.run();
    • DistroSyncChangeTask.run
    • result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
    • Calls the peer node's sync endpoint /distro/datum
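
    Summing the chain up: a change is put into a key-indexed task map, a scheduled processor drains the map, and each entry is handed to a worker that calls /distro/datum on the peer. A minimal, generic sketch of that merge-and-drain pattern (not the actual Nacos engine; the point is that repeated updates to the same key collapse into one sync):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Minimal sketch of a delay-task engine: tasks are keyed, so later updates to the
    // same key overwrite earlier ones, and a scheduled processor drains the map.
    public class DelayTaskEngineSketch {

        private final Map<String, String> tasks = new ConcurrentHashMap<>();
        private final ScheduledExecutorService processor = Executors.newSingleThreadScheduledExecutor();

        public DelayTaskEngineSketch(long processIntervalMs) {
            processor.scheduleWithFixedDelay(this::processTasks,
                    processIntervalMs, processIntervalMs, TimeUnit.MILLISECONDS);
        }

        public void addTask(String distroKey, String payload) {
            tasks.put(distroKey, payload);    // merging: only the latest payload per key survives
        }

        private void processTasks() {
            for (String key : tasks.keySet()) {
                String payload = tasks.remove(key);
                if (payload != null) {
                    System.out.println("sync " + key + " to peers");  // stands in for the /distro/datum call
                }
            }
        }
    }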

    9. How a newly started node pulls a full data snapshot from the other nodes

    @Component
    public class DistroProtocol {
    	...
        public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
                DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
    		...
            startDistroTask();
        }
    	...
    }
    
    • startDistroTask
    • startLoadTask
    • GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
    • DistroLoadDataTask.run
    • load()
    • loadAllDataSnapshotFromRemote(each)
    // the new node pulls a one-off data snapshot from each peer
    DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
    boolean result = dataProcessor.processSnapshot(distroData);
    

    10. Cluster CP data synchronization via heartbeats

    • The leader sends a heartbeat at a fixed interval, carrying the datum keys compressed with gzip
    • A follower compares the received keys with its own data and pulls the missing or outdated datums from the leader in batches of 50 keys (see the batching sketch at the end of this step)
    @Component
    public class RaftCore implements Closeable {
    @PostConstruct
        public void init() throws Exception {
            Loggers.RAFT.info("initializing Raft sub-system");
            final long start = System.currentTimeMillis();
            // load the persisted datum files from disk
            raftStore.loadDatums(notifier, datums);
            // restore the current term
            setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    
            Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
    
            initialized = true;
    
            Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
            // schedule the periodic leader-election task
            masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
            // schedule the periodic heartbeat task
            heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
    
            versionJudgement.registerObserver(isAllNewVersion -> {
                stopWork = isAllNewVersion;
                if (stopWork) {
                    try {
                        shutdown();
                        raftListener.removeOldRaftMetadata();
                    } catch (NacosException e) {
                        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
                    }
                }
            }, 100);
    
            NotifyCenter.registerSubscriber(notifier);
    
            Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
                    GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
        }
    }
    

    HeartBeat

    public class HeartBeat implements Runnable {
    	....
    	private void sendBeat() throws IOException, InterruptedException {
    	}
    	....
    }
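
    For the follower-side pull described in the bullets above, the batching can be pictured as splitting the key list received in a heartbeat into chunks of 50 and fetching each chunk from the leader in one request. A hedged sketch of just that chunking step (the batch size comes from the description above; the real RaftCore also gzips the key lists):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of follower-side batching: split the datum keys received in a heartbeat
    // into groups of 50 and fetch each group from the leader in a single request.
    public class KeyBatchSketch {

        static List<List<String>> batchesOf(List<String> keys, int batchSize) {
            List<List<String>> batches = new ArrayList<>();
            for (int i = 0; i < keys.size(); i += batchSize) {
                batches.add(keys.subList(i, Math.min(i + batchSize, keys.size())));
            }
            return batches;
        }

        public static void main(String[] args) {
            List<String> keys = new ArrayList<>();
            for (int i = 0; i < 120; i++) {
                keys.add("com.alibaba.nacos.naming.iplist." + i);
            }
            // 120 keys -> 3 batches (50, 50, 20), each fetched from the leader in one call
            System.out.println(batchesOf(keys, 50).size());
        }
    }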
    
  • Original article: https://blog.csdn.net/menxu_work/article/details/126888164