• Nacos注册中心10-Server端(处理服务心跳请求)


    0. 环境

    • nacos版本:1.4.1
    • Spring Cloud : 2020.0.2
    • Spring Boot :2.4.4
    • Spring Cloud alibaba: 2.2.5.RELEASE

    测试代码:github.com/hsfxuebao/s…

    1. 服务端接收心跳

    这个心跳请求是走了InstanceController 的beat 方法处理的,代码如下:

    1. @CanDistro
    2. @PutMapping("/beat")
    3. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    4. public ObjectNode beat(HttpServletRequest request) throws Exception {
    5. // 创建一个JSON Node,该方法的返回值就是它,后面的代码就是对这个Node进行各种初始化
    6. ObjectNode result = JacksonUtils.createEmptyJsonNode();
    7. result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    8. // 从请求中获取到beat,即client端的beatInfo
    9. String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    10. RsInfo clientBeat = null;
    11. // 将beat构建为clientBeat
    12. if (StringUtils.isNotBlank(beat)) {
    13. clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    14. }
    15. String clusterName = WebUtils
    16. .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    17. String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    18. // 获取到客户端传递来的client的port,其将来用于UDP通信
    19. int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    20. if (clientBeat != null) {
    21. if (StringUtils.isNotBlank(clientBeat.getCluster())) {
    22. clusterName = clientBeat.getCluster();
    23. } else {
    24. // fix #2533
    25. clientBeat.setCluster(clusterName);
    26. }
    27. ip = clientBeat.getIp();
    28. port = clientBeat.getPort();
    29. }
    30. String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    31. String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    32. NamingUtils.checkServiceNameFormat(serviceName);
    33. Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    34. // 从注册表中获取当前发送请求的client对应的instance
    35. Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    36. // 处理注册表中不存在该client的instance的情况
    37. if (instance == null) {
    38. // 若请求中没有携带心跳数据,则直接返回
    39. if (clientBeat == null) {
    40. result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
    41. return result;
    42. }
    43. Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
    44. + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
    45. // 下面处理的情况是,注册表中没有该client的instance,但其发送的请求中具有心跳数据。
    46. // 在client的注册请求还未到达时(网络抖动等原因),第一次心跳请求先到达了server,会出现这种情况
    47. // 处理方式是,使用心跳数据构建出一个instance,注册到注册表
    48. instance = new Instance();
    49. instance.setPort(clientBeat.getPort());
    50. instance.setIp(clientBeat.getIp());
    51. instance.setWeight(clientBeat.getWeight());
    52. instance.setMetadata(clientBeat.getMetadata());
    53. instance.setClusterName(clusterName);
    54. instance.setServiceName(serviceName);
    55. instance.setInstanceId(instance.getInstanceId());
    56. instance.setEphemeral(clientBeat.isEphemeral());
    57. // 注册
    58. serviceManager.registerInstance(namespaceId, serviceName, instance);
    59. }
    60. // 从注册表中获取service
    61. Service service = serviceManager.getService(namespaceId, serviceName);
    62. if (service == null) {
    63. throw new NacosException(NacosException.SERVER_ERROR,
    64. "service not found: " + serviceName + "@" + namespaceId);
    65. }
    66. if (clientBeat == null) {
    67. clientBeat = new RsInfo();
    68. clientBeat.setIp(ip);
    69. clientBeat.setPort(port);
    70. clientBeat.setCluster(clusterName);
    71. }
    72. // todo 处理本次心跳
    73. service.processClientBeat(clientBeat);
    74. result.put(CommonParams.CODE, NamingResponseCode.OK);
    75. // 这个就有点动态配置了
    76. // 如果instance中有 preserved.heart.beat.interval 这个参数
    77. if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
    78. // 带回给客户端
    79. result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    80. }
    81. result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    82. return result;
    83. }
    84. 复制代码

    先是根据namespaceId, serviceName, clusterName, ip, port 这个参数调用 ServiceManager的getInstance 获取对应的instance,其实就是先根据namespace从serviceMap中获取对应的service,接着根据cluster从service的clusterMap中获取对应cluster的instance集合,然后再遍历比对ip与port。

    如果没有找到对应的instance,而且beatInfo不是null,就会进行服务注册。

    接着就是根据namespace与serviceName获取service,然后调用service的processClientBeat 方法处理心跳。这个processClientBeat 方法我们后面看,先看下后面这个有意思的,它往这个返回值中塞了clientBeatIntervallightBeatEnabled 参数值,这clientBeatInterval 就是心跳间隔lightBeatEnabled 就是带不带beatInfo,这时候lightBeatEnabled 返回的就是true了,也就是下次不带了,看来这个心跳间隔是可以随时调整的,而且不用动服务,在控制台修改下某个实例的元数据就可以了。

    接下来看下service是怎样处理请求的:

    1. public void processClientBeat(final RsInfo rsInfo) {
    2. // 创建一个处理器,其是一个任务
    3. ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    4. clientBeatProcessor.setService(this);
    5. clientBeatProcessor.setRsInfo(rsInfo);
    6. // 开启一个立即执行的任务,即执行clientBeatProcessor任务的run()
    7. HealthCheckReactor.scheduleNow(clientBeatProcessor);
    8. }
    9. 复制代码

    封装一个ClientBeatProcessor ,然后交给了HealthCheckReactorscheduleNamingHealth 方法,其实就是给了一个健康检查的线程池处理了。看下ClientBeatProcessor 这个任务里面怎样执行的:

    1. @Override
    2. public void run() {
    3. Service service = this.service;
    4. if (Loggers.EVT_LOG.isDebugEnabled()) {
    5. Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    6. }
    7. String ip = rsInfo.getIp();
    8. String clusterName = rsInfo.getCluster();
    9. int port = rsInfo.getPort();
    10. Cluster cluster = service.getClusterMap().get(clusterName);
    11. // 获取当前服务的所有临时实例
    12. List<Instance> instances = cluster.allIPs(true);
    13. // 遍历所有这些临时实例,从中查找当前发送心跳的instance
    14. for (Instance instance : instances) {
    15. // 只要ip与port与当前心跳的instance的相同,就是了
    16. if (instance.getIp().equals(ip) && instance.getPort() == port) {
    17. if (Loggers.EVT_LOG.isDebugEnabled()) {
    18. Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
    19. }
    20. // 修改最后心跳时间戳
    21. instance.setLastBeat(System.currentTimeMillis());
    22. // 修改该instance的健康状态
    23. // 当instance被标记时,即其marked为true时,其是一个持久实例
    24. if (!instance.isMarked()) {
    25. // instance的healthy才是临时实例健康状态的表示
    26. // 若当前instance健康状态为false,但本次是其发送的心跳,说明这个instance“起死回生”了,
    27. // 我们需要将其health变为true
    28. if (!instance.isHealthy()) {
    29. instance.setHealthy(true);
    30. Loggers.EVT_LOG
    31. .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
    32. cluster.getService().getName(), ip, port, cluster.getName(),
    33. UtilsAndCommons.LOCALHOST_SITE);
    34. // todo 发布服务变更事件(其对后续我们要分析的UDP通信非常重要)
    35. getPushService().serviceChanged(service);
    36. }
    37. }
    38. }
    39. }
    40. }
    41. 复制代码

    其实就是通过namespace/serviceName/cluster/ip/port找到对应的instance对象,重新设置一下LastBeat 的时间,也就是

    1. instance.setLastBeat(System.currentTimeMillis());
    2. 复制代码

    这行,接着就是判断,如果不健康的话,就更改健康状态是true,也就是改成健康。最后getPushService().serviceChanged(service);这行需要注意下,健康状态改变了,会引起它 将新的instance信息推送到那堆服务订阅者客户端上,这个服务订阅发布我们后面会介绍。

    好了,到这我们服务端对心跳消息的处理就结束了,可以看到,处理心跳消息也是异步的,将处理封装成task投寄到线程池,然后就直接返回给客户端了,由线程池执行这个task。

    2. 方法调用图

  • 相关阅读:
    电力电子转战数字IC20220820day65——uvm实战1B
    leetcode 823. Binary Trees With Factors(因子二叉树)
    Linux系统编程(三):进程
    【uniapp】安装与使用uView组件库:
    【Java 进阶篇】Java Filter 执行流程及生命周期详解
    【差旅游记】启程-新疆哈密(1)
    卧槽,Log4j2 再爆雷,Log4j v2.17.0 横空出世。。。
    vue如何解决跨域?
    大数据、小数据、都要从养数据开始
    vue-cli3项目本地启用https,并用mkcert生成证书
  • 原文地址:https://blog.csdn.net/BASK2311/article/details/127717636