• 【Flink源码】JobManager源码之启动WebMonitorEndpoint


    【Flink源码】JobManager 启动流程 一文中,笔者分析了 JobManager 的启动流程,并讲到 JobManager 启动了 ResourceManager、Dispatcher 和 WebMonitorEndpoint 三大核心组件。
    本文将就其中 WebMonitorEndpoint 启动过程的源码进行探究

    首先,我们先要讲述一下 WebMonitorEndpoint 的主要功能:
    WebMonitorEndpoint 里面维护了很多 Handler,如果客户端通过 flink run 的方式来提交一个 job 到 flink 集群,最终,是由 WebMonitorEndpoint 来接收,并且决定使用哪一个 Handler 来执行处理


    WebMonitorEndpoint 启动流程解析

    按前文所述,在 JobManager 创建完工厂类后,首先创建的实例为 WebMonitorEndpoint 实例

    DefaultDispatcherResourceManagerComponentFactory.java

    // 创建 WebMonitorEndpoint 实例,在 Standalone 模式下为:DispatcherRestEndpoint
    // 该实例内部会启动一个 Netty 服务端,绑定了一堆 Handler
    webMonitorEndpoint =
            restEndpointFactory.createRestEndpoint(
                    configuration,
                    dispatcherGatewayRetriever,
                    resourceManagerGatewayRetriever,
                    blobServer,
                    executor,
                    metricFetcher,
                    highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                    fatalErrorHandler);
    // 启动 WebMonitorEndpoint
    log.debug("Starting Dispatcher REST endpoint.");
    webMonitorEndpoint.start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    下面我们从 start 方法讲起
    该方法是 WebMonitorEndpoint 类下的方法,继承了 RestServerEndpoint 类,找到该类
    由于代码很长,我们分段来看

    Handler 相关操作

    RestServerEndpoint.java

    // 路由器,解析请求寻找对应 Handler
    final Router router = new Router();
    final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
    // 初始化 Handlers
    handlers = initializeHandlers(restAddressFuture);
    
    /* sort the handlers such that they are ordered the following:
        * /jobs
        * /jobs/overview
        * /jobs/:jobid
        * /jobs/:jobid/config
        * /:*
        */
    // 排序,为了确认 URL 和 Handler 的一一对应的关系,不应出现一对多的情况
    Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
    // 确认唯一性方法
    checkAllEndpointsAndHandlersAreUnique(handlers);
    // 注册 Handler
    handlers.forEach(handler -> registerHandler(router, handler, log));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这一段主要包含 Handler 相关操作,包括:

    1. 首先创建 Router 来解析 Client 的请求并寻找对应的 Handler
    2. 通过 initializeHandlers 方法注册了一堆 Handler
    3. 将这些 Handler 进行排序,这里的排序是为了确认 URL 和 Handler 一对一的关系
    4. 排序好后通过 checkAllEndpointsAndHandlersAreUnique 方法来确认唯一性
    5. 确认唯一性后将 Handler 逐一注册

    Netty 启动的相关操作

    NioEventLoopGroup bossGroup =
            new NioEventLoopGroup(
                    1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
    NioEventLoopGroup workerGroup =
            new NioEventLoopGroup(
                    0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
    
    bootstrap = new ServerBootstrap();
    bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(initializer);
    
    Iterator<Integer> portsIterator;
    try {
        portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
    } catch (IllegalConfigurationException e) {
        throw e;
    } catch (Exception e) {
        throw new IllegalArgumentException(
                "Invalid port range definition: " + restBindPortRange);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    这一段主要是 Netty 启动的相关操作,为了解决端口冲突问题,Flink 做了如下操作

    // 为防止端口冲突,将逐一尝试端口是否可用
    int chosenPort = 0;
    while (portsIterator.hasNext()) {
        try {
            chosenPort = portsIterator.next();
            final ChannelFuture channel;
            if (restBindAddress == null) {
                channel = bootstrap.bind(chosenPort);
            } else {
                channel = bootstrap.bind(restBindAddress, chosenPort);
            }
            serverChannel = channel.syncUninterruptibly().channel();
            break;
        } catch (final Exception e) {
            // syncUninterruptibly() throws checked exceptions via Unsafe
            // continue if the exception is due to the port being in use, fail early
            // otherwise
            if (!(e instanceof java.net.BindException)) {
                throw e;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    修改状态并启动其他基础服务

    // 修改状态
    state = State.RUNNING;
    // 启动其他基础服务
    startInternal();
    
    • 1
    • 2
    • 3
    • 4

    在方法末尾修改一下 EndPoint 状态为 RUNNING,至此,WebMonitorEndpoint 的 Netty 服务就启动完毕了,接下来通过 startInternal 方法启动其他基础服务

    节点选举以及方法回调

    我们进入 startInternal 方法内部

    public void startInternal() throws Exception {
        // 开始选举,去找回调方法
        leaderElectionService.start(this);
        startExecutionGraphCacheCleanupTask();
    
        if (hasWebUI) {
            log.info("Web frontend listening at {}.", getRestBaseUrl());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    可以看到 WebMonitorEndpoint 准备开始进行 Leader 竞选
    首先介绍一下主节点内部组件的 Leader 选举机制

    Flink 的选举使用的是 Curator 框架,节点的选举针对每一个参选对象,会创建一个选举驱动 leaderElectionDriver,在完成选举之后,会回调两个方法。
    如果选举成功会回调 isLeader 方法,如果失败则回调 notLeader 方法

    基于此,我们来看 leaderElectionService.start 方法

    DefaultLeaderElectionService.java

    public final void start(LeaderContender contender) throws Exception {
        checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");
    
        synchronized (lock) {
            // 在 WebMonitorEndpoint 中调用时,在 contender 为 DispatcherRestEndPoint
            // 在 ResourceManager 中调用时,contender 为 ResourceManager
            // 在 DispatcherRunner 中调用时,contender 为 DisoatcherRunner
            running = true;
            leaderContender = contender;
            // 创建选举对象 leaderElectionDriver
            leaderElectionDriver =
                    leaderElectionDriverFactory.createLeaderElectionDriver(
                            this,
                            new LeaderElectionFatalErrorHandler(),
                            leaderContender.getDescription());
            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在 Standalone 模式下,WebMonitorEndpoint、ResourceManager、DispatcherRunner 都会使用该模式来进行竞选,此刻我们是从 WebMonitorEndpoint 进入的此方法,此时 contender 对象实际为 DispatcherRestEndpoint。
    我们继续看 leaderElectionDriver 的构建
    进入 createLeaderElectionDriver 方法,选择 ZooKeeperLeaderElectionDriverFactory 实现

    ZooKeeperLeaderElectionDriverFactory.java

    public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler leaderEventHandler,
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        return new ZooKeeperLeaderElectionDriver(
                client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ZooKeeperLeaderElectionDriver.java

    public ZooKeeperLeaderElectionDriver(
            CuratorFramework client,
            String path,
            LeaderElectionEventHandler leaderElectionEventHandler,
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        checkNotNull(path);
        this.client = checkNotNull(client);
        this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
        this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
    
        leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
        leaderLatch = new LeaderLatch(client, leaderLatchPath);
        this.cache =
                ZooKeeperUtils.createTreeCache(
                        client,
                        connectionInformationPath,
                        this::retrieveLeaderInformationFromZooKeeper);
    
        running = true;
        // 开始选举
        leaderLatch.addListener(this);
        leaderLatch.start();
        // 选举开始后,会接收到响应:
        // 1.如果竞选成功,则回调该类的 isLeader 方法
        // 2.如果竞选失败,则回调该类的 notLeader 方法
        // 每一个竞选者对应一个竞选 Driver
        cache.start();
    
        client.getConnectionStateListenable().addListener(listener);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    接下来我们继续看该类的 isLeader 方法

    public void isLeader() {
        leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
    }
    
    • 1
    • 2
    • 3

    再进入 onGrantLeadership 方法

    @Override
    @GuardedBy("lock")
    public void onGrantLeadership(UUID newLeaderSessionId) {
        synchronized (lock) {
            if (running) {
                issuedLeaderSessionID = newLeaderSessionId;
                clearConfirmedLeaderInformation();
    
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Grant leadership to contender {} with session ID {}.",
                            leaderContender.getDescription(),
                            issuedLeaderSessionID);
                }
    
                /**
                    * 有 4 种竞选者类型,LeaderContender 有 4 种情况
                    * 1.Dispatcher = DefaultDispatcherRunner
                    * 2.JobMaster = JobManagerRunnerImpl
                    * 3.ResourceManager = ResourceManager
                    * 4.WebMonitorEndpoint = WebMonitorEndpoint
                    */
                leaderContender.grantLeadership(issuedLeaderSessionID);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Ignoring the grant leadership notification since the {} has "
                                    + "already been closed.",
                            leaderElectionDriver);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    再进入 leaderContender.grantLeadership 方法,引入当前是 WebMonitorEndpoint 的选举,所以我们进入 WebMonitorEndpoint 的实现

    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        log.info(
                "{} was granted leadership with leaderSessionID={}",
                getRestBaseUrl(),
                leaderSessionID);
        leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    @Override
    public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Confirm leader session ID {} for leader {}.", leaderSessionID, leaderAddress);
        }
    
        checkNotNull(leaderSessionID);
    
        synchronized (lock) {
            if (hasLeadership(leaderSessionID)) {
                if (running) {
                    // 确认 Leader 信息,并将节点信息写入 Zookeeper
                    confirmLeaderInformation(leaderSessionID, leaderAddress);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Ignoring the leader session Id {} confirmation, since the "
                                        + "LeaderElectionService has already been stopped.",
                                leaderSessionID);
                    }
                }
            } else {
                // Received an old confirmation call
                if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Receive an old confirmation call of leader session ID {}, "
                                        + "current issued session ID is {}",
                                leaderSessionID,
                                issuedLeaderSessionID);
                    }
                } else {
                    LOG.warn(
                            "The leader session ID {} was confirmed even though the "
                                    + "corresponding JobManager was not elected as the leader.",
                            leaderSessionID);
                }
            }
        }
    }
    
    @GuardedBy("lock")
    private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
        confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, leaderAddress);
        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    WebMonitorEndpoint 在选举 Leader 成功后,并没有做什么,只是将自己的信息写入 Zookeeper
    信息写入完毕后,WebMonitorEndpoint 就算是启动完成了


    总结
    WebMonitorEndpoint 的启动流程并不复杂,主要如下:

    1. 初始化一堆 Handler
    2. 启动 Netty 服务,注册 Handler
    3. 启动内部服务:执行竞选,WebMonitorEndpoint 本身就是一个 LeaderContender 角色
    4. 竞选成功,其实只是把 WebMonitorEndpoint 的 address 以及和 zookeeper 的 sessionId 写入 znode 中
  • 相关阅读:
    电力电子转战数字IC20220629day35——路科实验2b
    【数据结构】【栈与队列】循环队列的实现及基本操作(使用顺序队列)(可直接运行)
    FL Studio21中文版本新增功能FL2023完整版
    小满Vue3第四十二章(环境变量)
    在 Rust 中实现 TCP : 4. 完成握手
    通过JS脚本检查浏览器对视频编码的支持情况
    消息治理,到底需要治理哪些内容?
    React入门学习(一)
    排序算法(1)
    Node.js中的Buffer
  • 原文地址:https://blog.csdn.net/wwb44444/article/details/127722827