• Nacos Client 端服务注册流程源码分析


    前言:

    前面的系列文章让我们对 Nacos 有了一个基本了解,并知道了如何去试用 Nacos 作为注册中心和配置中心,并且知道了是何时触发 Nacos 的服务注册流程,本篇我们继续从源码层面去分析 Nacos 的服务注册流程。

    Nacos 系列文章传送门:

    Nacos 初步认识和 Nacos 部署细节

    Nacos 配置管理模型 – 命名空间(Namespace)、配置分组(Group)和配置集ID(Data ID)

    Nacos 注册中心和配置中心【实战】

    服务启动何时触发 Nacos 的注册流程?

    Nacos Client 服务注册源码分析

    NacosServiceRegistry#register 方法源码分析

    前文我们分析了服务注册到 Nacos 最终是调用了 NacosServiceRegistry#register 方法,该方法中我们看到了 Namespace、Group 等概念,最终又调用了 NacosNamingService#registerInstance 方法完成服务注册。

    //com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
    public void register(Registration registration) {
    	//判断注册实例信息中的 service ID 是否为空 
    	if (StringUtils.isEmpty(registration.getServiceId())) {
    		//为空 记录日志
    		log.warn("No service to register for nacos client...");
    	} else {
    		//不为空 创建名称空间 service
    		NamingService namingService = this.namingService();
    		//获取 serviceid
    		String serviceId = registration.getServiceId();
    		//获取名称空间下的组 就是 nacos 中的组 默认是  DEFAULT_GROUP 其实就是读取配置文件中配置的组信息
    		String group = this.nacosDiscoveryProperties.getGroup();
    		//构建 Instance 实例
    		Instance instance = this.getNacosInstanceFromRegistration(registration);
    
    		try {
    			//注册服务实例
    			namingService.registerInstance(serviceId, group, instance);
    			log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
    		} catch (Exception var7) {
    			log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
    			ReflectionUtils.rethrowRuntimeException(var7);
    		}
    
    	}
    }
    
    //com.alibaba.cloud.nacos.registry.NacosServiceRegistry#getNacosInstanceFromRegistration
    private Instance getNacosInstanceFromRegistration(Registration registration) {
    	//创建实例对象
    	Instance instance = new Instance();
    	//设置ip
    	instance.setIp(registration.getHost());
    	//设置端口
    	instance.setPort(registration.getPort());
    	//设置权重
    	instance.setWeight((double)this.nacosDiscoveryProperties.getWeight());
    	//设置集群名称
    	instance.setClusterName(this.nacosDiscoveryProperties.getClusterName());
    	//设置元数据
    	instance.setMetadata(registration.getMetadata());
    	return instance;
    }
    

    NamingProxy#reqAPI 方法源码解析

    NamingProxy#reqAPI 方法会获取所有的 Nacos Server 的地址,然后区分 Nacos Server 是配置的服务列表还是域名,根据不同配置,区分情况发送请求到 Nacos Server 完成服务注册,服务注册的地址是:/nacos/v1/ns/instance。

    //com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI(java.lang.String, java.util.Map, java.lang.String)
    public String reqAPI(String api, Map<String, String> params, String method) throws NacosException {
    	//Nacos 服务地址快照
    	List<String> snapshot = this.serversFromEndpoint;
    	if (!CollectionUtils.isEmpty(this.serverList)) {
    		//serverList 不为空 再次赋值
    		snapshot = this.serverList;
    	}
    	//调用 reqAPI 方法
    	return this.reqAPI(api, params, snapshot, method);
    }
    
    
    //com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI(java.lang.String, java.util.Map, java.util.List, java.lang.String)
    public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
    	//获取 namespaceId id
    	params.put("namespaceId", this.getNamespaceId());
    	//Nacos 服务地址是否为控股
    	//nacosDomain nacos 域名是否为空
    	if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
    		throw new IllegalArgumentException("no server available");
    	} else {
    		//异常
    		Exception exception = new Exception();
    		//Nacos Server 为空判断
    		if (servers != null && !servers.isEmpty()) {
    			//获取 Nacos 服务数的随机数
    			Random random = new Random(System.currentTimeMillis());
    			int index = random.nextInt(servers.size());
    
    			for(int i = 0; i < servers.size(); ++i) {
    				//获取 Nacos Server
    				String server = (String)servers.get(index);
    
    				try {
    					//发送请求到 Nacos Server 完成注册
    					return this.callServer(api, params, server, method);
    				} catch (NacosException var11) {
    					exception = var11;
    					LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
    				} catch (Exception var12) {
    					exception = var12;
    					LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
    				}
    
    				index = (index + 1) % servers.size();
    			}
    
    			throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
    		} else {
    			//配置的 Nacos 域名的情况 
    			int i = 0;
    			//最多重试 三次
    			while(i < 3) {
    				try {
    					//发送请求到 Nacos Server 完成注册
    					return this.callServer(api, params, this.nacosDomain);
    				} catch (Exception var13) {
    					exception = var13;
    					LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
    					++i;
    				}
    			}
    
    			throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
    		}
    	}
    }
    
    static {
    	VERSION = "Nacos-Java-Client:v" + VersionUtils.VERSION;
    	WEB_CONTEXT = "/nacos";
    	NACOS_URL_BASE = WEB_CONTEXT + "/v1/ns";
    	NACOS_URL_INSTANCE = NACOS_URL_BASE + "/instance";
    	NACOS_URL_SERVICE = NACOS_URL_BASE + "/service";
    	DEFAULT_CLIENT_BEAT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1;
    	DEFAULT_POLLING_THREAD_COUNT = Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1;
    }
    

    NamingProxy#callServer 方法源码分析

    NamingProxy#callServer 方法逻辑十分简单,校验签名,构建 Header,发送 Http 请求到 Nacos Server,完成服务注册。

    //com.alibaba.nacos.client.naming.net.NamingProxy#callServer(java.lang.String, java.util.Map, java.lang.String, java.lang.String)
    public String callServer(String api, Map<String, String> params, String curServer, String method) throws NacosException {
    	long start = System.currentTimeMillis();
    	long end = 0L;
    	//检查签名
    	this.checkSignature(params);
    	//构建 请求 header 
    	List<String> headers = this.builderHeaders();
    	String url;
    	//构建请求地址
    	if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
    		if (!curServer.contains(":")) {
    			curServer = curServer + ":" + this.serverPort;
    		}
    
    		url = HttpClient.getPrefix() + curServer + api;
    	} else {
    		url = curServer + api;
    	}
    	//通过 HttpClient 发送请求
    	HttpResult result = HttpClient.request(url, headers, params, "UTF-8", method);
    	end = System.currentTimeMillis();
    	//当前请求的指标监控
    	MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double)(end - start));
    	//响应结果处理
    	if (200 == result.code) {
    		return result.content;
    	} else if (304 == result.code) {
    		return "";
    	} else {
    		throw new NacosException(500, "failed to req API:" + curServer + api + ". code:" + result.code + " msg: " + result.content);
    	}
    }
    

    Nacos Client 心跳发送与服务续约

    上面分析 Nacos Client 服务注册的时候提到了心跳发送,这里我们分析一下 Nacos Client 的心跳发生机制和服务续约。

    BeatInfo 类

    BeatInfo 类是 Nacot 的心跳类,其中的属性也很简单,都是和服务实例相关,如下:

    public class BeatInfo {
    	//端口
        private int port;
    	//ip
        private String ip;
    	//权重
        private double weight;
    	//服务名称
        private String serviceName;
    	//集群
        private String cluster;
    	//元数据信息
        private Map<String, String> metadata;
    	//调度
        private volatile boolean scheduled;
    	//周期
        private volatile long period;
        private volatile boolean stopped;
    
        public BeatInfo() {
        }
    	
    }
    

    BeatTask 心跳任务类

    BeatTask 作为服务续约的心跳线程对象,run方法中 通过 BeatReactor.this.serverProxy.sendBeat 发送心跳。

    //com.alibaba.nacos.client.naming.beat.BeatReactor.BeatTask
    class BeatTask implements Runnable {
    	BeatInfo beatInfo;
    
    	public BeatTask(BeatInfo beatInfo) {
    		this.beatInfo = beatInfo;
    	}
    
    	public void run() {
    		//判断是否有在进行中的心跳
    		if (!this.beatInfo.isStopped()) {
    			//发送心跳
    			long result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo);
    			//判断发送心跳的耗时 得到下一次心跳的时间
    			long nextTime = result > 0L ? result : this.beatInfo.getPeriod();
    			//定时心跳发送
    			BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
    		}
    	}
    }
    

    BeatReactor#addBeatInfo 发送心跳

    BeatReactor#addBeatInfo 方法使用线程池 ScheduledExecutorService 来实现心跳发送,默认频率是5秒一次。

    //com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo 
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    	LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    	//构建 心跳key
    	String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    	BeatInfo existBeat = null;
    	//防止并发 
    	if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
    		existBeat.setStopped(true);
    	}
    	//加入到 dom2Beat 中记录
    	this.dom2Beat.put(key, beatInfo);
    	//使用线程池 定期执行心跳任务 周期为 period 默认 5 秒 
    	this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    	//指标监控
    	MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
    }
    

    至此,Nacos Client 的服务注册流程分析完毕,后面我们继续分析,服务端的注册机制。

    欢迎提出建议及对错误的地方指出纠正。

  • 相关阅读:
    Java中各种数据格式-json/latex/obo/rdf/ turtle/owl/xml介绍对比示例加使用介绍
    Java之IO简述 第一篇——File类
    WinForm应用实战开发指南 - 教你如何实现表头的全选操作?
    17 | Spark中的map、flatMap、mapToPair mapvalues 的区别
    苹果 MacBook如何取消开盖自动开机功能?
    Tomcat安装部署及构建虚拟主机
    正则表示式——6.处理比较复杂的正则表示法
    (数字图像处理MATLAB+Python)第十章图像分割-第四,五节:分水岭分割和综合案例
    Unity之MVC思想(通过普通方法和使用MVC思想完成同一个小案例:掌握MVC简单框架)
    centos7安装mysql8
  • 原文地址:https://blog.csdn.net/weixin_42118323/article/details/140234175