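Recently, one of the executors in our production xxl_job setup (a Nacos client) kept losing its connection to the Nacos server because it had been allocated too little memory. After a disconnect the client service itself did not crash, but it never managed to re-register with the Nacos server.
The server logs contained many out-of-memory errors. In the previous iteration a colleague had added a feature that periodically synchronizes underlying data. While tracking the problem down, I found that this scheduled task took 12 hours to finish, and on every pass it loaded a large amount of data into JVM memory. Memory was therefore very tight, and the logs kept reporting OutOfMemoryError and GC problems.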
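We made two changes:
(1) Rewrote and optimized the problematic scheduled job.
(2) Increased the heap size the JVM gets at startup, changing -Xms2g -Xmx2g to -Xms6g -Xmx6g everywhere.
After these two changes, no similar problem occurred in production for a week.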
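Fix (1) is only described at a high level. One common way to keep a long-running sync job's memory bounded is to process the data in fixed-size pages instead of loading everything at once. The sketch below assumes a hypothetical `PageSource` that returns one page per call (for example, backed by a `LIMIT/OFFSET` or keyset query). It illustrates the idea and is not the code we actually shipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PagedSync {

    /** Hypothetical data source: returns at most `limit` records starting at `offset`. */
    interface PageSource<T> {
        List<T> fetch(long offset, int limit);
    }

    /**
     * Processes every record page by page. Only the current page is referenced,
     * so earlier pages become eligible for GC once they have been handled.
     * Returns the total number of records processed.
     */
    static <T> long processAll(PageSource<T> source, int pageSize, Consumer<T> handler) {
        long offset = 0;
        while (true) {
            List<T> page = source.fetch(offset, pageSize);
            if (page.isEmpty()) {
                break;
            }
            page.forEach(handler);
            offset += page.size();
            if (page.size() < pageSize) {
                break; // last, partially filled page
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        // In-memory list standing in for a database table with 10 rows.
        List<Integer> table = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            table.add(i);
        }
        PageSource<Integer> source = (offset, limit) -> table.subList(
                (int) Math.min(offset, table.size()),
                (int) Math.min(offset + limit, table.size()));
        long n = processAll(source, 3, row -> { /* sync one record */ });
        System.out.println(n); // 10
    }
}
```

The page size trades the number of round trips against peak memory. The point is that the working set no longer grows with the total data volume.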
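My hypothesis about the cause was this. Once the service had used up the memory allocated to the JVM, the background thread that sends heartbeats to the Nacos server was blocked or killed. The server then stopped receiving heartbeats from the client, so the Nacos server never rediscovered my service.
With that in mind, I traced the Nacos client's registration and heartbeat code.
The most important step is to find the code that keeps the service's heartbeat with the Nacos server, and to see how that background heartbeat thread works.
My guess was that the heartbeat thread is started when the service registers for the first time.
So I started with the registration endpoint. The Open API page on the Nacos website documents it: https://nacos.io/zh-cn/docs/open-api.html
The registration endpoint path is /nacos/v1/ns/instance.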
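For reference, registration is an HTTP `POST` to this path, with the instance described by query parameters. Below is a minimal sketch using the JDK 11+ `java.net.http` client. The server address `127.0.0.1:8848` and the values `demo-service`, `10.0.0.5` and `9999` are illustrative assumptions. The sketch only builds the request, and sending it (commented out) requires a running Nacos 1.x server.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RegisterRequest {

    /** Builds POST {server}/nacos/v1/ns/instance with the parameters URL-encoded in the query string. */
    static HttpRequest build(String server, Map<String, String> params) {
        String query = params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return HttpRequest.newBuilder(URI.create(server + "/nacos/v1/ns/instance?" + query))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps parameter order stable
        params.put("serviceName", "demo-service");          // illustrative values
        params.put("ip", "10.0.0.5");
        params.put("port", "9999");
        HttpRequest req = build("http://127.0.0.1:8848", params);
        System.out.println(req.method() + " " + req.uri());
        // To actually register against a running server:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(req, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```

The sketch sends only `serviceName`, `ip` and `port`. The Open API page lists the optional parameters (group, cluster, weight, metadata, ephemeral and so on).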
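Next I downloaded the Nacos source from GitHub (the latest version at the time) and, starting from /nacos/v1/ns/instance, found the following client method.
For ephemeral instances, registration also calls addBeatInfo: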
```java
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());

        // ephemeral instances get a client-side heartbeat task
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    // ... the instance itself is then registered with the server (omitted in this excerpt)
}
```
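What does addBeatInfo do? BeatReactor wraps a ScheduledThreadPoolExecutor whose daemon threads are named com.alibaba.nacos.naming.beat.sender: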
```java
public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;
    executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
}

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // schedules the first run of the background heartbeat task (BeatTask)
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
```
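The `dom2Beat` handling above replaces any earlier heartbeat entry for the same instance and flags the old one as stopped. A stale `BeatTask` therefore returns early on its next run instead of sending duplicate beats (the `#1733` fix). Below is a simplified, self-contained model of that pattern, with the remove-then-put condensed into one `put`. `Beat`, `BeatRegistry` and the `service#ip#port` key format are stand-ins written for illustration, not Nacos classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BeatRegistry {

    /** Minimal stand-in for BeatInfo: only the flag that BeatTask checks. */
    static class Beat {
        volatile boolean stopped;
    }

    private final Map<String, Beat> beats = new ConcurrentHashMap<>();

    /** Hypothetical key format: one entry per service/ip/port. */
    static String key(String service, String ip, int port) {
        return service + "#" + ip + "#" + port;
    }

    /** Registers a beat and stops any previous beat stored under the same key. */
    void add(String service, String ip, int port, Beat beat) {
        Beat old = beats.put(key(service, ip, port), beat);
        if (old != null) {
            old.stopped = true; // the old task sees this and returns on its next run
        }
    }

    int size() {
        return beats.size();
    }

    public static void main(String[] args) {
        BeatRegistry registry = new BeatRegistry();
        Beat first = new Beat();
        Beat second = new Beat();
        registry.add("DEFAULT_GROUP@@demo", "10.0.0.5", 9999, first);
        registry.add("DEFAULT_GROUP@@demo", "10.0.0.5", 9999, second);
        System.out.println(first.stopped + " " + second.stopped + " " + registry.size());
        // prints "true false 1"
    }
}
```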
Here is the definition of the method called there, `schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS)`, from `ScheduledExecutorService`:
```java
/**
 * Creates and executes a one-shot action that becomes enabled
 * after the given delay.
 *
 * @param command the task to execute
 * @param delay the time from now to delay execution
 * @param unit the time unit of the delay parameter
 * @return a ScheduledFuture representing pending completion of
 *         the task and whose {@code get()} method will return
 *         {@code null} upon completion
 * @throws RejectedExecutionException if the task cannot be
 *         scheduled for execution
 * @throws NullPointerException if command is null
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay, TimeUnit unit);
```
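Because `schedule` is one-shot, a periodic heartbeat only continues if each run schedules the next one, which is what `BeatTask` does at the end of `run()`. The demonstration below uses only `java.util.concurrent`. The class names and the 10 ms period are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SelfRescheduling {

    /** Schedules a one-shot task and counts how often it ran within `waitMillis`. */
    static int runOneShot(long waitMillis) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        pool.schedule(() -> { runs.incrementAndGet(); }, 10, TimeUnit.MILLISECONDS);
        sleepQuietly(waitMillis);
        pool.shutdownNow();
        return runs.get();
    }

    /** Like BeatTask: each run schedules the next one before returning. */
    static class Tick implements Runnable {
        private final ScheduledExecutorService pool;
        private final CountDownLatch remaining;

        Tick(ScheduledExecutorService pool, CountDownLatch remaining) {
            this.pool = pool;
            this.remaining = remaining;
        }

        @Override
        public void run() {
            remaining.countDown();
            pool.schedule(new Tick(pool, remaining), 10, TimeUnit.MILLISECONDS);
        }
    }

    /** Returns true if the self-rescheduling chain reached `times` runs within 5 seconds. */
    static boolean runChain(int times) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(times);
        pool.schedule(new Tick(pool, remaining), 10, TimeUnit.MILLISECONDS);
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("one-shot runs: " + runOneShot(300));       // 1
        System.out.println("chain reached 5 runs: " + runChain(5));  // true
    }
}
```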
The key part is BeatTask. It implements Runnable, and its run method calls NamingProxy.sendBeat, which sends a request to the server's /instance/beat endpoint:
```java
class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.getIntValue("clientBeatInterval");
            boolean lightBeatEnabled = false;
            if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.containsKey(CommonParams.CODE)) {
                code = result.getIntValue(CommonParams.CODE);
            }
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                        NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ne) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());

        }
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
```
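How often does this task run? By default every 5 seconds. The period comes from BeatInfo, and the server can override it through clientBeatInterval in its response, as the code above shows.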
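One detail of `BeatTask.run()` above bears directly on the symptom from the beginning. Only `NacosException` is caught, and the next beat is scheduled after the `try/catch`. If `sendBeat` throws anything else, such as a `RuntimeException` or an `Error` like `OutOfMemoryError`, the final `schedule` call is skipped and no further beat is scheduled. The worker thread itself survives because the executor captures the exception in the task's future, so the application keeps running with no visible crash. This is our reading of the version shown here, not a confirmed root cause.

The sketch reproduces the pattern with a hypothetical `send()` that throws an unchecked exception once. It contrasts this with a variant that reschedules in a `finally` block. `SendException` stands in for `NacosException`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BeatChainFailure {

    /** Checked exception standing in for NacosException. */
    static class SendException extends Exception {
    }

    /** Hypothetical send(): throws an unchecked exception on the `failOn`-th call. */
    static void send(int callNumber, int failOn) throws SendException {
        if (callNumber == failOn) {
            throw new IllegalStateException("simulated unexpected failure");
        }
    }

    static class Beat implements Runnable {
        final ScheduledExecutorService pool;
        final AtomicInteger sends;
        final int failOn;
        final boolean rescheduleInFinally;

        Beat(ScheduledExecutorService pool, AtomicInteger sends, int failOn, boolean rescheduleInFinally) {
            this.pool = pool;
            this.sends = sends;
            this.failOn = failOn;
            this.rescheduleInFinally = rescheduleInFinally;
        }

        @Override
        public void run() {
            if (rescheduleInFinally) {
                try {
                    send(sends.incrementAndGet(), failOn);
                } catch (SendException e) {
                    // expected failure: log and carry on
                } finally {
                    scheduleNext(); // runs even if send() threw something unexpected
                }
                return;
            }
            // BeatTask-style: only the checked exception is caught ...
            try {
                send(sends.incrementAndGet(), failOn);
            } catch (SendException e) {
                // expected failure: log and carry on
            }
            scheduleNext(); // ... so this is skipped when send() throws anything else
        }

        private void scheduleNext() {
            pool.schedule(new Beat(pool, sends, failOn, rescheduleInFinally), 10, TimeUnit.MILLISECONDS);
        }
    }

    /** Starts a chain and returns how many sends happened within `waitMillis`. */
    static int run(boolean rescheduleInFinally, int failOn, long waitMillis) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger sends = new AtomicInteger();
        pool.schedule(new Beat(pool, sends, failOn, rescheduleInFinally), 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return sends.get();
    }

    public static void main(String[] args) {
        System.out.println("catch NacosException only: " + run(false, 3, 300)); // stops at 3
        System.out.println("reschedule in finally:     " + run(true, 3, 300));  // keeps going
    }
}
```

Rescheduling in `finally` (or catching `Throwable` before rescheduling) keeps the chain alive. Whether a given Nacos client version does this should be checked against its own `BeatReactor` source.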
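Now turn to the server side: the beat method of InstanceController. If the request carries the `beat` parameter, this is the first heartbeat, and it includes the instance's information. After a successful beat, the server's response tells the client that it can leave out the beat payload, so the client's later heartbeats no longer carry it. If the instance does not exist and no beat payload was sent, the instance has probably been removed, and the server simply returns RESOURCE_NOT_FOUND. If the instance does not exist but a beat payload is present, the server rebuilds the instance from the payload and registers it.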
```java
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // heartbeat interval returned to the client
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    // a non-empty "beat" parameter means a full (non-light) heartbeat: parse it into RsInfo
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
        .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // look up the instance
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // the instance does not exist
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
            + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        // rebuild the instance from the beat payload
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        // register the instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // look up the service
    Service service = serviceManager.getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
            "service not found: " + serviceName + "@" + namespaceId);
    }
    // light heartbeat (no payload): build an RsInfo from the request parameters
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // hand the beat to the asynchronous beat processor
    service.processClientBeat(clientBeat);

    result.put(CommonParams.CODE, NamingResponseCode.OK);
    // per-instance interval from metadata, if set (default 5 s)
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    // tell the client whether it may switch to light heartbeats (no beat payload)
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
```
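The branching in `beat()` can be condensed into a small decision function. This is a simplified model for illustration only. `BeatOutcome` and `decide` are hypothetical names, and the real method also resolves the cluster, checks that the service exists, and passes the beat to `processClientBeat`.

```java
public class BeatDecision {

    enum BeatOutcome {
        REFRESH,            // instance known: refresh its last-beat time
        RE_REGISTER,        // unknown instance, full beat: rebuild from payload, register, then refresh
        RESOURCE_NOT_FOUND  // unknown instance, light beat: the client has to re-register itself
    }

    /**
     * @param instanceExists whether serviceManager.getInstance(...) found the instance
     * @param hasBeatPayload whether the request carried the "beat" parameter (full beat)
     */
    static BeatOutcome decide(boolean instanceExists, boolean hasBeatPayload) {
        if (instanceExists) {
            return BeatOutcome.REFRESH;
        }
        return hasBeatPayload ? BeatOutcome.RE_REGISTER : BeatOutcome.RESOURCE_NOT_FOUND;
    }

    public static void main(String[] args) {
        for (boolean exists : new boolean[] {true, false}) {
            for (boolean payload : new boolean[] {true, false}) {
                System.out.println("exists=" + exists + ", payload=" + payload
                        + " -> " + decide(exists, payload));
            }
        }
    }
}
```

In the RESOURCE_NOT_FOUND case, recovery depends on the client. Its `BeatTask` calls `registerService` when it sees that code, so re-registration happens only while the client's beat task is still running.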
Next, the server-side processClientBeat method. It wraps the beat in a ClientBeatProcessor and submits it to a thread pool. The part worth reading is ClientBeatProcessor's run method.
```java
public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    // run it on the health-check thread pool
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
```
run() records the time of the client's latest heartbeat on the matching instance. The instance data is held in an in-memory map:
```java
@Override
public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }

    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    List<Instance> instances = cluster.allIPs(true);

    for (Instance instance : instances) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // record the time of the latest heartbeat on the in-memory instance
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                        cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
```
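Stripped of logging and push notifications, `ClientBeatProcessor.run()` does three things. It finds the instance with the matching `ip:port`, stamps its last-beat time, and flips it back to healthy if it is unhealthy and not manually marked. Below is a self-contained model with a hypothetical `Inst` class. The boolean return value stands in for the `serviceChanged` push.

```java
import java.util.List;

public class BeatProcessorModel {

    /** Minimal stand-in for Nacos's Instance. */
    static class Inst {
        final String ip;
        final int port;
        long lastBeat;
        boolean healthy;
        boolean marked; // manually managed instances keep their health flag, as with isMarked()

        Inst(String ip, int port, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
        }
    }

    /** Returns true if an instance went from unhealthy to healthy (a push would be sent). */
    static boolean processBeat(List<Inst> instances, String ip, int port, long now) {
        boolean changed = false;
        for (Inst inst : instances) {
            if (inst.ip.equals(ip) && inst.port == port) {
                inst.lastBeat = now; // always refreshed, marked or not
                if (!inst.marked && !inst.healthy) {
                    inst.healthy = true;
                    changed = true;
                }
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Inst a = new Inst("10.0.0.5", 9999, false);
        List<Inst> instances = List.of(a);
        System.out.println(processBeat(instances, "10.0.0.5", 9999, 1_000L)); // true
        System.out.println(a.healthy + " " + a.lastBeat);                     // true 1000
    }
}
```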
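So why does the Nacos server record the latest heartbeat time of each registered instance?
The Nacos server runs a scheduled task that checks the health of registered instances. If an instance has sent no heartbeat for more than 15 seconds, its healthy flag is set to false, and clients no longer discover it. If an instance has sent no heartbeat for more than 30 seconds, it is removed. If a removed instance recovers and resumes sending heartbeats, it is registered again.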
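The two thresholds can be written as a pure function of the time since the last beat. This sketch assumes the defaults quoted above: 15 s to unhealthy and 30 s to removal. In the server code these values come from `getInstanceHeartBeatTimeOut()` and `getIpDeleteTimeout()`, which, as we read them, can be overridden per instance through preserved metadata keys. `HealthState` and `classify` are illustrative names.

```java
public class BeatTimeouts {

    enum HealthState { HEALTHY, UNHEALTHY, EXPIRED }

    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // assumed default: mark unhealthy
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // assumed default: remove the instance

    /** Uses the same strict ">" comparisons as ClientBeatCheckTask. */
    static HealthState classify(long now, long lastBeat) {
        long silence = now - lastBeat;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return HealthState.EXPIRED;
        }
        if (silence > HEART_BEAT_TIMEOUT_MS) {
            return HealthState.UNHEALTHY;
        }
        return HealthState.HEALTHY;
    }

    public static void main(String[] args) {
        long lastBeat = 0;
        for (long now : new long[] {5_000, 15_000, 15_001, 30_001}) {
            System.out.println(now + " ms -> " + classify(now, lastBeat));
        }
        // one line each: HEALTHY, HEALTHY, UNHEALTHY, EXPIRED
    }
}
```

With the default 5 s heartbeat interval, an instance is marked unhealthy after roughly three missed beats (15 / 5) and removed after roughly six (30 / 5).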
When an instance is registered and its service is created, putServiceAndInit calls service.init(). That method mainly adds ClientBeatCheckTask to a thread pool:
```java
private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    // start the client heartbeat check for this service
    service.init();
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}


public void init() {

    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
```
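The run method of ClientBeatCheckTask does two things. It marks an instance unhealthy if its last heartbeat is more than 15 seconds old, and it deletes the instance if its last heartbeat is more than 30 seconds old: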
```java
@Override
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        List<Instance> instances = service.allIPs(true);

        // first set health status of instances:
        for (Instance instance : instances) {
            // no heartbeat for more than 15 s (default): mark the instance unhealthy
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                            instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                            UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }


        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // then remove obsolete instances:
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }
            // no heartbeat for more than 30 s (default): delete the instance

            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
                deleteIP(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }

}
```
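The check task can be modelled as a sweep over an in-memory map. It first marks silent instances unhealthy, then deletes those past the removal timeout. In the sketch below, `BeatSweeper`, `Inst` and the map keys are hypothetical. The caller is assumed to invoke `sweep()` periodically, for example with `ScheduledExecutorService.scheduleWithFixedDelay`. That method's Javadoc states that an exception in any execution suppresses all later executions, which is one reason to keep a catch-all like the original's.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BeatSweeper {

    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // assumed default: mark unhealthy
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // assumed default: remove

    /** Minimal stand-in for an ephemeral Instance. */
    static class Inst {
        volatile long lastBeat;
        volatile boolean healthy = true;
        boolean marked;

        Inst(long lastBeat) {
            this.lastBeat = lastBeat;
        }
    }

    /** Instances keyed by a hypothetical "ip:port" string. */
    final Map<String, Inst> instances = new ConcurrentHashMap<>();

    /** One sweep at time `now`, mirroring the two loops of ClientBeatCheckTask.run(). */
    void sweep(long now) {
        try {
            // first set health status of instances
            for (Inst inst : instances.values()) {
                if (now - inst.lastBeat > HEART_BEAT_TIMEOUT_MS && !inst.marked && inst.healthy) {
                    inst.healthy = false; // the real task also pushes a change and publishes an event
                }
            }
            // then remove obsolete instances
            instances.values().removeIf(inst -> !inst.marked && now - inst.lastBeat > IP_DELETE_TIMEOUT_MS);
        } catch (RuntimeException e) {
            // never let an exception escape a periodic task
            System.err.println("sweep failed: " + e);
        }
    }

    public static void main(String[] args) {
        BeatSweeper sweeper = new BeatSweeper();
        sweeper.instances.put("10.0.0.5:9999", new Inst(0));
        Inst recent = new Inst(20_000);
        sweeper.instances.put("10.0.0.6:9999", recent);
        sweeper.sweep(31_000); // first is silent for 31 s -> removed; second for 11 s -> healthy
        System.out.println(sweeper.instances.keySet() + " healthy=" + recent.healthy);
        sweeper.sweep(36_000); // second is now silent for 16 s -> unhealthy but kept
        System.out.println(sweeper.instances.keySet() + " healthy=" + recent.healthy);
    }
}
```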
This completes the instance-removal path. The overall sequence diagram is as follows: