• zk session expire会引起HA模式的rm一直处于standby吗


    【概述】


    最近连续在多个环境中遇到了同一个问题:在HA模式下,两个resourcemanager均为standby,并且持续没有选举出新的leader。经过一番分析,并对照源码梳理问题出现前后的逻辑流程,最后发现是因为zk会话过期(session expire)引起的问题,本文就复盘总结下。

    【RM的正常选举流程】

    在很早之前的文章中,介绍过hadoop里namenode的HA机制(戳这里),RM的选举流程其实是复用了同样的框架,只是以一个独立线程的方式运行,而不是像namenode一样,有个独立的进程(zkfc)负责与zk连接并选举。

    因此,整体的选举流程会和namenode的选举方式基本雷同,即首先向zk建立连接,当连接建立成功后,在zk上竞争创建临时锁节点,成功创建的rm成为active,失败的则成为standby。

    【与zk之间网络异常后的情况】

    正常逻辑是相对简单的,那我们再来看看与zk之间网络出现异常,以及网络异常恢复之后的处理逻辑,具体如下图所示:

    10cd742afc2ea01202548ee9c8a94df3.jpeg

    1. 当ZK服务出现故障,或者网络出现故障,导致网络完全不可达时,客户端与ZK的连接会出现在指定时间内没有读到任何数据,从而引发会话超时。(也可能是读异常,此时产生的是EndOfStreamException,后续处理逻辑与会话超时的逻辑一样)。

    这个时候,zk客户端的发送线程会抛会话超时的异常,同时内部捕获该异常, 向事件回调线程的队列中插入连接断开的事件。此后,循环执行与zk的重连动作。

    1. while (state.isAlive()) {
    2.     try {
    3.         ...
    4.         if (to <= 0) {
    5.             String warnInfo;
    6.             warnInfo =
    7.                 "Client session timed out, have not heard from server in " +
    8.                 clientCnxnSocket.getIdleRecv() + "ms" +
    9.                 " for sessionid 0x" + Long.toHexString(sessionId);
    10.             LOG.warn(warnInfo);
    11.             throw new SessionTimeoutException(warnInfo);
    12.         }
    13.     } catch (Throwable e) {
    14.         ...
    15.         if (state.isAlive()) {
    16.             eventThread.queueEvent(
    17.                 new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
    18.         }
    19.         ...
    20.     }
    21. }

    2. zk客户端中的事件回调线程接收到事件后,向上进行回调通知。在RM的回调处理中,启动定时器线程,触发成为standby。

    1. synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    2.     ...
    3.     if (eventType == Event.EventType.None) {
    4.         switch (event.getState()) {
    5.         case Disconnected:
    6.             LOG.info("Session disconnected. Entering neutral mode...");
    7.             zkConnectionState = ConnectionState.DISCONNECTED;
    8.             enterNeutralMode();
    9.             break;
    10.         ...
    11.         }
    12.     }
    13. }
    14. private void enterNeutralMode() {
    15.     if (state != State.NEUTRAL) {
    16.         if (LOG.isDebugEnabled()) {
    17.             LOG.debug("Entering neutral mode for " + this);
    18.         }
    19.         state = State.NEUTRAL;
    20.         appClient.enterNeutralMode();
    21.     }
    22. }
    23. public void enterNeutralMode() {
    24.     LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
    25.         + zkSessionTimeout + " ms if connection is not reestablished.");
    26.     // If we've just become disconnected, start a timer. When the time's up,
    27.     // we'll transition to standby.
    28.     synchronized (zkDisconnectLock) {
    29.       if (zkDisconnectTimer == null) {
    30.         zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
    31.         zkDisconnectTimer.schedule(new TimerTask() {
    32.           @Override
    33.           public void run() {
    34.             synchronized (zkDisconnectLock) {
    35.               // Only run if the timer hasn't been cancelled
    36.               if (zkDisconnectTimer != null) {
    37.                 becomeStandby();
    38.               }
    39.             }
    40.           }
    41.         }, zkSessionTimeout);
    42.       }
    43.     }
    44. }

    3. 当网络恢复后,ZK客户端重连成功, 但仍旧是携带老的会话ID发送注册请求,如果重连时间超过了会话过期的时间,那么服务端会给出相应应答,告知会话过期,同时断开连接。

    此时,ZK客户端内部发送线程会从响应中得到知道会话过期,向事件线程发送会话过期事件以及线程退出事件,同时将自身状态置为CLOSED,并抛出异常,这样发送线程也就会退出循环从而结束运行。

    1. void onConnected(
    2.     int _negotiatedSessionTimeout,
    3.     long _sessionId,
    4.     byte[] _sessionPasswd,
    5.     boolean isRO)
    6.     throws IOException {
    7.     negotiatedSessionTimeout = _negotiatedSessionTimeout;
    8.     if(negotiatedSessionTimeout <= 0) {
    9.         state = States.CLOSED;
    10.         eventThread.queueEvent(new WatchedEvent(
    11.             Watcher.Event.EventType.None,
    12.             Watcher.Event.KeeperState.Expired, null));
    13.         eventThread.queueEventOfDeath();
    14.         String warnInfo;
    15.         warnInfo =
    16.             "Unable to reconnect to ZooKeeper service, session 0x" +
    17.             Long.toHexString(sessionId) + " has expired";
    18.         LOG.warn(warnInfo);
    19.         throw new SessionExpiredException(warnInfo);
    20.     }
    21. }

    4. 在会话过期的回调处理中,修改自身状态,并重新参与选举,这包括关闭当前的客户端,重新创建新的zk客户端进行连接,如果能成功连接,则继续创建锁节点来进行leader的选举。

    1. synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    2.     ...
    3.     if (eventType == Event.EventType.None) {
    4.         switch (event.getState()) {
    5.         case Expired:
    6.             LOG.info("Session expired. Entering neutral mode and rejoining...");
    7.             enterNeutralMode();
    8.             reJoinElection(0);
    9.             break;
    10.         ...
    11.         }
    12.     }
    13. }
    14. private void reJoinElection(int sleepTime) {
    15.     LOG.info("Trying to re-establish ZK session");
    16.     
    17.     sessionReestablishLockForTests.lock();
    18.     try {
    19.         terminateConnection();
    20.         sleepFor(sleepTime);
    21.         // Should not join election even before the SERVICE is reported
    22.         // as HEALTHY from ZKFC monitoring.
    23.         if (appData != null) {
    24.             joinElectionInternal();
    25.         } else {
    26.             LOG.info("Not joining election since service has not yet been " +
    27.                 "reported as healthy.");
    28.         }
    29.     } finally {
    30.         sessionReestablishLockForTests.unlock();
    31.     }
    32. }
    33. private void joinElectionInternal() {
    34.     Preconditions.checkState(appData != null,
    35.         "trying to join election without any app data");
    36.     if (zkClient == null) {
    37.         if (!reEstablishSession()) {
    38.             fatalError("Failed to reEstablish connection with ZooKeeper");
    39.             return;
    40.         }
    41.     }
    42.     createRetryCount = 0;
    43.     wantToBeInElection = true;
    44.     createLockNodeAsync();
    45. }

    对于standby的RM,其完整的日志如下所示:

    1. // 超时会接收到任何数据
    2. 2022-09-01 19:10:25,230 WARN org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000
    3. // 异常捕获
    4. 2022-09-01 19:10:25,230 INFO org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000, closing socket connection and attempting reconnect
    5. // RM的回调处理
    6. 2022-09-01 19:10:25,331 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session disconnected. Entering neutral mode...
    7. // 触发定时器线程
    8. 2022-09-01 19:10:25,331 WARN org.apache.hadoop.yarn.server.resourcemanager.ActiveStandbyElectorBasedElectorService: Lost contact with Zookeeper. Transitioning to standby in 10000 ms if connection is not reestablished.
    9. // ZK客户端的发送线程尝试重连
    10. 2022-09-01 19:10:26,905 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server zk-0-hncscwc.network-hncscwc/172.168.1.1:2181. Will not attempt to authenticate using SASL (unknown error)
    11. // 定时器线程触发进行状态的状态, 但当前状态已经是standby状态
    12. 2022-09-01 19:10:35,334 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Already in standby state
    13. // 重连成功
    14. 2022-09-01 19:13:51,101 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, initiating session
    15. // 会话过期, 向事件回调线程队列插入会话过期的事件
    16. 2022-09-01 19:13:51,104 WARN org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired
    17. // 回调处理, 并触发重新选举
    18. 2022-09-01 19:13:51,104 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session expired. Entering neutral mode and rejoining...
    19. // 发送线程捕获异常
    20. 2022-09-01 19:13:51,105 INFO org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired, closing socket connection
    21. // 重新建立连接并进行选举
    22. 2022-09-01 19:13:51,105 INFO org.apache.hadoop.ha.ActiveStandbyElector: Trying to re-establish ZK session
    23. 2022-09-01 19:13:51,109 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, initiating session
    24. // 成功建立连接(注意会话ID不同)
    25. 2022-09-01 19:13:51,122 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, sessionid = 0x10054aa9d110006, negotiated timeout = 10000
    26. // 连接成功建立的回调
    27. 2022-09-01 19:13:51,123 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.

    【极端情况引起的BUG】

    上面的逻辑分析中提到了,首次重连ZK后触发会话过期后,会重新创建新的客户端进行重连动作(毕竟老的会话已经过期,无法再继续使用)。通常情况下, 网络都是相对稳定的,创建新的客户端连接肯定可以重连成功,毕竟这一系列动作是连贯的,中间没有任何睡眠操作。

    但如果真的有极端情况,会话过期后重连ZK失败,并且达到重连的最大次数后仍旧未成功连接ZK。那么此时,会再向上层回调一个致命错误,对于这类型错误的处理,则是创建一个线程先进行standby状态的转换,然后再进行重新选举的动作。

    在这个线程中,会对一个原子变量进行判断(初始值为false)。如果为false,表示当前没有线程在执行这个动作,将该变量置为true,然后进行后续动作。

    然而,这个地方,也是BUG所在的地方。在这个线程中重新进行选举,其逻辑和之前一样,依旧是先尝试连接ZK,如果持续无法连接到ZK,并且达到最大重连次数,则再触发回调,创建新线程进行后续逻辑。但此时,原子变量的值已经被置为true,新的线程运行后,判断该值为true,则直接退出。此后就没有机会再进行与ZK的重连动作了。fd268aeedfc52207cc313f548a2271e6.jpeg

    对应的代码如下所示:

    1. private void fatalError(String errorMessage) {
    2.     LOG.error(errorMessage);
    3.     reset();
    4.     appClient.notifyFatalError(errorMessage);
    5. }
    6. public void notifyFatalError(String errorMessage) {
    7.     rm.getRMContext().getDispatcher().getEventHandler().handle(
    8.         new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
    9.             errorMessage));
    10. }
    11. private class RMFatalEventDispatcher implements EventHandler {
    12.     @Override
    13.     public void handle(RMFatalEvent event) {
    14.         LOG.error("Received " + event);
    15.         if (HAUtil.isHAEnabled(getConfig())) {
    16.             LOG.warn("Transitioning the resource manager to standby.");
    17.             handleTransitionToStandByInNewThread();
    18.         }
    19.         ...
    20.     }
    21. }
    22. private void handleTransitionToStandByInNewThread() {
    23.     Thread standByTransitionThread =
    24.         new Thread(activeServices.standByTransitionRunnable);
    25.     standByTransitionThread.setName("StandByTransitionThread");
    26.     standByTransitionThread.start();
    27. }
    28. private class StandByTransitionRunnable implements Runnable {
    29.     // The atomic variable to make sure multiple threads with the same runnable
    30.     // run only once.
    31.     private final AtomicBoolean hasAlreadyRun = new AtomicBoolean(false);
    32.     @Override
    33.     public void run() {
    34.       // Run this only once, even if multiple threads end up triggering
    35.       // this simultaneously.
    36.       if (hasAlreadyRun.getAndSet(true)) {
    37.         return;
    38.       }
    39.       if (rmContext.isHAEnabled()) {
    40.         try {
    41.           // Transition to standby and reinit active services
    42.           LOG.info("Transitioning RM to Standby mode");
    43.           transitionToStandby(true);
    44.           EmbeddedElector elector = rmContext.getLeaderElectorService();
    45.           if (elector != null) {
    46.             elector.rejoinElection();
    47.           }
    48.         } catch (Exception e) {
    49.           LOG.fatal("Failed to transition RM to Standby mode.", e);
    50.           ExitUtil.terminate(1, e);
    51.         }
    52.       }
    53.     }
    54. }

    在线程中进行状态转换的过程中,有个细节需要注意:

    如果进行转换时,RM的当前状态为active,那么此时会停止activeService并重新初始化,即重新创建一个新的实例对象出来。而前面的原子变量,也会随着新的实例对象重新被赋值为false。

    1. synchronized void transitionToStandby(boolean initialize)
    2.     throws Exception {
    3.     if (rmContext.getHAServiceState() ==
    4.         HAServiceProtocol.HAServiceState.STANDBY) {
    5.         LOG.info("Already in standby state");
    6.         return;
    7.     }
    8.     LOG.info("Transitioning to standby state");
    9.     HAServiceState state = rmContext.getHAServiceState();
    10.     rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
    11.     if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
    12.         stopActiveServices();
    13.         reinitialize(initialize);
    14.     }
    15.     LOG.info("Transitioned to standby state");
    16. }
    17. void reinitialize(boolean initialize) {
    18.     ClusterMetrics.destroy();
    19.     QueueMetrics.clearQueueMetrics();
    20.     getResourceScheduler().resetSchedulerMetrics();
    21.     if (initialize) {
    22.         resetRMContext();
    23.         createAndInitActiveServices(true);
    24.     }
    25. }
    26. protected void createAndInitActiveServices(boolean fromActive) {
    27.     activeServices = new RMActiveServices(this);
    28.     activeServices.fromActive = fromActive;
    29.     activeServices.init(conf);
    30. }

    同时,此时会走重新初始化建立连接的逻辑流程,因此,这里是可以正确进行重连。但此后,active的状态切换为standby,在未成为active之前,如果继续出现会话过期后的重连ZK失败,那么仍旧会出现无法再重连zk的问题。

    【可以稳定复现的方式】

    清楚问题产生的场景后,也就能比较容易的进行问题复现了,我们可以通过iptables丢弃从zk过来的数据包进行模拟。例如在与ZK的连接断开一段时间后,再执行下面的脚本命令,这样,问题现象大概率就复现出来了。

    1. #!/bin/bash
    2. # 恢复网络
    3. iptables -F
    4. # 短暂睡眠,使其可以重连成功
    5. sleep 0.3
    6. # 再次模拟与ZK的网络异常
    7. iptables -A INPUT -p tcp --sport 2181 -j DROP

    【问题解决】

    问题的解决其实也很简单,比如去除原子布尔变量的判断逻辑,同时在后续的执行动作中加锁保护,避免多线程并发操作;另一种更简单的方式是启用curator框架,新版本中大多引入了该框架,只是默认为false,即没有使用,可以配置使用该框架,也能对这个问题进行规避。

    好了,这就是本文的全部内容,如果觉得本文对您有帮助,请点赞+转发,也欢迎加我微信交流~

  • 相关阅读:
    MySQL事务底层原理
    linux基础知识之文件系统 df/du/fsck/dump2fs
    项目7-音乐播放器5+注册账号
    Prometheus+Grafana可视化监控【MySQL状态】
    供电企业的福音,远程抄表系统无线解决方案,让工程师告别奔波
    【虹科干货】什么是信号调制?
    gpnmb+ gpnmb-AT2 cell空转映射 上皮细胞的空转映射
    【ArcGIS】基本概念-空间参考与变换
    嵌入式分享合集28
    【软件建模与设计】-02-UML图
  • 原文地址:https://blog.csdn.net/hncscwc/article/details/126672761