• nacos上的注册过的服务实例掉线分析


    最近生产上的xxl_job框架的一个执行器(nacos客户端)因为分配内存不大,导致频繁与nacos服务端的连接断开,而断开之后虽然客户端服务没有宕掉,但是就是无法重新注册到nacos的服务端上去。

    问题定位:

    查看服务器日志;发现日志中打印好多内存溢出的情况,回顾上个迭代同事新增了一个功能,就是定时同步底层数据。后面定位问题的时候发现这个定时任务要执行12个小时才会结束,跟踪代码的时候发现每次遍历的时候要初始化大量数据到jvm内存中,这就导致了内存资源紧张,后台日志不断报内存溢出和GC回收异常的问题。

    解决方案:

    (1)针对有问题的定时业务逻辑重新进行编码优化实现。
    (2)扩大jvm分配给程序的启动内存。-Xms2g -Xmx2g统一改成 -Xms6g -Xmx6g.
    通过这2个方案整改后,1周内生产再也没有出过类似的问题。

    问题解决了,但是我想深究一下为什么nacos掉线后,就注册不上去了呢

    然后我想出现这个问题的原因是自己的服务在jvm的分配的内存使用完毕后,在后台运行的向nacos服务端保持心跳的线程阻塞或者被杀死了,导致后面nacos服务器接收不到来自客户端的心跳,从而我的服务后面没有再次被nacos服务端发现。

    基于以上情况,我试着从nacos客户端注册与心跳检测方面跟一下源码。

    首先最重要的是要找到保持服务于nacos服务端保持心跳的源码,看看这个后台运行的保持心跳的线程的运行机制。

    我猜想这个保持心跳的线程应该会在服务首次启动注册的时候激活。

    所以先找到注册的接口,去到nacos官网,找到api接口界面的注册接口:https://nacos.io/zh-cn/docs/open-api.html

    找到注册接口请求路径为/nacos/v1/ns/instance

    然后去gitlab下载nacos的源码,我直接下载的最新的,然后根据/nacos/v1/ns/instance 找到如下方法:
    可以看到服务注册成功后,还执行了个方法addBeatInfo

    1. @Override
    2. public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    3. if (instance.isEphemeral()) {
    4. BeatInfo beatInfo = new BeatInfo();
    5. beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
    6. beatInfo.setIp(instance.getIp());
    7. beatInfo.setPort(instance.getPort());
    8. beatInfo.setCluster(instance.getClusterName());
    9. beatInfo.setWeight(instance.getWeight());
    10. beatInfo.setMetadata(instance.getMetadata());
    11. beatInfo.setScheduled(false);
    12. beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
    13. beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    14. }

    看一下这个方法是做什么的,可以看到BeatReactor是个线程池类

    1. public BeatReactor(NamingProxy serverProxy, int threadCount) {
    2. this.serverProxy = serverProxy;
    3. executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
    4. @Override
    5. public Thread newThread(Runnable r) {
    6. Thread thread = new Thread(r);
    7. thread.setDaemon(true);
    8. thread.setName("com.alibaba.nacos.naming.beat.sender");
    9. return thread;
    10. }
    11. });
    12. }
    13. public void **addBeatInfo**(String serviceName, BeatInfo beatInfo) {
    14. NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    15. String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    16. BeatInfo existBeat = null;
    17. //fix #1733
    18. if ((existBeat = dom2Beat.remove(key)) != null) {
    19. existBeat.setStopped(true);
    20. }
    21. dom2Beat.put(key, beatInfo);
    22. //这个方法就是启动的后台保持心跳的线程
    23. executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    24. MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    25. }

    看看
    schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS)
    这个方法的定义

    1. /**
    2. * Creates and executes a one-shot action that becomes enabled
    3. * after the given delay.
    4. *
    5. * @param command the task to execute
    6. * @param delay the time from now to delay execution
    7. * @param unit the time unit of the delay parameter
    8. * @return a ScheduledFuture representing pending completion of
    9. * the task and whose {@code get()} method will return
    10. * {@code null} upon completion
    11. * @throws RejectedExecutionException if the task cannot be
    12. * scheduled for execution
    13. * @throws NullPointerException if command is null
    14. */
    15. public ScheduledFuture> schedule(Runnable command,
    16. long delay, TimeUnit unit);

    然后重点部分就是看BeatTask,BeatTask继承Runnable,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,服务端请求地址为/instance/beat的方法

    1. class BeatTask implements Runnable {
    2. BeatInfo beatInfo;
    3. public BeatTask(BeatInfo beatInfo) {
    4. this.beatInfo = beatInfo;
    5. }
    6. @Override
    7. public void run() {
    8. if (beatInfo.isStopped()) {
    9. return;
    10. }
    11. long nextTime = beatInfo.getPeriod();
    12. try {
    13. JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
    14. long interval = result.getIntValue("clientBeatInterval");
    15. boolean lightBeatEnabled = false;
    16. if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
    17. lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
    18. }
    19. BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
    20. if (interval > 0) {
    21. nextTime = interval;
    22. }
    23. int code = NamingResponseCode.OK;
    24. if (result.containsKey(CommonParams.CODE)) {
    25. code = result.getIntValue(CommonParams.CODE);
    26. }
    27. if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
    28. Instance instance = new Instance();
    29. instance.setPort(beatInfo.getPort());
    30. instance.setIp(beatInfo.getIp());
    31. instance.setWeight(beatInfo.getWeight());
    32. instance.setMetadata(beatInfo.getMetadata());
    33. instance.setClusterName(beatInfo.getCluster());
    34. instance.setServiceName(beatInfo.getServiceName());
    35. instance.setInstanceId(instance.getInstanceId());
    36. instance.setEphemeral(true);
    37. try {
    38. serverProxy.registerService(beatInfo.getServiceName(),
    39. NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
    40. } catch (Exception ignore) {
    41. }
    42. }
    43. } catch (NacosException ne) {
    44. NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
    45. JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());
    46. }
    47. executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    48. }
    49. }

    找一下这个线程是每隔几秒运行一次的,是5秒

    接下来我们把目光放到服务端,找到InstanceController的beat方法,如果是参数beat信息的话,说明是第一次发起心跳,则会带有服务实例信息,因为发起心跳成功则服务端会返回下次不要带beat信息的参数,这样客户端第二次就不会携带beat信息了。如果发现没有该服务,又没带beat信息,说明这个服务可能被移除过了,直接返回没找到。如果没有服务,但是发现有beat信息,那就从beat中获取服务实例信息,进行注册.

    1. @CanDistro
    2. @PutMapping("/beat")
    3. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    4. public ObjectNode beat(HttpServletRequest request) throws Exception {
    5. ObjectNode result = JacksonUtils.createEmptyJsonNode();
    6. //设置心跳间隔
    7. result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    8. String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    9. RsInfo clientBeat = null;
    10. //判断有无心跳内容
    11. //如果存在心跳内容则不是轻量级心跳就转化为RsInfo
    12. if (StringUtils.isNotBlank(beat)) {
    13. clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    14. }
    15. String clusterName = WebUtils
    16. .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    17. String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    18. int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    19. if (clientBeat != null) {
    20. if (StringUtils.isNotBlank(clientBeat.getCluster())) {
    21. clusterName = clientBeat.getCluster();
    22. } else {
    23. // fix #2533
    24. clientBeat.setCluster(clusterName);
    25. }
    26. ip = clientBeat.getIp();
    27. port = clientBeat.getPort();
    28. }
    29. String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    30. String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    31. NamingUtils.checkServiceNameFormat(serviceName);
    32. Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    33. //获取实例的信息
    34. Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    35. //如果实例不存在
    36. if (instance == null) {
    37. if (clientBeat == null) {
    38. result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
    39. return result;
    40. }
    41. Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
    42. + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
    43. //根据您心跳内容创建一个实例信息
    44. instance = new Instance();
    45. instance.setPort(clientBeat.getPort());
    46. instance.setIp(clientBeat.getIp());
    47. instance.setWeight(clientBeat.getWeight());
    48. instance.setMetadata(clientBeat.getMetadata());
    49. instance.setClusterName(clusterName);
    50. instance.setServiceName(serviceName);
    51. instance.setInstanceId(instance.getInstanceId());
    52. instance.setEphemeral(clientBeat.isEphemeral());
    53. //注册实例
    54. serviceManager.registerInstance(namespaceId, serviceName, instance);
    55. }
    56. //获取服务的信息
    57. Service service = serviceManager.getService(namespaceId, serviceName);
    58. if (service == null) {
    59. throw new NacosException(NacosException.SERVER_ERROR,
    60. "service not found: " + serviceName + "@" + namespaceId);
    61. }
    62. //不存在的话,要创建一个进行处理
    63. if (clientBeat == null) {
    64. clientBeat = new RsInfo();
    65. clientBeat.setIp(ip);
    66. clientBeat.setPort(port);
    67. clientBeat.setCluster(clusterName);
    68. }
    69. //开启心跳检查任务
    70. service.processClientBeat(clientBeat);
    71. result.put(CommonParams.CODE, NamingResponseCode.OK);
    72. //5秒间隔
    73. if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
    74. result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    75. }
    76. //告诉客户端不需要带上心跳信息了,变成轻量级心跳了
    77. result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    78. return result;
    79. }

    接下来我们看一下nacos服务端 开启心跳检查任务processClientBeat方法,该方法将ClientBeatProcessor放入到线程池中,接下来我们看下重点看下run方法

    1. public void processClientBeat(final RsInfo rsInfo) {
    2. ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    3. clientBeatProcessor.setService(this);
    4. clientBeatProcessor.setRsInfo(rsInfo);
    5. //放入线程池中执行 HealthCheckReactor.scheduleNow(clientBeatProcessor);
    6. }

    更新每次客户端发送的最新的心跳时间,这个主要是是放到内存map中

    1. @Override
    2. public void run() {
    3. Service service = this.service;
    4. if (Loggers.EVT_LOG.isDebugEnabled()) {
    5. Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    6. }
    7. String ip = rsInfo.getIp();
    8. String clusterName = rsInfo.getCluster();
    9. int port = rsInfo.getPort();
    10. Cluster cluster = service.getClusterMap().get(clusterName);
    11. List<Instance> instances = cluster.allIPs(true);
    12. for (Instance instance : instances) {
    13. if (instance.getIp().equals(ip) && instance.getPort() == port) {
    14. if (Loggers.EVT_LOG.isDebugEnabled()) {
    15. Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
    16. }
    17. //更新每次客户端发送的最新的心跳时间,这个主要是是放到内存map中 instance.setLastBeat(System.currentTimeMillis());
    18. if (!instance.isMarked()) {
    19. if (!instance.isHealthy()) {
    20. instance.setHealthy(true);
    21. Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
    22. cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
    23. getPushService().serviceChanged(service);
    24. }
    25. }
    26. }
    27. }
    28. }

    那么,为什么nacos服务端为什么要设置注册到nacos上的服务最新的更新时间呢?

    这涉及到nacos的健康检查机制

    Nacos Server会开启一个定时任务来检查注册服务的健康情况,对于超过15秒没收到客户端的心跳实例会将它的 healthy属性置为false,此时当客户端不会将该实例的信息发现,如果某个服务的实例超过30秒没收到心跳,则剔除该实例,如果剔除的实例恢复,发送心跳则会恢复。

    当有实例注册的时候,我们会看到有个service.init()的方法,该方法的实现主要是将ClientBeatCheckTask加入到线程池当中:

    1. private void putServiceAndInit(Service service) throws NacosException {
    2. putService(service);
    3. /启动服务检查
    4. service.init();
    5. consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    6. consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    7. Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
    8. }
    9. public void init() {
    10. HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    11. for (Map.Entry entry : clusterMap.entrySet()) {
    12. entry.getValue().setService(this);
    13. entry.getValue().init();
    14. }
    15. }

    ClientBeatCheckTask中的run方法主要做两件事心跳时间超过15秒则设置该实例信息为不健康状况和心跳时间超过30秒则删除该实例信息,如下代码:

    1. @Override
    2. public void run() {
    3. try {
    4. if (!getDistroMapper().responsible(service.getName())) {
    5. return;
    6. }
    7. if (!getSwitchDomain().isHealthCheckEnabled()) {
    8. return;
    9. }
    10. List<Instance> instances = service.allIPs(true);
    11. // first set health status of instances:
    12. for (Instance instance : instances) {
    13. //如果心跳时间超过15秒则设置该实例信息为不健康状况
    14. if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
    15. if (!instance.isMarked()) {
    16. if (instance.isHealthy()) {
    17. instance.setHealthy(false);
    18. Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
    19. instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
    20. UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
    21. getPushService().serviceChanged(service);
    22. SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
    23. }
    24. }
    25. }
    26. }
    27. if (!getGlobalConfig().isExpireInstance()) {
    28. return;
    29. }
    30. // then remove obsolete instances:
    31. for (Instance instance : instances) {
    32. if (instance.isMarked()) {
    33. continue;
    34. }
    35. //如果心跳时间超过30秒则删除该实例信息
    36. if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
    37. // delete instance
    38. Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
    39. deleteIP(instance);
    40. }
    41. }
    42. } catch (Exception e) {
    43. Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    44. }
    45. }

    到此完成删除实例的过程,整体的时序图如下:

    经过源码的研究,我发现问题的根本原因就是我的服务里面维持向nacos服务器端发送心跳的定时任务不再执行了,本质上讲 就是内存溢出的原因导致这个线程不再执行了

    应该从jvm和垃圾回收层面找问题,或者说JVM内存溢出造成的tomcat假死

  • 相关阅读:
    平衡搜索树——B-树小记
    1-k8s常见注意事项
    <蓝桥杯软件赛>零基础备赛20周--第5周--杂题-2
    c++ | makefile | 编译 | 链接库
    2022-08-25 mysql范围查询场景下的自适应跳表索引-第二版-改进点
    zzcase&接口自动化-质&效的探索
    makefile & dockerfile
    virtio-net 报文组织形式
    Python 正则表达式转义
    R语言APRIORI关联规则、K-MEANS均值聚类分析中药专利复方治疗用药规律网络可视化...
  • 原文地址:https://blog.csdn.net/baidu_39322753/article/details/132804728