• Zookeeper断网重连事件回调源码分析


    “不积跬步,无以至千里。”

    背景

    • 确定使用Curator作为zk客户端的情况下,断网[发生SUSPENDED | LOST事件]重连后每次都会回调org.apache.curator.framework.state.ConnectionStateListener#stateChanged方法,且事件类型为org.apache.curator.framework.state.ConnectionState#RECONNECTED
    • 部署zookeeper的版本为最新稳定版3.8.3,curator-recipes相关依赖的版本为5.5.0

    源码分析过程

    • 首先需要构建一个CuratorFramework对象,并基于这个CuratorFramework对象创建一个用于实现Leader选举功能的LeaderSelector,并将它启动

      public static final String leaderSelectorPath = "/source-code-analyse/reconnect";
      public static void main(String[] args) throws InterruptedException {
           CuratorFramework curatorFramework = newCuratorFramework();
           LeaderSelectorListener leaderSelectorListener = new LeaderSelectorListener() {
               @Override
               public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                   System.out.println("Thread " + Thread.currentThread().getName() + " Connection state changed : " + connectionState);
               }
      
               @Override
               public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                   System.out.println("Thread " + Thread.currentThread().getName() + " get the leader.");
                   TimeUnit.SECONDS.sleep(20);
               }
           };
           LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, leaderSelectorPath, leaderSelectorListener);
           leaderSelector.start();
           TimeUnit.SECONDS.sleep(100);
           leaderSelector.close();
           curatorFramework.close();
           System.out.println("Test completed.");
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
        public static CuratorFramework newCuratorFramework() {
           CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                   .connectString("192.168.0.104:2181")
                   .sessionTimeoutMs(30000)
                   .connectionTimeoutMs(6000)
                   .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                   .threadFactory(ThreadUtils.newThreadFactory("ReconnectionTestThread")).build();
           curatorFramework.start();
           return curatorFramework;
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
    • 由于LeaderSelector的功能实现需要基于CuratorFramework,于是应该先看看CuratorFramework的start方法,直接看实现类CuratorFrameworkImpl

      @Override
      public void start()
      {
        log.info("Starting");
        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
        {
          throw new IllegalStateException("Cannot be started more than once");
        }
        try
        {
          connectionStateManager.start();
          //省略代码
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    • 发现CuratorFrameworkImpl内部维护了一个与连接状态管理器,start方法中会启动它

    • ConnectionStateManager的start方法中,会向线程池提交一个任务,去调用processEvents方法

      public void start()
      {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        service.submit
          (
          new Callable<Object>()
          {
            @Override
            public Object call() throws Exception
            {
              processEvents();
              return null;
            }
          }
        );
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
    • processEvents方法里面核心的内容就是,从eventQueue的一个阻塞队列中不断调用poll方法获取ConnectionState对象,因为处在一个while循环中,只要当前连接状态正常,就会一直去poll

      private void processEvents()
       {
           while ( state.get() == State.STARTED )
           {
               try
               {
                   int useSessionTimeoutMs = getUseSessionTimeoutMs();
                   long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
                   long pollMaxMs = useSessionTimeoutMs - elapsedMs;
      
                   final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                   if ( newState != null )
                   {
                       if ( listeners.isEmpty() )
                       {
                           log.warn("There are no ConnectionStateListeners registered.");
                       }
      
                       listeners.forEach(listener -> listener.stateChanged(client, newState));
                   }
                   else if ( sessionExpirationPercent > 0 )
                   {
                       synchronized(this)
                       {
                           checkSessionExpiration();
                       }
                   }
      
                   synchronized(this)
                   {
                       if ( (currentConnectionState == ConnectionState.LOST) && client.getZookeeperClient().isConnected() )
                       {
                           // CURATOR-525 - there is a race whereby LOST is sometimes set after the connection has been repaired
                           // this "hack" fixes it by forcing the state to RECONNECTED
                           log.warn("ConnectionState is LOST but isConnected() is true. Forcing RECONNECTED.");
                           addStateChange(ConnectionState.RECONNECTED);
                       }
                   }
               }
               catch ( InterruptedException e )
               {
                   // swallow the interrupt as it's only possible from either a background
                   // operation and, thus, doesn't apply to this loop or the instance
                   // is being closed in which case the while test will get it
               }
           }
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
    • 随后遍历所有的ConnectionStateListener,回调stateChanged方法,LeaderSelector有一个静态内部类叫做WrappedListener实现了LeaderSelectorListener,则这个WrappedListener的stateChanged方法会被回调

      @Override
      public void stateChanged(CuratorFramework client, ConnectionState newState)
       {
           try
           {
               listener.stateChanged(client, newState);
           }
           catch ( CancelLeadershipException dummy )
           {
               // If we cancel only leadership but not whole election, then we could hand over
               // dated leadership to client with no further cancellation. Dated leadership is
               // possible due to separated steps in leadership acquire: server data(e.g. election sequence)
               // change and client flag(e.g. hasLeadership) set.
               leaderSelector.cancelElection();
           }
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
    • 而上面的listener.stateChanged(client, newState)中listener变量就是构造LeaderSelector时传入的第三个构造参数:LeaderSelectorListener,就是我们自己实现的LeaderSelectorListener

      所以最终会回调到我们自定义的LeaderSelectorListener#stateChanged()方法

    • 那么现在需要搞清楚ConnectionStateManager中的eventQueue是在哪里被放进去的

    • 追溯一下方法调用,发现eventQueue中的元素,是在ConnectionStateManager#postState方法中offer进去的

      private void postState(ConnectionState state)
      {
        log.info("State change: " + state);
        notifyAll();
        //如果队列满了,offer失败,会先poll,之后继续offer
        while ( !eventQueue.offer(state) )
        {
          eventQueue.poll();
          log.warn("ConnectionStateManager queue full - dropping events to make room");
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
    • 继续追溯来到org.apache.curator.framework.state.ConnectionStateManager#addStateChange方法

      public synchronized boolean addStateChange(ConnectionState newConnectionState)
      {
        //如果client不是启动状态直接返回false
        if ( state.get() != State.STARTED )
        {
          return false;
        }
        ConnectionState previousState = currentConnectionState;
        //如果新的连接状态和前一个一样,说明连接状态没有发生变化,不产生事件,直接返回了
        if ( previousState == newConnectionState )
        {
          return false;
        }
        currentConnectionState = newConnectionState;
        ConnectionState localState = newConnectionState;
        boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || (newConnectionState == ConnectionState.READ_ONLY));
        //如果是第一次连接,设置状态为CONNECTED
        if ( !isNegativeMessage && initialConnectMessageSent.compareAndSet(false, true) )
        {
          localState = ConnectionState.CONNECTED;
        }
        postState(localState);
        return true;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
    • 继续看addStateChange方法被org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection调用

      void validateConnection(Watcher.Event.KeeperState state)
      {
        //state为Disconnected的时候产生SUSPENDED事件
        if ( state == Watcher.Event.KeeperState.Disconnected )
        {
          suspendConnection();
        }
        //state为Expired的时候产生LOST事件
        else if ( state == Watcher.Event.KeeperState.Expired )
        {
          connectionStateManager.addStateChange(ConnectionState.LOST);
        }
        //state为SyncConnected的时候产生RECONNECTED事件
        else if ( state == Watcher.Event.KeeperState.SyncConnected )
        {
          connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
        }
        //state为ConnectedReadOnly的时候产生READ_ONLY事件
        else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
        {
          connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
    • 继续追溯validateConnection()的调用方是org.apache.curator.framework.imps.CuratorFrameworkImpl#processEvent

      private void processEvent(final CuratorEvent curatorEvent)
      {
        //只有事件类型是WATCHED时候,会调用这个validateConnection方法,连接状态的变更事件就是WARCHED
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
          validateConnection(curatorEvent.getWatchedEvent().getState());
        }
        //省略代码
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • 这个processEvent方法在连接状态发生变化时,会被CuratorFrameworkImplCuratorZookeeperClient传入的一个匿名内部类Watcher给调用

      public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
      {
        //这个ZookeeperFactory就是Curator创建Zookeeper的一个工厂
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory(), builder.getZkClientConfig());
          this.client = new CuratorZookeeperClient
              (
                  localZookeeperFactory,
                  builder.getEnsembleProvider(),
                  builder.getSessionTimeoutMs(),
                  builder.getConnectionTimeoutMs(),
                  builder.getWaitForShutdownTimeoutMs(),
                  new Watcher()
                  {
                      @Override
                      public void process(WatchedEvent watchedEvent)
                      {
                          CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                          processEvent(event);
                      }
                  },
                  builder.getRetryPolicy(),
                  builder.canBeReadOnly()
              );
              //省略代码
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
    • 并且在CuratorZookeeperClient构造函数中,创建了一个ConnectionState对象,用来管理客户端与zk的连接事件,同时把刚才的Watcher作为构造参数传给了ConnectionState,放到一个parentWatchers的队列中

        public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
               int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
               RetryPolicy retryPolicy, boolean canBeReadOnly)
       {
           if ( sessionTimeoutMs < connectionTimeoutMs )
           {
               log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
           }
      
           retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
           ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
      
           this.connectionTimeoutMs = connectionTimeoutMs;
           this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
           //创建了一个ConnectionState对象,管理客户端与zk的连接状态
           state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, watcher, tracer, canBeReadOnly);
           setRetryPolicy(retryPolicy);
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
       ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
       {
           this.ensembleProvider = ensembleProvider;
           this.tracer = tracer;
           if ( parentWatcher != null )
           {
               //把匿名内部类的Watcher对象传进来,放到parentWatchers中
               parentWatchers.offer(parentWatcher);
           }
      
           handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    • 然后在ConnectionState对象中看看哪些地方使用了这个parentWatchers对象,发现是一个process()方法

       @Override
       public void process(WatchedEvent event)
       {
           if ( LOG_EVENTS )
           {
               log.debug("ConnectState watcher: " + event);
           }
      
           if ( event.getType() == Watcher.Event.EventType.None )
           {
               boolean wasConnected = isConnected.get();
               boolean newIsConnected = checkState(event.getState(), wasConnected);
               if ( newIsConnected != wasConnected )
               {
                   isConnected.set(newIsConnected);
                   connectionStartMs = System.currentTimeMillis();
                   if ( newIsConnected )
                   {
                       lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());
                       log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
                   }
               }
           }
      
           for ( Watcher parentWatcher : parentWatchers )
           {
               OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
               //遍历Watcher,调用process方法,目前已知是在CuratorFrameworkImpl构造器中new的一个匿名Watcher,会回到我们自定义的ConnectionStateListener
               parentWatcher.process(event);
               trace.commit();
           }
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
    • 那么ConnectionState#process方法又是在哪里被调用的呢?这个找的有点深了,最终经过断点发现是在org.apache.zookeeper.ClientCnxn.EventThread#processEvent中被调用

      private void processEvent(Object event) {
        try {
          if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
              try {
                watcher.process(pair.event);
              } catch (Throwable t) {
                LOG.error("Error while calling watcher ", t);
              }
            }
           //省略代码
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
    • 这个ClientCnxn已经不是Curator的源码了,属于Zookeeper原生API,是最底层用来管理客户端和zookeeper连接的一个组件,在new Zookeeper的时候被初始化,这个Zookeeper之前提了一下,会被Curator框架封装在ConnectionState中

      ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
      {
          this.ensembleProvider = ensembleProvider;
          this.tracer = tracer;
          if ( parentWatcher != null )
          {
              parentWatchers.offer(parentWatcher);
          }
      
          //这个zookeeperFactory里面封装了获取Zookepper的方法
          handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    • org.apache.zookeeper.ClientCnxn.EventThread#processEvent方法又是在org.apache.zookeeper.ClientCnxn.EventThread#run中调用,因为EventThread这个内部类继承了Thread类,所以在创建Zookeeper的时候就调用start()将线程启动了,同时启动的还有SendThread

      public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                       boolean canBeReadOnly)
        throws IOException
        //省略代码... ...
        cnxn = createConnection(
              connectStringParser.getChrootPath(),
              hostProvider,
              sessionTimeout,
              this.clientConfig,
              watcher,
              getClientCnxnSocket(),
              canBeReadOnly);
         cnxn.start();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      public void start() {
        sendThread.start();
        eventThread.start();
      }
      
      • 1
      • 2
      • 3
      • 4
    • 跟踪EventThread源码,可以看到,这个线程的run方法中也是采用while循环的方式不断从一个叫做waitingEvents的阻塞队列中take事件

      private final LinkedBlockingQueue<Object> waitingEvents =
                  new LinkedBlockingQueue<Object>();
      @Override
      @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
      public void run() {
        try {
          isRunning = true;
          while (true) {
            Object event = waitingEvents.take();
            //如果不是一个new Object对象,交给processEvent方法处理
            if (event == eventOfDeath) {
              wasKilled = true;
            } else {
              processEvent(event);
            }
            //省略无关代码... ...
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    • 那么重点就是这个waitingEvents的元素是在哪里add的?

    • 在ClientCnxn中拿这个变量搜索一下,发现有两个地方会add,一个是queueEvent方法,一个是queuePacket方法,显然根据名字来看,第二个应该是添加和ZK进行交互的具体数据的(而后通过打断点的方式也确实验证了这一点),而queueEvent()才是用来添加事件数据的

      public void queueEvent(WatchedEvent event) {
           queueEvent(event, null);
       }
      
      private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
          if (event.getType() == EventType.None && sessionState == event.getState()) {
              return;
          }
          sessionState = event.getState();
          final Set<Watcher> watchers;
          if (materializedWatchers == null) {
              // materialize the watchers based on the event
              watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
          } else {
              watchers = new HashSet<>(materializedWatchers);
          }
          WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
          // queue the pair (watch set & event) for later processing
          waitingEvents.add(pair);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
    • 也就是说,当客户端和ZK server连接状态变更时(如重连)一定会在某个地方调用这个queueEvent方法,把变更状态放到阻塞队列中,等待消费

    • 这块代码比较复杂,有兴趣可以自主阅读org.apache.zookeeper.ClientCnxn.SendThread源码

    • 简单的说,这块的处理流程是这样的:Zookeerper被创建的时候,会创建ClientCnxn,启动两个线程,一个是eventThread,另一个就是sendThread

    • 这个SendThread主要作用就是用来跟zk通信的,而且还会搞一个心跳机制,定期去和zk ping一下,确定连接是正常的

    • 在SendThread的run方法里有一个while循环,会检查如果你是断网状态,会不停的通过ClientCnxnSocket重新建立连接,连不上会重复进行此步骤

      //如果不是连接状态,会一直尝试建立连接,有兴趣的可以去startConnect方法看看,如果失败,会被外层的Catch块捕获,然后继续来到while循环,重新尝试建立连接
      if (!clientCnxnSocket.isConnected()) {
        // don't re-establish connection if we are closing
        if (closing) {
              break;
          }
          if (rwServerAddress != null) {
              serverAddress = rwServerAddress;
              rwServerAddress = null;
          } else {
              serverAddress = hostProvider.next(1000);
          }
          onConnecting(serverAddress);
          //这个方法中,最后会通过clientCnxnSocket组件连接zk,clientCnxnSocket.connect(addr);
          startConnect(serverAddress);
          // Update now to start the connection timer right after we make a connection attempt
          clientCnxnSocket.updateNow();
          clientCnxnSocket.updateLastSendAndHeard();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
    • 一旦重新重新建立,会在org.apache.zookeeper.ClientCnxn.SendThread#run方法中调用clientCnxnSocket.doTransport,开始和zk收发数据包

      //pengingQueue是已经发送并正在等待响应的数据包
      clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
      
      • 1
      • 2
    • doTransport方法里面是NIO的代码,有兴趣可以自己研究下

    • 最终会在org.apache.zookeeper.ClientCnxnSocket#readConnectResult读取zk响应的数据包,调用org.apache.zookeeper.ClientCnxn.SendThread#onConnected方法,将数据放入waitingEvents阻塞队列中

      void readConnectResult() throws IOException {
        //省略无关代码
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
      }
      
      • 1
      • 2
      • 3
      • 4

      因为我们和ZK建立的不是一个只读连接,所以事件类型会是SyncConnected

      void onConnected(
                int _negotiatedSessionTimeout,
                long _sessionId,
                byte[] _sessionPasswd,
                boolean isRO) throws IOException {
         //省略无关代码
        KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
        eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

      前面已经看到代码,在validate的时候,如果KeeperState是KeeperState.SyncConnected,会触发RECONNECTED事件,最终回调到我们自定义的ConnectionStateListener#stateChanged方法中

    • 有兴趣的可以根据我的思路进行断点调试验证,不过有一些事异步的,注意打断点的时机

    验证结果

    • 使用CuratorFramework作为zookeeper客户端连接工具时,当发生断网重连时在自定义的ConnectionStateListener的stateChanged方法中确定会产生RECONNECTED事件
  • 相关阅读:
    vue3之el-table单选
    6.2 整合MongoDB
    5G消息精准触达,”解锁“新场景
    单调队列优化DP
    为什么短时傅里叶变换无法实现小波等优秀时频方法对时频分布的提取效果?
    Debian常用命令
    数字IC设计笔试题汇总(四):一些基础知识点
    TreeMap类的继承关系简介说明
    二、CSS下拉菜单[颜色布局、子影响父]
    docker容器不能访问外网解决
  • 原文地址:https://blog.csdn.net/Josh_scott/article/details/133826783