Flink 1.15 Source Code Analysis: Leader Election


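    Roles
    LeaderContender (contender): the component that needs a leader to be elected, e.g. the Dispatcher or the ResourceManager. It is called back when it is granted leadership or when its leadership is revoked.
    LeaderElectionService (election service): the service that runs the election. (a) It is bound to a LeaderElectionDriver implementation, which can be based on ZooKeeper, Kubernetes, etc., and initializes that driver when it starts; (b) it implements LeaderElectionEventHandler to receive the driver's callbacks.
    LeaderElectionDriver (election driver): the concrete election implementation. ZooKeeperLeaderElectionDriver, for example, implements the LeaderLatchListener callback interface, is notified when the leader changes and when the node data changes, and then calls the LeaderElectionEventHandler.
    LeaderElectionEventHandler (election event handler): the election callbacks. When a contender is elected, the LeaderContender's grantLeadership method is invoked to run the contender-specific follow-up logic; for example, DispatcherRunner's grantLeadership creates the Dispatcher service and starts the DispatcherBootstrap.
    LeaderRetrievalService (leader retrieval service): watches for leader changes and passes them on. Its implementation is similar to LeaderElectionService, but its job differs: it receives the leader information and notifies its LeaderRetrievalListener.
    LeaderRetriever (listener): listens for leader changes and implements the LeaderRetrievalListener interface. A concrete implementation is RpcGatewayRetriever, which in its callback obtains the leader information, connects to the leader via Akka, and creates an AOP-style proxy instance.
    RpcGateway (RPC gateway): its implementations wrap the Akka details in an AOP style, so Akka communication happens by simply calling methods on the gateway.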
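
    To make the call chain between the election roles concrete, here is a minimal single-process sketch. The interfaces are hypothetical, trimmed-down stand-ins named after the roles above (the real Flink interfaces have more methods, e.g. for error handling and descriptions), and ManualDriver is an invented driver whose isLeader/notLeader methods take the place of the ZooKeeper LeaderLatchListener callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical, trimmed-down stand-ins for the Flink roles described above.
interface SimpleContender {
    void grantLeadership(UUID sessionId);

    void revokeLeadership();
}

interface SimpleEventHandler {
    void onGrantLeadership(UUID sessionId);

    void onRevokeLeadership();
}

// Driver role (ZooKeeperLeaderElectionDriver in Flink): turns backend events into handler calls.
class ManualDriver {
    private final SimpleEventHandler handler;

    ManualDriver(SimpleEventHandler handler) {
        this.handler = handler;
    }

    // Where ZooKeeper's LeaderLatch would call isLeader(): issue a fresh session ID.
    void isLeader() {
        handler.onGrantLeadership(UUID.randomUUID());
    }

    // Where ZooKeeper's LeaderLatch would call notLeader().
    void notLeader() {
        handler.onRevokeLeadership();
    }
}

// LeaderElectionService role; like DefaultLeaderElectionService it is also the event handler.
class SimpleElectionService implements SimpleEventHandler {
    private SimpleContender contender;

    ManualDriver start(SimpleContender contender) {
        this.contender = contender;
        return new ManualDriver(this); // the service hands itself to the driver
    }

    @Override
    public void onGrantLeadership(UUID sessionId) {
        contender.grantLeadership(sessionId);
    }

    @Override
    public void onRevokeLeadership() {
        contender.revokeLeadership();
    }
}

// Contender role: records the callbacks it receives.
class RecordingContender implements SimpleContender {
    final List<String> events = new ArrayList<>();

    @Override
    public void grantLeadership(UUID sessionId) {
        events.add("grant");
    }

    @Override
    public void revokeLeadership() {
        events.add("revoke");
    }
}

class WiringDemo {
    public static void main(String[] args) {
        SimpleElectionService service = new SimpleElectionService();
        RecordingContender contender = new RecordingContender();
        ManualDriver driver = service.start(contender);
        driver.isLeader();  // driver -> service (event handler) -> contender.grantLeadership
        driver.notLeader(); // driver -> service (event handler) -> contender.revokeLeadership
        System.out.println(contender.events); // prints [grant, revoke]
    }
}
```

    The driver never talks to the contender directly; it only knows the event handler, which is why swapping ZooKeeper for another backend only changes the driver.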

    1. LeaderContender

    The LeaderContender interface is used in leader election and represents a role that competes for leadership.
    Its implementations include:

    • JobMasterServiceLeadershipRunner
    • ResourceManager
    • DefaultDispatcherRunner
    • WebMonitorEndpoint

    The interface contains two important methods:

      1.  grantLeadership: the callback invoked when the contender wins the election

      2.  revokeLeadership: the callback invoked when the contender goes from leader to non-leader
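
    A contender typically reacts to these two callbacks by starting and stopping the components that may only run on the leader, much as DispatcherRunner creates a Dispatcher after grantLeadership. A minimal sketch of that pattern; ComponentRunner and LeaderOnlyComponent are hypothetical names, and the real LeaderContender has further methods (such as handleError) that are omitted here.

```java
import java.util.UUID;

// Hypothetical component that may only run while we are leader (think: a Dispatcher).
class LeaderOnlyComponent {
    final UUID sessionId;
    boolean closed;

    LeaderOnlyComponent(UUID sessionId) {
        this.sessionId = sessionId;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical contender; the two methods mirror LeaderContender's callbacks.
class ComponentRunner {
    private LeaderOnlyComponent current;

    synchronized void grantLeadership(UUID sessionId) {
        if (current != null) {
            current.close(); // defensive: drop a component left over from an older session
        }
        current = new LeaderOnlyComponent(sessionId);
    }

    synchronized void revokeLeadership() {
        if (current != null) {
            current.close();
            current = null;
        }
    }

    synchronized LeaderOnlyComponent current() {
        return current;
    }
}

class ComponentRunnerDemo {
    public static void main(String[] args) {
        ComponentRunner runner = new ComponentRunner();
        runner.grantLeadership(UUID.randomUUID());
        LeaderOnlyComponent component = runner.current();
        runner.revokeLeadership();
        System.out.println(component.closed); // prints true
    }
}
```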
    

    A service that needs a leader election passes itself to the leaderElectionService as the contender when it starts, e.g. JobMasterServiceLeadershipRunner:

        @Override
        public void start() throws Exception {
            LOG.debug("Start leadership runner for job {}.", getJobID());
            leaderElectionService.start(this);
        }
    

    2. LeaderElectionService

    The leader election service, illustrated with its implementation DefaultLeaderElectionService.


    2.1 LeaderElectionService

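    LeaderElectionService mainly provides methods for taking part in the election and for querying its result: start and stop to join and leave the election, confirmLeadership to confirm a granted session together with the leader address, and hasLeadership to check whether a given session still holds leadership.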
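
    For comparison with the ZooKeeper-based service, a minimal sketch of the interface's operations (start, stop, confirmLeadership, hasLeadership) for a setup without high availability, in the spirit of Flink's StandaloneLeaderElectionService: with a single candidate there is nothing to coordinate, so leadership is granted immediately on start. The contender interface, the class names and the fixed all-zero session ID are assumptions made for this illustration.

```java
import java.util.UUID;

// Hypothetical trimmed-down contender interface.
interface StandaloneContender {
    void grantLeadership(UUID sessionId);

    void revokeLeadership();
}

// Single-candidate election service: no external coordination at all.
class AlwaysLeaderElectionService {
    // Assumption: one fixed session ID, since there is never a second leader.
    static final UUID FIXED_SESSION_ID = new UUID(0L, 0L);

    private StandaloneContender contender;
    private String confirmedAddress;

    synchronized void start(StandaloneContender newContender) {
        if (contender != null) {
            throw new IllegalStateException("Contender was already set.");
        }
        contender = newContender;
        contender.grantLeadership(FIXED_SESSION_ID); // the only candidate wins at once
    }

    synchronized void stop() {
        if (contender != null) {
            contender.revokeLeadership();
            contender = null;
            confirmedAddress = null;
        }
    }

    // The contender reports the address it serves under; ignored for foreign sessions.
    synchronized void confirmLeadership(UUID sessionId, String address) {
        if (hasLeadership(sessionId)) {
            confirmedAddress = address;
        }
    }

    synchronized boolean hasLeadership(UUID sessionId) {
        return contender != null && FIXED_SESSION_ID.equals(sessionId);
    }

    synchronized String confirmedAddress() {
        return confirmedAddress;
    }
}

class AlwaysLeaderDemo {
    public static void main(String[] args) {
        AlwaysLeaderElectionService service = new AlwaysLeaderElectionService();
        service.start(new StandaloneContender() {
            @Override
            public void grantLeadership(UUID sessionId) {
                // A contender typically confirms right away with its own address.
                service.confirmLeadership(sessionId, "localhost:8081");
            }

            @Override
            public void revokeLeadership() {}
        });
        System.out.println(service.confirmedAddress()); // prints localhost:8081
    }
}
```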

    2.2 LeaderElectionEventHandler (election event handler)

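    The election callbacks (onGrantLeadership, onRevokeLeadership and onLeaderInformationChange). When the contender is elected, the LeaderContender's grantLeadership method is invoked to run the contender-specific follow-up logic; for example, DispatcherRunner's grantLeadership creates the Dispatcher service and starts the DispatcherBootstrap.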


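    When it starts taking part in the election (DefaultLeaderElectionService::start), the service creates a LeaderElectionDriver through the driver factory. This factory decouples Flink itself from the details of ZooKeeper's CuratorFramework.

    The service also passes itself to the LeaderElectionDriver as its LeaderElectionEventHandler.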

        @Override
        public final void start(LeaderContender contender) throws Exception {
            checkNotNull(contender, "Contender must not be null.");
            Preconditions.checkState(leaderContender == null, "Contender was already set.");
    
            synchronized (lock) {
                running = true;
                leaderContender = contender;
                
                leaderElectionDriver =
                        leaderElectionDriverFactory.createLeaderElectionDriver(
                                this,
                                new LeaderElectionFatalErrorHandler(),
                                leaderContender.getDescription());
                LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
            }
        }
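
    Because of the driver factory, DefaultLeaderElectionService depends only on an interface, so a ZooKeeper, Kubernetes or test driver can be plugged in. A minimal sketch of that pattern with hypothetical, simplified types (the contender is reduced to its description string); it keeps the two checks from start() (a non-null contender that can only be set once), and the service passes itself to the factory as the event handler.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical simplified counterparts of LeaderElectionEventHandler and LeaderElectionDriver.
interface GrantHandler {
    void onGrantLeadership(UUID sessionId);
}

interface ElectionDriver {
    void close();
}

// The service depends only on this factory, never on a concrete backend such as ZooKeeper.
interface ElectionDriverFactory {
    ElectionDriver create(GrantHandler handler, String contenderDescription);
}

class FactoryBasedElectionService implements GrantHandler {
    private final Object lock = new Object();
    private final ElectionDriverFactory factory;
    private String contenderDescription; // stands in for the LeaderContender
    private ElectionDriver driver;
    private UUID grantedSession;

    FactoryBasedElectionService(ElectionDriverFactory factory) {
        this.factory = Objects.requireNonNull(factory);
    }

    void start(String contenderDescription) {
        Objects.requireNonNull(contenderDescription, "Contender must not be null.");
        synchronized (lock) {
            if (this.contenderDescription != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            this.contenderDescription = contenderDescription;
            // As in DefaultLeaderElectionService#start: pass 'this' as the event handler.
            driver = factory.create(this, contenderDescription);
        }
    }

    void stop() {
        synchronized (lock) {
            if (driver != null) {
                driver.close();
                driver = null;
            }
        }
    }

    @Override
    public void onGrantLeadership(UUID sessionId) {
        synchronized (lock) {
            grantedSession = sessionId;
        }
    }

    UUID grantedSession() {
        synchronized (lock) {
            return grantedSession;
        }
    }
}

class FactoryDemo {
    public static void main(String[] args) {
        // A test factory whose driver grants leadership as soon as it is created.
        ElectionDriverFactory immediateLeader = (handler, description) -> {
            handler.onGrantLeadership(UUID.randomUUID());
            return () -> System.out.println("driver for " + description + " closed");
        };
        FactoryBasedElectionService service = new FactoryBasedElectionService(immediateLeader);
        service.start("demo-contender");
        System.out.println(service.grantedSession() != null); // prints true
        service.stop(); // prints: driver for demo-contender closed
    }
}
```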
    

    3. LeaderElectionDriver

    3.1 LeaderElectionDriver

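    LeaderElectionDriver is the concrete election implementation. ZooKeeperLeaderElectionDriver, for example, implements the LeaderLatchListener callback interface, is notified when the leader changes and when the node data changes, and then calls the LeaderElectionEventHandler.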

    3.2 LeaderLatchListener

    
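    public interface LeaderLatchListener {
        // callback invoked when this participant becomes the leader
        void isLeader();

        // callback invoked when this participant loses leadership
        void notLeader();
    }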
    

    3.3 Taking the ZooKeeperLeaderElectionDriver implementation as an example


    The driver is created through a factory:

        @Override
        public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
                LeaderElectionEventHandler leaderEventHandler, // the LeaderElectionService passes itself in as leaderEventHandler
                FatalErrorHandler fatalErrorHandler,
                String leaderContenderDescription)
                throws Exception {
            return new ZooKeeperLeaderElectionDriver(
                    client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
        }
    

    ZooKeeperLeaderElectionDriver wraps CuratorFramework as its election framework:

        /**
         * Creates a ZooKeeperLeaderElectionDriver object.
         *
         * @param client Client which is connected to the ZooKeeper quorum
         * @param path ZooKeeper node path for the leader election
         * @param leaderElectionEventHandler Event handler for processing leader change events
         * @param fatalErrorHandler Fatal error handler
         * @param leaderContenderDescription Leader contender description
         */
        public ZooKeeperLeaderElectionDriver(
                CuratorFramework client,
                String path,
                LeaderElectionEventHandler leaderElectionEventHandler, // DefaultLeaderElectionService
                FatalErrorHandler fatalErrorHandler,
                String leaderContenderDescription)
                throws Exception {
            checkNotNull(path);
            this.client = checkNotNull(client);
            this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
            
            this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
            
            this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
            this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
            
            leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
            leaderLatch = new LeaderLatch(client, leaderLatchPath);
            this.cache =
                    ZooKeeperUtils.createTreeCache(
                            client,
                            connectionInformationPath,
                            this::retrieveLeaderInformationFromZooKeeper);
    
            running = true;
            // start the election
            leaderLatch.addListener(this); // register this driver as a LeaderLatchListener
            leaderLatch.start();
    
            cache.start();
    
            client.getConnectionStateListenable().addListener(listener);
        }
    

    3.3.1 LeaderLatch

    3.3.1.1 Creating the LeaderLatch

            // create the leaderLatch
            leaderLatch = new LeaderLatch(client, leaderLatchPath);
    

    3.3.1.2 Creating the TreeCache

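    Setting up the watch
    Curator's watch APIs wrap the native ZooKeeper API and add features such as automatic re-registration, so a watch can be reused (a standard ZooKeeper watch is a one-time trigger).
    Curator provides three kinds of cache API:

    • NodeCache: watches create, update and delete operations on a single node.
    • PathChildrenCache: watches create, update and delete operations on the direct children of a node.
    • TreeCache: combines NodeCache and PathChildrenCache; it watches not only the node itself but also changes to any of its descendants, at any depth.

    ZooKeeperLeaderElectionDriver creates a TreeCache on the connection information path, with retrieveLeaderInformationFromZooKeeper as the callback: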
            this.cache =
                    ZooKeeperUtils.createTreeCache(
                            client,
                            connectionInformationPath,
                            this::retrieveLeaderInformationFromZooKeeper);
    
        private void retrieveLeaderInformationFromZooKeeper() throws Exception {
            if (leaderLatch.hasLeadership()) {
                ChildData childData = cache.getCurrentData(connectionInformationPath);
                // callback: leaderElectionEventHandler is the DefaultLeaderElectionService
                leaderElectionEventHandler.onLeaderInformationChange(
                        childData == null
                                ? LeaderInformation.empty()
                                : ZooKeeperUtils.readLeaderInformation(childData.getData()));
            }
        }
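
    The three Curator caches listed above differ mainly in which changed paths trigger a callback for a watched path. The sketch below is not Curator's API; it is a hypothetical in-memory model of just that scoping rule, with CacheScope and ScopeModel as invented names.

```java
// Hypothetical model of the three watch scopes listed above; this is not Curator's API.
enum CacheScope {
    NODE,     // like NodeCache: the watched node itself
    CHILDREN, // like PathChildrenCache: direct children of the watched node
    TREE      // like TreeCache: the watched node and all of its descendants
}

class ScopeModel {
    // True if a change at changedPath would be reported by a cache on watchedPath.
    static boolean fires(CacheScope scope, String watchedPath, String changedPath) {
        boolean self = changedPath.equals(watchedPath);
        String prefix = watchedPath.endsWith("/") ? watchedPath : watchedPath + "/";
        boolean descendant = changedPath.startsWith(prefix) && changedPath.length() > prefix.length();
        // A direct child has no further '/' after the watched prefix.
        boolean directChild = descendant && changedPath.indexOf('/', prefix.length()) < 0;
        switch (scope) {
            case NODE:
                return self;
            case CHILDREN:
                return directChild;
            case TREE:
                return self || descendant;
            default:
                throw new IllegalArgumentException("Unknown scope: " + scope);
        }
    }
}

class ScopeDemo {
    public static void main(String[] args) {
        for (CacheScope scope : CacheScope.values()) {
            System.out.println(scope + " fires for /leader/a/b: "
                    + ScopeModel.fires(scope, "/leader", "/leader/a/b"));
        }
    }
}
```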
    

    onLeaderInformationChange is then called back; here leaderElectionEventHandler is the DefaultLeaderElectionService:

    // org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#onLeaderInformationChange
    
        @Override
        @GuardedBy("lock")
        public void onLeaderInformationChange(LeaderInformation leaderInformation) {
            synchronized (lock) {
                if (running) {
                    if (!confirmedLeaderInformation.isEmpty()) {
                        final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
                        if (leaderInformation.isEmpty()) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(
                                        "Writing leader information by {} since the external storage is empty.",
                                        leaderContender.getDescription());
                            }
                            // main logic: write the leader information to ZooKeeper
                            leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                        } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
                            // the data field does not correspond to the expected leader information
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(
                                        "Correcting leader information by {}.",
                                        leaderContender.getDescription());
                            }
                            leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                        }
                    }
                } 
            }
        }
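
    Stripped of locking and logging, onLeaderInformationChange boils down to a small decision: once this contender's leader information has been confirmed, write it back whenever ZooKeeper reports nothing or something different. A sketch of just that rule; Info is a hypothetical stand-in for LeaderInformation (session ID plus address), and null models the empty case.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for LeaderInformation: a session ID plus an address.
final class Info {
    final UUID sessionId;
    final String address;

    Info(UUID sessionId, String address) {
        this.sessionId = sessionId;
        this.address = address;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Info)) {
            return false;
        }
        Info other = (Info) o;
        return Objects.equals(sessionId, other.sessionId) && Objects.equals(address, other.address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sessionId, address);
    }
}

class LeaderInfoReconciler {
    // Returns the Info to write back to ZooKeeper, or null if nothing should be written.
    // confirmed: what this leader confirmed locally (null = not confirmed yet).
    // observed:  what the cache read from ZooKeeper (null = node empty or missing).
    static Info toWrite(boolean running, Info confirmed, Info observed) {
        if (!running || confirmed == null) {
            return null; // stopped, or nothing confirmed yet: leave ZooKeeper alone
        }
        if (observed == null) {
            return confirmed; // external storage is empty: write our information
        }
        if (!observed.equals(confirmed)) {
            return confirmed; // stored data does not match ours: correct it
        }
        return null; // already consistent
    }
}

class ReconcileDemo {
    public static void main(String[] args) {
        Info mine = new Info(UUID.randomUUID(), "akka://flink@host-a:6123");
        System.out.println(LeaderInfoReconciler.toWrite(true, mine, null) == mine); // prints true
    }
}
```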
    

    Writing the leader information to ZooKeeper:

        /** Writes the current leader's address as well the given leader session ID to ZooKeeper. */
        @Override
        public void writeLeaderInformation(LeaderInformation leaderInformation) {
            assert (running);
            // this method does not have to be synchronized because the curator framework client
            // is thread-safe. We do not write the empty data to ZooKeeper here. Because
            // check-leadership-and-update
            // is not a transactional operation. We may wrongly clear the data written by new leader.
            if (leaderInformation.isEmpty()) {
                return;
            }
    
            try {
                ZooKeeperUtils.writeLeaderInformationToZooKeeper(
                        leaderInformation,
                        client,
                        leaderLatch::hasLeadership,
                        connectionInformationPath);
    
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Successfully wrote leader information: {}.", leaderInformation);
                }
            } catch (Exception e) {
                fatalErrorHandler.onFatalError(
                        new LeaderElectionException(
                                "Could not write leader address and leader session ID to "
                                        + "ZooKeeper.",
                                e));
            }
        }
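
    writeLeaderInformation never writes empty data and passes leaderLatch::hasLeadership down, so the write only happens while this instance still holds the latch. A hypothetical in-memory sketch of that guard; LeaderInfoStore and GuardedLeaderInfoWriter are invented names, and in Flink the actual write is done by ZooKeeperUtils.writeLeaderInformationToZooKeeper.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical in-memory stand-in for the ZooKeeper node holding leader information.
class LeaderInfoStore {
    private String data; // null means the node holds no data

    synchronized String read() {
        return data;
    }

    synchronized void write(String newData) {
        data = newData;
    }
}

class GuardedLeaderInfoWriter {
    private final LeaderInfoStore store;
    private final BooleanSupplier hasLeadership; // in the driver: leaderLatch::hasLeadership

    GuardedLeaderInfoWriter(LeaderInfoStore store, BooleanSupplier hasLeadership) {
        this.store = store;
        this.hasLeadership = hasLeadership;
    }

    // Returns true if the information was written.
    boolean write(String leaderInformation) {
        // Never write empty data: a former leader must not clear what a new leader wrote.
        if (leaderInformation == null || leaderInformation.isEmpty()) {
            return false;
        }
        // Only the current latch holder may write. Check and write are two separate
        // steps here, so this guard alone is not atomic.
        if (!hasLeadership.getAsBoolean()) {
            return false;
        }
        store.write(leaderInformation);
        return true;
    }
}

class GuardedWriteDemo {
    public static void main(String[] args) {
        LeaderInfoStore store = new LeaderInfoStore();
        AtomicBoolean leader = new AtomicBoolean(true);
        GuardedLeaderInfoWriter writer = new GuardedLeaderInfoWriter(store, leader::get);
        writer.write("session-1@host-a");
        leader.set(false);
        writer.write("session-2@host-b"); // rejected: no longer the leader
        System.out.println(store.read()); // prints session-1@host-a
    }
}
```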
    

    3.3.1.3 Starting the election

            // add ZooKeeperLeaderElectionDriver to the leaderLatch's listeners
            leaderLatch.addListener(this);
            leaderLatch.start();
    
    // org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch#start
        public void start() throws Exception {
            Preconditions.checkState(this.state.compareAndSet(LeaderLatch.State.LATENT, LeaderLatch.State.STARTED), "Cannot be started more than once");
            this.startTask.set(AfterConnectionEstablished.execute(this.client, new Runnable() {
                public void run() {
                    try {
                        LeaderLatch.this.internalStart();
                    } finally {
                        LeaderLatch.this.startTask.set((Object)null);
                    }
    
                }
            }));
        }
    
    
    public class AfterConnectionEstablished {
        private static final Logger log = LoggerFactory.getLogger(AfterConnectionEstablished.class);
    
        public static Future<?> execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception {
        final ExecutorService executor =
                ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
            Runnable internalCall = new Runnable() {
                public void run() {
                    try {
                        client.blockUntilConnected();
                        runAfterConnection.run();
                    } catch (Exception var5) {
                        ThreadUtils.checkInterrupted(var5);
                        AfterConnectionEstablished.log.error("An error occurred blocking until a connection is available", var5);
                    } finally {
                        executor.shutdown();
                    }
    
                }
            };
            return executor.submit(internalCall);
        }
    
        private AfterConnectionEstablished() {
        }
    }
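
    LeaderLatch#start and AfterConnectionEstablished together implement a common pattern: start() may only be called once (the LATENT to STARTED compareAndSet), and the real start work runs on a dedicated single thread that first blocks until the client is connected. A self-contained sketch of that pattern; DeferredStarter is a hypothetical name, and a CountDownLatch stands in for client.blockUntilConnected().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of LeaderLatch#start + AfterConnectionEstablished#execute.
class DeferredStarter {
    enum State { LATENT, STARTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    // Stands in for the client's connection state: counted down once "connected".
    private final CountDownLatch connected = new CountDownLatch(1);

    void markConnected() {
        connected.countDown();
    }

    Future<?> start(Runnable internalStart) {
        // Only the first caller moves LATENT -> STARTED; later calls fail.
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        // A dedicated single thread, so start() itself never blocks on the connection.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit(() -> {
                try {
                    connected.await(); // stands in for client.blockUntilConnected()
                    internalStart.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        } finally {
            // The already submitted task still runs; no further tasks are accepted.
            executor.shutdown();
        }
    }
}

class DeferredStartDemo {
    public static void main(String[] args) throws Exception {
        DeferredStarter starter = new DeferredStarter();
        Future<?> task = starter.start(() -> System.out.println("internalStart runs after connecting"));
        starter.markConnected(); // without this, the task would stay blocked
        task.get();
    }
}
```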
    

    Once the connection is established, internalStart is called:

        private synchronized void internalStart() {
            if (this.state.get() == LeaderLatch.State.STARTED) {
                this.client.getConnectionStateListenable().addListener(this.listener);
    
                try {
                // main logic: (re)create our latch node and check leadership
                    this.reset();
                } catch (Exception var2) {
                    ThreadUtils.checkInterrupted(var2);
                    this.log.error("An error occurred checking resetting leadership.", var2);
                }
            }
    
        }
    

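    reset

    Asynchronous API
    Curator provides an asynchronous API and introduces the BackgroundCallback interface to handle the result the server returns after an asynchronous call. The key argument passed to BackgroundCallback is a CuratorEvent, which contains the event type, the result code and the details of the node.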

        @VisibleForTesting
        void reset() throws Exception {
            // reset the local leadership state before creating a new node
            this.setLeadership(false);
            this.setNode((String)null);
            
            BackgroundCallback callback = new BackgroundCallback() {
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    if (LeaderLatch.this.debugResetWaitLatch != null) {
                        LeaderLatch.this.debugResetWaitLatch.await();
                        LeaderLatch.this.debugResetWaitLatch = null;
                    }
    
                    if (event.getResultCode() == Code.OK.intValue()) {
                        LeaderLatch.this.setNode(event.getName());
                        if (LeaderLatch.this.state.get() == LeaderLatch.State.CLOSED) {
                            LeaderLatch.this.setNode((String)null);
                        } else {
                            // main logic: list the latch children and check leadership
                            LeaderLatch.this.getChildren();
                        }
                    } else {
                        LeaderLatch.this.log.error("getChildren() failed. rc = " + event.getResultCode());
                    }
    
                }
            };
            ((ErrorListenerPathAndBytesable)((ACLBackgroundPathAndBytesable)this.client.create()
            .creatingParentContainersIfNeeded()        	
            .withProtection()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL))
            .inBackground(callback))
            .forPath(ZKPaths.makePath(this.latchPath, "latch-"), LeaderSelector.getIdBytes(this.id));
        }
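
    reset() creates an EPHEMERAL_SEQUENTIAL node named "latch-" under the latch path: ZooKeeper appends an increasing, 10-digit zero-padded counter to the name, and withProtection() adds a prefix containing a per-client GUID so the node can be found again if the create response is lost. A hypothetical in-memory sketch of the resulting names and of how the sequence part is recovered for sorting; the exact protection prefix format used here ("_c_" + GUID + "-") is an assumption about Curator's internals.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

// Hypothetical in-memory model of the children LeaderLatch creates with
// withProtection() and CreateMode.EPHEMERAL_SEQUENTIAL under its latch path.
class SequentialLatchDir {
    static final String LOCK_NAME = "latch-";

    private final List<String> children = new ArrayList<>();
    private int nextSequence = 0; // ZooKeeper keeps one such counter per parent node

    synchronized String createProtectedSequential() {
        // Assumed protection prefix: "_c_" + a random GUID + "-".
        String protectedPrefix = "_c_" + UUID.randomUUID() + "-";
        // ZooKeeper appends a 10-digit, zero-padded sequence number to the name.
        String name = protectedPrefix + LOCK_NAME + String.format("%010d", nextSequence++);
        children.add(name);
        return name;
    }

    // Order by the part after "latch-", so the random prefix does not affect ordering;
    // zero padding makes lexicographic order equal to numeric order.
    synchronized List<String> sortedChildren() {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(SequentialLatchDir::sequencePart));
        return sorted;
    }

    static String sequencePart(String child) {
        int idx = child.lastIndexOf(LOCK_NAME);
        return idx < 0 ? child : child.substring(idx + LOCK_NAME.length());
    }
}

class SequentialDemo {
    public static void main(String[] args) {
        SequentialLatchDir dir = new SequentialLatchDir();
        for (int i = 0; i < 3; i++) {
            dir.createProtectedSequential();
        }
        dir.sortedChildren().forEach(System.out::println);
    }
}
```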
    

    Next, let's look at the main logic, LeaderLatch.this.getChildren():

        private void getChildren() throws Exception {
            BackgroundCallback callback = new BackgroundCallback() {
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    if (event.getResultCode() == Code.OK.intValue()) {
                        // check whether we hold leadership
                        LeaderLatch.this.checkLeadership(event.getChildren());
                    }
    
                }
            };
            ((ErrorListenerPathable)this.client.getChildren()
            .inBackground(callback))
            .forPath(ZKPaths.makePath(this.latchPath, (String)null));
        }
    

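    Inside checkLeadership, the line to focus on is this.setLeadership(true). The children are sorted by sequence number: if our node comes first (index 0), we become the leader; otherwise we watch the node directly before ours and call getChildren() again once it is deleted. If our node cannot be found, the latch resets.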

        private void checkLeadership(List<String> children) throws Exception {
            if (this.debugCheckLeaderShipLatch != null) {
                this.debugCheckLeaderShipLatch.await();
            }
    
            final String localOurPath = (String)this.ourPath.get();
            List<String> sortedChildren = LockInternals.getSortedChildren("latch-", sorter, children);
            int ourIndex = localOurPath != null ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
            if (ourIndex < 0) {
                this.log.error("Can't find our node. Resetting. Index: " + ourIndex);
                this.reset();
            } else if (ourIndex == 0) {
                this.setLeadership(true);
            } else {
                String watchPath = (String)sortedChildren.get(ourIndex - 1);
                Watcher watcher = new Watcher() {
                    public void process(WatchedEvent event) {
                        if (LeaderLatch.this.state.get() == LeaderLatch.State.STARTED && event.getType() == EventType.NodeDeleted && localOurPath != null) {
                            try {
                                LeaderLatch.this.getChildren();
                            } catch (Exception var3) {
                                ThreadUtils.checkInterrupted(var3);
                                LeaderLatch.this.log.error("An error occurred checking the leadership.", var3);
                            }
                        }
    
                    }
                };
                BackgroundCallback callback = new BackgroundCallback() {
                    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                        if (event.getResultCode() == Code.NONODE.intValue()) {
                            LeaderLatch.this.reset();
                        }
    
                    }
                };
                ((ErrorListenerPathable)((BackgroundPathable)this.client.getData().usingWatcher(watcher)).inBackground(callback)).forPath(ZKPaths.makePath(this.latchPath, watchPath));
            }
    
        }
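
    The rule in checkLeadership avoids a herd effect: only the node with the lowest sequence number is leader, and every other participant watches just its immediate predecessor rather than the whole directory, so one deletion wakes up a single participant. A pure-function sketch of that decision over already sorted child names; LatchDecision is a hypothetical type, and unlike Curator (which extracts the node name from ourPath with ZKPaths.getNodeFromPath) it takes the node name directly.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical result type for one pass of LeaderLatch#checkLeadership.
final class LatchDecision {
    enum Kind { RESET, LEADER, WATCH }

    final Kind kind;
    final String watchPath; // node to watch; only set when kind == WATCH

    private LatchDecision(Kind kind, String watchPath) {
        this.kind = kind;
        this.watchPath = watchPath;
    }

    // sortedChildren: child node names already sorted by sequence number.
    // ourNode: the name of the node we created, or null if we have none.
    static LatchDecision check(List<String> sortedChildren, String ourNode) {
        int ourIndex = ourNode == null ? -1 : sortedChildren.indexOf(ourNode);
        if (ourIndex < 0) {
            return new LatchDecision(Kind.RESET, null); // our node is gone: recreate it
        }
        if (ourIndex == 0) {
            return new LatchDecision(Kind.LEADER, null); // lowest sequence number leads
        }
        // Watch only the node directly in front of ours, not the whole directory.
        return new LatchDecision(Kind.WATCH, sortedChildren.get(ourIndex - 1));
    }
}

class CheckDemo {
    public static void main(String[] args) {
        List<String> children = Arrays.asList("latch-0000000000", "latch-0000000001", "latch-0000000002");
        LatchDecision d = LatchDecision.check(children, "latch-0000000002");
        System.out.println(d.kind + " " + d.watchPath); // prints WATCH latch-0000000001
    }
}
```

    When the watched predecessor is deleted, the watcher calls getChildren() again and the same check runs on the new child list.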
    
    3.3.1.4 setLeadership(true): the election succeeds and listeners are notified through the isLeader callback

        private synchronized void setLeadership(boolean newValue) {
            boolean oldValue = this.hasLeadership.getAndSet(newValue);
            if (oldValue && !newValue) {
                // the listeners include ZooKeeperLeaderElectionDriver
                this.listeners.forEach(LeaderLatchListener::notLeader);
            } else if (!oldValue && newValue) {
                this.listeners.forEach(LeaderLatchListener::isLeader);
            }
    
            this.notifyAll();
        }
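
    setLeadership only notifies listeners on an actual transition: getAndSet returns the previous value, and isLeader/notLeader fire only when it differs from the new one, so repeated calls with the same value are silent. A self-contained sketch of that edge-triggered notification; LatchListener is a hypothetical copy of LeaderLatchListener's two methods, and EdgeTriggeredLeadership is an invented name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical copy of LeaderLatchListener's two callbacks.
interface LatchListener {
    void isLeader();

    void notLeader();
}

class EdgeTriggeredLeadership {
    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final List<LatchListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(LatchListener listener) {
        listeners.add(listener);
    }

    synchronized void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) {
            listeners.forEach(LatchListener::notLeader); // lost leadership
        } else if (!oldValue && newValue) {
            listeners.forEach(LatchListener::isLeader); // gained leadership
        }
        notifyAll(); // like LeaderLatch, wake any thread waiting on this monitor
    }

    boolean hasLeadership() {
        return hasLeadership.get();
    }
}

class EdgeDemo {
    public static void main(String[] args) {
        EdgeTriggeredLeadership latch = new EdgeTriggeredLeadership();
        latch.addListener(new LatchListener() {
            @Override
            public void isLeader() {
                System.out.println("isLeader");
            }

            @Override
            public void notLeader() {
                System.out.println("notLeader");
            }
        });
        latch.setLeadership(true);  // prints isLeader
        latch.setLeadership(true);  // no transition, prints nothing
        latch.setLeadership(false); // prints notLeader
    }
}
```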
    

    The callback implementation:

    // org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver#isLeader
    
        @Override
        public void isLeader() {
            // In org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start,
            // DefaultLeaderElectionService passed itself in as the leaderElectionEventHandler,
            // so on a successful election this callback tells DefaultLeaderElectionService
            // that leadership was granted.
            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
        }
    
        @Override
        public void notLeader() {
            leaderElectionEventHandler.onRevokeLeadership();
        }
    

    DefaultLeaderElectionService reacts to the successful election by calling back leaderContender.grantLeadership:

        @Override
        @GuardedBy("lock")
        public void onGrantLeadership(UUID newLeaderSessionId) {
            synchronized (lock) {
                if (running) {
                    issuedLeaderSessionID = newLeaderSessionId;
                    clearConfirmedLeaderInformation();
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Grant leadership to contender {} with session ID {}.",
                                leaderContender.getDescription(),
                                issuedLeaderSessionID);
                    }
                // main logic: notify the leaderContender
                    leaderContender.grantLeadership(issuedLeaderSessionID);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Ignoring the grant leadership notification since the {} has "
                                        + "already been closed.",
                                leaderElectionDriver);
                    }
                }
            }
        }
    

    Taking WebMonitorEndpoint as an example:

        @Override
        public void grantLeadership(final UUID leaderSessionID) {
            log.info(
                    "{} was granted leadership with leaderSessionID={}",
                    getRestBaseUrl(),
                    leaderSessionID);
            leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
        }
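
    The grant is a two-step handshake: the service issues a fresh session ID, and the contender answers with confirmLeadership(sessionId, address), as WebMonitorEndpoint does with its REST URL. A simplified sketch of a service that accepts a confirmation only for the session it issued, so a late confirmation from an earlier session is ignored; SessionCheckingService is a hypothetical name, and this illustrates the idea under those assumptions rather than reproducing DefaultLeaderElectionService.

```java
import java.util.UUID;

class SessionCheckingService {
    private final Object lock = new Object();
    private UUID issuedSessionId;    // set on grant, cleared on revoke
    private String confirmedAddress; // cleared on every new grant

    // Like onGrantLeadership: a new session, previously confirmed info is discarded.
    UUID grant() {
        synchronized (lock) {
            issuedSessionId = UUID.randomUUID();
            confirmedAddress = null;
            return issuedSessionId;
        }
    }

    void revoke() {
        synchronized (lock) {
            issuedSessionId = null;
            confirmedAddress = null;
        }
    }

    // Accept the confirmation only if it belongs to the currently issued session.
    boolean confirmLeadership(UUID sessionId, String address) {
        synchronized (lock) {
            if (issuedSessionId != null && issuedSessionId.equals(sessionId)) {
                confirmedAddress = address;
                return true;
            }
            return false;
        }
    }

    String confirmedAddress() {
        synchronized (lock) {
            return confirmedAddress;
        }
    }
}

class HandshakeDemo {
    public static void main(String[] args) {
        SessionCheckingService service = new SessionCheckingService();
        UUID first = service.grant();
        UUID second = service.grant(); // e.g. leadership was lost and regained
        System.out.println(service.confirmLeadership(first, "http://host:8081"));  // prints false
        System.out.println(service.confirmLeadership(second, "http://host:8081")); // prints true
    }
}
```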
    
    

    This completes the whole leader election flow.


  • Original article: https://blog.csdn.net/wuxintdrh/article/details/127873870