• 【微服务】Nacos服务发现源码分析


    💖Spring家族及微服务系列文章

    【微服务】SpringBoot监听器机制以及在Nacos中的应用

    【微服务】Nacos服务端完成微服务注册以及健康检查流程

    【微服务】Nacos客户端微服务注册原理流程

    【微服务】SpringCloud中使用Ribbon实现负载均衡的原理

    【微服务】SpringBoot启动流程注册FeignClient

    【微服务】SpringBoot启动流程初始化OpenFeign的入口

    Spring Bean的生命周期

    Spring事务原理

    SpringBoot自动装配原理机制及过程

    SpringBoot获取处理器流程

    SpringBoot中处理器映射关系注册流程

    Spring5.x中Bean初始化流程

    Spring中Bean定义的注册流程

    Spring的处理器映射器与适配器的架构设计

    SpringMVC执行流程图解及源码

    目录

    💖Spring家族及微服务系列文章

    💖前言

    💖Nacos服务发现

    ✨流程图

    ✨服务发现的入口

    💫SpringCloud原生项目spring-cloud-commons

    💫Nacos是如何继承下来的?

    💫NacosServiceDiscovery#getInstances()获取服务实例

    ✨NacosNamingService初始化流程

    💖NacosNamingService构造初始化

    💫HostReactor构造初始化

    💖PushReceiver构造初始化

    💫PushReceiver#run

    ✨从集成的client模块本地服务发现

    💫获取服务实例列表

    💖从本地缓存/发送http从服务端获取服务信息

    💫从本地缓存获取

    💫发送HTTP调用从Nacos服务端获取

    💫scheduleUpdateIfAbsent()

    💫UpdateTask#run()任务逻辑

    💫queryList()发送http请求注册中心


    💖前言

         这篇文章就介绍下,服务发现的入口是什么?本地缓存数据结构、缓存时机、如果缓存中没有如何处理?使用了定时任务,那定时任务的底层基于什么的、它是干什么的、定时间隔?监听服务端UDP通知、发送ACK?发送http请求到服务端,谁发的、如何接收?服务端如何推送服务实例的、采用什么方式?带着这些问题,下面我们来探究探究。

        注意:Nacos源码版本为1.x

    💖Nacos服务发现

    ✨流程图

    ✨服务发现的入口

    💫SpringCloud原生项目spring-cloud-commons

        你会发现@EnableDiscoveryClient注解也是在spring-cloud-commons项目,还有个discovery文件夹。我们本节注意下DiscoveryClient接口,以及其中声明的接口方法。SpringCloud是由几个关键项目组成的,spring-cloud-commons项目是其中之一。SpringCloud Alibaba也不是完全替代SpringCloud的,一些基本的规范还是继承下来了,做扩展等。

    💫Nacos是如何继承下来的?

        Nacos是通过自己的spring-cloud-alibaba-nacos-discovery项目去集成到SpringCloud的以及基于SpringBoot的自动装配机制集成到SpringBoot项目的。而服务发现方面,NacosDiscoveryClient 实现了spring-cloud-commons项目的DiscoveryClient接口,即Nacos中服务发现入口是NacosDiscoveryClient类。

    点击方法继续跟进到下面的逻辑

    💫NacosServiceDiscovery#getInstances()获取服务实例

    1. public List getInstances(String serviceId) throws NacosException {
    2. // 获取配置文件组信息
    3. String group = this.discoveryProperties.getGroup();
    4. // 调用API模块中NamingService的selectInstances()方法,
    5. // 引用是NacosNamingService的反射获取,之前文章已分析
    6. List instances = this.namingService().selectInstances(serviceId, group, true);
    7. // 将Nacos的服务实例适配为SpringCloud的ServiceInstance服务实例
    8. return hostToServiceInstanceList(instances, serviceId);
    9. }

    主要逻辑:

    1. 获取配置文件组信息
    2. 调用API模块中NamingService接口的selectInstances()方法。引用是NacosNamingService的,通过反射获取,之前文章已详细分析。NacosNamingService是Nacos的client模块里面的一个组件,下面分析。
    3. 将Nacos的服务实例适配为SpringCloud的ServiceInstance服务实例

    ✨NacosNamingService初始化流程

        它的构造方法是在NamingFactory通过反射方式调用的,上面也提到了。因为这个流程也是不小的,故在获取服务实例前先讲解。

    💖NacosNamingService构造初始化

    1. public NacosNamingService(Properties properties) throws NacosException {
    2. init(properties);
    3. }
    4. private void init(Properties properties) throws NacosException {
    5. ValidatorUtils.checkInitParam(properties);
    6. this.namespace = InitUtils.initNamespaceForNaming(properties);
    7. InitUtils.initSerialization();
    8. initServerAddr(properties);
    9. InitUtils.initWebRootContext();
    10. initCacheDir();
    11. initLogName(properties);
    12. this.eventDispatcher = new EventDispatcher();
    13. // 初始化服务代理
    14. this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
    15. // 初始化心跳组件
    16. this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
    17. // 初始化hostReactor
    18. this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
    19. isLoadCacheAtStart(properties), initPollingThreadCount(properties));
    20. }

    初始化服务代理、心跳发送组件以及hostReactor,重点看hostReactor的构造初始化

    💫HostReactor构造初始化

    1. public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
    2. String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
    3. // init executorService
    4. this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
    5. @Override
    6. public Thread newThread(Runnable r) {
    7. Thread thread = new Thread(r);
    8. thread.setDaemon(true);
    9. thread.setName("com.alibaba.nacos.client.naming.updater");
    10. return thread;
    11. }
    12. });
    13. this.eventDispatcher = eventDispatcher;
    14. this.beatReactor = beatReactor;
    15. this.serverProxy = serverProxy;
    16. this.cacheDir = cacheDir;
    17. // 初始化本地缓存
    18. if (loadCacheAtStart) {
    19. this.serviceInfoMap = new ConcurrentHashMap(DiskCache.read(this.cacheDir));
    20. } else {
    21. this.serviceInfoMap = new ConcurrentHashMap(16);
    22. }
    23. this.updatingMap = new ConcurrentHashMap();
    24. this.failoverReactor = new FailoverReactor(this, cacheDir);
    25. // 初始化pushReceiver
    26. this.pushReceiver = new PushReceiver(this);
    27. }

    初始化本地缓存、pushReceiver,重点关注PushReceiver的构造方法

    💖PushReceiver构造初始化

    1. public PushReceiver(HostReactor hostReactor) {
    2. try {
    3. this.hostReactor = hostReactor;
    4. // 初始化udp套接字
    5. this.udpSocket = new DatagramSocket();
    6. // 启动一个线程
    7. this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
    8. @Override
    9. public Thread newThread(Runnable r) {
    10. Thread thread = new Thread(r);
    11. thread.setDaemon(true);
    12. thread.setName("com.alibaba.nacos.naming.push.receiver");
    13. return thread;
    14. }
    15. });
    16. // 执行任务,下面的run()
    17. this.executorService.execute(this);
    18. } catch (Exception e) {
    19. NAMING_LOGGER.error("[NA] init udp socket failed", e);
    20. }
    21. }

    初始化udp套接字用于监听注册中心变更服务推送以及发送ack确认、启动一个线程死循环用于监听注册中心udp推送服务变更、执行任务,this就是PushReceiver的引用即任务,所以执行下面的run()逻辑。

    💫PushReceiver#run

    1. @Override
    2. public void run() {
    3. while (!closed) {
    4. try {
    5. // byte[] is initialized with 0 full filled by default
    6. byte[] buffer = new byte[UDP_MSS];
    7. DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
    8. // 监听Nacos服务端服务实例信息变更后的通知
    9. udpSocket.receive(packet);
    10. String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
    11. NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
    12. PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
    13. String ack;
    14. if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
    15. // 将数据缓存到本地
    16. hostReactor.processServiceJson(pushPacket.data);
    17. // send ack to server
    18. ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
    19. + "\"\"}";
    20. } else if ("dump".equals(pushPacket.type)) {
    21. // dump data to server
    22. ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
    23. + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
    24. + "\"}";
    25. } else {
    26. // do nothing send ack only
    27. ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
    28. + "\", \"data\":" + "\"\"}";
    29. }
    30. // 发送ack到服务端
    31. udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
    32. packet.getSocketAddress()));
    33. } catch (Exception e) {
    34. NAMING_LOGGER.error("[NA] error while receiving push data", e);
    35. }
    36. }
    37. }

    主要逻辑:

    1. 监听Nacos服务端服务实例信息变更后的通知
    2. 解析注册中心推送的结果,组装回调ack报文,将注册中心推送的变更服务信息缓存到本地
    3. 发送ack到注册中心,以便注册中心决定是否需要重试。

    ✨从集成的client模块本地服务发现

        本节点讲解的就是客户端服务发现,之所以这样说是因为SpringBoot的自动装配将Nacos的client模块集成进来了,想了解更多去看前面的文章分析。

    💫获取服务实例列表

    调用重载的selectInstances()方法,healthy默认true即健康,subscribe默认true即订阅

    1. @Override
    2. public List selectInstances(String serviceName, String groupName, List clusters, boolean healthy,
    3. boolean subscribe) throws NacosException {
    4. ServiceInfo serviceInfo;
    5. // 默认订阅模式
    6. if (subscribe) {
    7. // 委托hostReactor处理
    8. serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
    9. StringUtils.join(clusters, ","));
    10. } else {
    11. serviceInfo = hostReactor
    12. .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
    13. StringUtils.join(clusters, ","));
    14. }
    15. // 选取健康实例
    16. return selectInstances(serviceInfo, healthy);
    17. }

        默认使用订阅模式,但是委托hostReactor去获取服务信息,以服务名、分组拼接作为入参即Nacos可识别的服务名。

    💖从本地缓存/发送http从服务端获取服务信息

    1. public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    2. // failover-mode:默认false
    3. NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    4. String key = ServiceInfo.getKey(serviceName, clusters);
    5. if (failoverReactor.isFailoverSwitch()) {
    6. return failoverReactor.getService(key);
    7. }
    8. // 从本地缓存serviceInfoMap获取
    9. ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    10. // 如果本地缓存中没有,则发送HTTP调用从Nacos服务端获取
    11. if (null == serviceObj) {
    12. serviceObj = new ServiceInfo(serviceName, clusters);
    13. serviceInfoMap.put(serviceObj.getKey(), serviceObj);
    14. updatingMap.put(serviceName, new Object());
    15. // 更新服务
    16. updateServiceNow(serviceName, clusters);
    17. updatingMap.remove(serviceName);
    18. } else if (updatingMap.containsKey(serviceName)) {
    19. if (UPDATE_HOLD_INTERVAL > 0) {
    20. // hold a moment waiting for update finish等待更新完成
    21. synchronized (serviceObj) {
    22. try {
    23. serviceObj.wait(UPDATE_HOLD_INTERVAL);
    24. } catch (InterruptedException e) {
    25. NAMING_LOGGER
    26. .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
    27. }
    28. }
    29. }
    30. }
    31. // 开启一个定时任务,每隔1秒从Nacos服务端获取最新的服务实例信息,
    32. // 更新到本地缓存seriveInfoMap中
    33. scheduleUpdateIfAbsent(serviceName, clusters);
    34. // 从本地缓存serviceInfoMap中获取服务实例信息
    35. return serviceInfoMap.get(serviceObj.getKey());
    36. }

    主要逻辑:

    1. 从本地缓存serviceInfoMap获取
    2. 如果本地缓存中没有,则发送HTTP调用从Nacos服务端获取
    3. 开启一个定时任务,每隔1秒从Nacos服务端获取最新的服务实例信息, 更新到本地缓存seriveInfoMap中
    4.  从本地缓存serviceInfoMap中获取服务实例信息

    💫从本地缓存获取

    1. private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
    2. String key = ServiceInfo.getKey(serviceName, clusters);
    3. // 本地缓存serviceInfoMap获取
    4. return serviceInfoMap.get(key);
    5. }

    就单纯地从本地缓存serviceInfoMap获取

    💫发送HTTP调用从Nacos服务端获取

    1. public void updateService(String serviceName, String clusters) throws NacosException {
    2. ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    3. try {
    4. // 通过NamingProxy发送HTTP调用,获取服务信息
    5. String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
    6. if (StringUtils.isNotEmpty(result)) {
    7. // 更新本地缓存serviceInfoMap
    8. processServiceJson(result);
    9. }
    10. } finally {
    11. if (oldService != null) {
    12. synchronized (oldService) {
    13. oldService.notifyAll();
    14. }
    15. }
    16. }
    17. }

        通过NamingProxy发送HTTP调用,获取服务信息;响应结果不为空更新本地缓存serviceInfoMap

    💫scheduleUpdateIfAbsent()

    1. public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    2. if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
    3. return;
    4. }
    5. synchronized (futureMap) {
    6. if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
    7. return;
    8. }
    9. // 启动定时任务
    10. ScheduledFuture future = addTask(new UpdateTask(serviceName, clusters));
    11. futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    12. }
    13. }

    DEFAULT_DELAY默认1000在这里即1秒。启动定时任务,每隔1秒执行一次,任务逻辑如下:

    💫UpdateTask#run()任务逻辑

    1. @Override
    2. public void run() {
    3. long delayTime = DEFAULT_DELAY;
    4. try {
    5. // 尝试从本地获取
    6. ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
    7. if (serviceObj == null) {
    8. // 本地还是没有则发送http从服务端获取,并缓存到本地
    9. updateService(serviceName, clusters);
    10. return;
    11. }
    12. // 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询
    13. if (serviceObj.getLastRefTime() <= lastRefTime) {
    14. updateService(serviceName, clusters);
    15. serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
    16. } else {
    17. // if serviceName already updated by push, we should not override it
    18. // since the push data may be different from pull through force push
    19. // 如果 serviceName 已经通过 push 更新,我们不应该覆盖它,
    20. // 因为 push 数据可能与 pull through force push 不同
    21. refreshOnly(serviceName, clusters);
    22. }
    23. // 刷新更新时间
    24. lastRefTime = serviceObj.getLastRefTime();
    25. if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
    26. .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
    27. // abort the update task
    28. NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
    29. return;
    30. }
    31. if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
    32. incFailCount();
    33. return;
    34. }
    35. delayTime = serviceObj.getCacheMillis();
    36. // 重置失败数量为0
    37. resetFailCount();
    38. } catch (Throwable e) {
    39. incFailCount();
    40. NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    41. } finally {
    42. // 下次调度刷新时间,下次执行的时间与failCount有关
    43. // failCount=0,则下次调度时间为6秒,最长为1分钟,即当无异常情况下缓存实例的刷新时间是6秒
    44. executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    45. }
    46. }

    主要逻辑:

    1. 尝试从本地缓存获取
    2. 本地还是没有则发送http从服务端获取,并缓存到本地
    3. 过期服务,从注册中心重新查询;否则如果 serviceName 已经通过 push 更新,不应该覆盖它,因为 push 数据可能与 pull through force push 不同
    4. 刷新更新时间、重置失败数量为0等
    5. 下次调度刷新时间,下次执行的时间与failCount有关failCount=0,则下次调度时间为6秒,最长为1分钟,即当无异常情况下缓存实例的刷新时间是6秒

    💫queryList()发送http请求注册中心

    里面会调用重载的reqApi()方法,调用前组装入参、拼接URL等。

    1. public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    2. throws NacosException {
    3. final Map params = new HashMap(8);
    4. params.put(CommonParams.NAMESPACE_ID, namespaceId);
    5. params.put(CommonParams.SERVICE_NAME, serviceName);
    6. params.put("clusters", clusters);
    7. params.put("udpPort", String.valueOf(udpPort));
    8. params.put("clientIP", NetUtils.localIP());
    9. params.put("healthyOnly", String.valueOf(healthyOnly));
    10. return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    11. }

    这篇文章是基于Nacos地1.x版本地,Nacos已经发布了新的2.x版本,官方也推荐使用新的。故还会出新的关于服务发现的文章。

  • 相关阅读:
    基于Java毕业设计学籍管理系统源码+系统+mysql+lw文档+部署软件
    Hono——一个小型,简单且超快的Edges Web框架
    Java进阶篇之泛型
    redis基础知识总结——数据类型(字符串,列表,集合,哈希,集合)
    JavaScript获取字符串的字节长度
    2024-3-17Go语言入门
    权限系统 RGCA 四步架构法
    【手撕数据结构】二分查找(好多细节)
    excel 拼接数据填充单元格 = &
    【软考】14.1 面向对象基本概念/分析设计测试
  • 原文地址:https://blog.csdn.net/qq_57756904/article/details/127804082