• Flink 1.13 源码解析——JobManager启动流程 WebMonitorEndpoint启动


    点击这里查看 Flink 1.13 源码解析 目录汇总

    点击查看相关章节:Flink 1.13 源码解析——JobManager启动流程概览

    点击查看相关章节:Flink 1.13 源码解析——JobManager启动流程之ResourceManager启动

    点击查看相关章节:Flink 1.13 源码解析——JobManager启动流程之Dispatcher启动

    目录

    前言:

    WebMonitorEndpoint启动流程解析

    第一步:Handler相关操作,具体做了以下操作:

    第二步:Netty启动的相关操作

    第三步:修改状态并启动其他基础服务

    第四步:节点选举以及方法回调

    总结


    前言:

            在上一章中,我们分析了主节点(逻辑JobManager)的启动大致流程,在这一章中我们来看看源码中WebMonitorEndpoint服务是如何构建并启动的。

            我们先来复习一下上节中分析的JobManager的几个重要概念:

    关于Flink的主节点JobManager,他只是一个逻辑上的主节点,针对不同的部署模式,主节点的实现类也不同。

    JobManager(逻辑)有三大核心内容,分别为ResourceManager、Dispatcher和WebmonitorEndpoin:

    ResourceManager:

            Flink集群的资源管理器,只有一个,关于Slot的管理和申请等工作,都有它负责

    Dispatcher:

            1、负责接收用户提交的JobGraph,然后启动一个JobMaster,蕾西与Yarn中的AppMaster和Spark中的Driver。

            2、内有一个持久服务:JobGraphStore,负责存储JobGraph。当构建执行图或物理执行图时主节点宕机并恢复,则可以从这里重新拉取作业JobGraph

    WebMonitorEndpoint:

            Rest服务,内部有一个Netty服务,客户端的所有请求都由该组件接收处理

    用一个例子来描述这三个组件的功能:

            当Client提交一个Job到集群时(Client会把Job构建成一个JobGraph),主节点接收到提交的job的Rest请求后,WebMonitorEndpoint 会通过Router进行解析找到对应的Handler来执行处理,处理完毕后交由Dispatcher,Dispatcher负责大气JobMaster来负责这个Job内部的Task的部署执行,执行Task所需的资源,JobMaster向ResourceManager申请。 

            在这里,我们再来介绍一下WebMonitorEndpoint的主要功能:

            WebMonitorEndpoint里面维护了很多很多的Handler,如果客户端通过 flink run 的方式来提交一个 job 到 flink 集群,最终, 是由 WebMonitorEndpoint 来接收,并且决定使用哪一个 Handler 来执行处理

            好了,了解了WebMonitorEndpoint的功能后,我们来看看WebMonitorEndpoint的构建和启动源码。

    WebMonitorEndpoint启动流程解析

            紧接上一章内容,在工厂类创建完成后,首先创建的实例为WebMonitorEndpoint实例,下面我们俩看看WebMonitorEndpoint是如何启动的

    点击这里查看上一节内容:Flink 1.13 源码解析——JobManager启动流程概览

            首先来看DefaultDispatcherResourceManagerComponentFactory的create方法中WebMonitorEndpoint的构建和启动代码

    1. /*
    2. TODO 创建WebMonitorEndpoint实例,在Standalone模式下为:DispatcherRestEndpoint
    3. 该实例内部会启动一个Netty服务端,绑定了一堆Handler
    4. */
    5. webMonitorEndpoint =
    6. restEndpointFactory.createRestEndpoint(
    7. configuration,
    8. dispatcherGatewayRetriever,
    9. resourceManagerGatewayRetriever,
    10. blobServer,
    11. executor,
    12. metricFetcher,
    13. highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
    14. fatalErrorHandler);
    15. // TODO 启动WebMonitorEndpoint
    16. log.debug("Starting Dispatcher REST endpoint.");
    17. webMonitorEndpoint.start();

    我们进入到webMonitorEndpoint.start()方法内,这个方法里面内容比较多,我们这里来逐一分析

    第一步:Handler相关操作,具体做了以下操作:

    1. 首先创建Router,来解析Client的请求并寻找对应的Handler
    2. 通过initializeHandlers方法注册了一堆Handler
    3. 将这些Handler进行排序,这里的排序是为了确认URL和Handler一对一的关系
    4. 排序好后通过checkAllEndpointsAndHandlersAreUnique方法来确认唯一性
    5. 确认唯一性后将Handler逐一注册
    1. // TODO 路由器,解析请求寻找对应Handler
    2. final Router router = new Router();
    3. final CompletableFuture restAddressFuture = new CompletableFuture<>(
    4. // TODO 初始化Handlers
    5. handlers = initializeHandlers(restAddressFuture);
    6. /* sort the handlers such that they are ordered the following:
    7. * /jobs
    8. * /jobs/overview
    9. * /jobs/:jobid
    10. * /jobs/:jobid/config
    11. * /:*
    12. *
    13. * TODO 排序,为了确认URL和Handler的一一对应的关系,不应出现一对多的情况
    14. */
    15. Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
    16. // TODO 确认唯一性方法
    17. checkAllEndpointsAndHandlersAreUnique(handlers);
    18. // TODO 注册Handler
    19. handlers.forEach(handler -> registerHandler(router, handler, log));

    第二步:Netty启动的相关操作

    首先启动NettyServer端引导操作

    1. // TODO 启动Netty Server 端引导程序
    2. NioEventLoopGroup bossGroup =
    3. new NioEventLoopGroup(
    4. 1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
    5. NioEventLoopGroup workerGroup =
    6. new NioEventLoopGroup(
    7. 0, new ExecutorThreadFactory("flink-rest-server-netty-worker"))
    8. bootstrap = new ServerBootstrap();
    9. bootstrap
    10. .group(bossGroup, workerGroup)
    11. .channel(NioServerSocketChannel.class)
    12. .childHandler(initializer);
    13. Iterator portsIterator;
    14. try {
    15. portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
    16. } catch (IllegalConfigurationException e) {
    17. throw e;
    18. } catch (Exception e) {
    19. throw new IllegalArgumentException(
    20. "Invalid port range definition: " + restBindPortRange);
    21. }

    这里有一段代码很有意思,Flink为了解决端口冲突问题做了以下操作

    1. // 此处为防止端口冲突,将逐一尝试端口是否可用
    2. int chosenPort = 0;
    3. while (portsIterator.hasNext()) {
    4. try {
    5. chosenPort = portsIterator.next();
    6. final ChannelFuture channel;
    7. if (restBindAddress == null) {
    8. channel = bootstrap.bind(chosenPort);
    9. } else {
    10. channel = bootstrap.bind(restBindAddress, chosenPort);
    11. }
    12. serverChannel = channel.syncUninterruptibly().channel();
    13. break;
    14. } catch (final Exception e) {
    15. // continue if the exception is due to the port being in use, fail early
    16. // otherwise
    17. if (!(e instanceof org.jboss.netty.channel.ChannelException
    18. || e instanceof java.net.BindException)) {
    19. throw e;
    20. }
    21. }
    22. }
    23. if (serverChannel == null) {
    24. throw new BindException(
    25. "Could not start rest endpoint on any port in port range "
    26. + restBindPortRange);
    27. }

    第三步:修改状态并启动其他基础服务

            在方法末尾修改一下EndPoint状态为RUNNING,到这里WebMonitorEndpoint的Netty服务就启动完毕了,接下来通过startInternal()方法进行其他基础服务的启动

    1. // 修改状态
    2. state = State.RUNNING;
    3. //TODO 到此为止,WebMonitorEndpoint的netty服务端就启动好了
    4. // TODO 启动其他基础服务
    5. startInternal();

    接下来,我们看看startInternal做了什么工作

    第四步:节点选举以及方法回调

            在startInternal方法内部我们可以看到,WebMonitorEndpoint准备开始进行Leader竞选

    1. @Override
    2. public void startInternal() throws Exception {
    3. // TODO 开始选举,去找回调方法
    4. leaderElectionService.start(this);
    5. startExecutionGraphCacheCleanupTask();
    6. if (hasWebUI) {
    7. log.info("Web frontend listening at {}.", getRestBaseUrl());
    8. }
    9. }

            在往下看之前,这里要介绍一下主节点内部组件的的Leader选举机制

            Flink的选举使用的是Curator框架,节点的选举针对每一个参选对象,会创建一个选举驱动leaderElectionDriver,在完成选举之后,会回调两个方法,如果选举成功会回调isLeader方法如果竞选失败则回调notLeader方法。有了这个概念,我们再来看WebMonitorEndpoint的选举,我们进入start方法。这里我们选择DefaultLeaderElectionService实现

    1. @Override
    2. public final void start(LeaderContender contender) throws Exception {
    3. checkNotNull(contender, "Contender must not be null.");
    4. Preconditions.checkState(leaderContender == null, "Contender was already set.");
    5. synchronized (lock) {
    6. /*
    7. TODO 在WebMonitorEndpoint中调用时,此contender为DispatcherRestEndPoint
    8. 在ResourceManager中调用时,contender为ResourceManager
    9. 在DispatcherRunner中调用时,contender为DispatcherRunner
    10. */
    11. leaderContender = contender;
    12. // TODO 此处创建选举对象 leaderElectionDriver
    13. leaderElectionDriver =
    14. leaderElectionDriverFactory.createLeaderElectionDriver(
    15. this,
    16. new LeaderElectionFatalErrorHandler(),
    17. leaderContender.getDescription());
    18. LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
    19. running = true;
    20. }
    21. }

            在Standalone模式下,WebMonitorEndpoint、ResourceManager、DispatcherRunner都会使用该模式来进行竞选,此刻我们是从WebMonitorEndpoint进入的此方法,此时contender对象实际为DispatcherRestEndpoint。我们继续看leaderElectionDriver的构建。

            接下来进入createLeaderElectionDriver方法内,由于我们是Standalone模式,我们选择ZooKeeperLeaderElectionDriverFactory实现

    1. @Override
    2. public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
    3. LeaderElectionEventHandler leaderEventHandler,
    4. FatalErrorHandler fatalErrorHandler,
    5. String leaderContenderDescription)
    6. throws Exception {
    7. return new ZooKeeperLeaderElectionDriver(
    8. client,
    9. latchPath,
    10. leaderPath,
    11. leaderEventHandler,
    12. fatalErrorHandler,
    13. leaderContenderDescription);
    14. }

    这里构建了一个zookeeper的leaderElectionDriver,我们点进来继续看

    1. /**
    2. * Creates a ZooKeeperLeaderElectionDriver object.
    3. *
    4. * @param client Client which is connected to the ZooKeeper quorum
    5. * @param latchPath ZooKeeper node path for the leader election latch
    6. * @param leaderPath ZooKeeper node path for the node which stores the current leader
    7. * information
    8. * @param leaderElectionEventHandler Event handler for processing leader change events
    9. * @param fatalErrorHandler Fatal error handler
    10. * @param leaderContenderDescription Leader contender description
    11. */
    12. public ZooKeeperLeaderElectionDriver(
    13. CuratorFramework client,
    14. String latchPath,
    15. String leaderPath,
    16. LeaderElectionEventHandler leaderElectionEventHandler,
    17. FatalErrorHandler fatalErrorHandler,
    18. String leaderContenderDescription)
    19. throws Exception {
    20. this.client = checkNotNull(client);
    21. this.leaderPath = checkNotNull(leaderPath);
    22. this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    23. this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    24. this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
    25. leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
    26. cache = new NodeCache(client, leaderPath);
    27. client.getUnhandledErrorListenable().addListener(this);
    28. running = true;
    29. // TODO 开始选举
    30. leaderLatch.addListener(this);
    31. leaderLatch.start();
    32. /*
    33. TODO 选举开始后,不就会接收到响应:
    34. 1.如果竞选成功,则回调该类的isLeader方法
    35. 2.如果竞选失败,则回调该类的notLeader方法
    36. 每一个竞选者对应一个竞选Driver
    37. */
    38. cache.getListenable().addListener(this);
    39. cache.start();
    40. client.getConnectionStateListenable().addListener(listener);
    41. }

    在这里,通过start方法开始进行选举,正如我们上面所说,在选举完成和会调用回调方法,我们去看该类的isLeader方法

    1. /*
    2. 选举成功
    3. */
    4. @Override
    5. public void isLeader() {
    6. leaderElectionEventHandler.onGrantLeadership();
    7. }

    再进入onGrantLeadership方法

    1. @Override
    2. @GuardedBy("lock")
    3. public void onGrantLeadership() {
    4. synchronized (lock) {
    5. if (running) {
    6. issuedLeaderSessionID = UUID.randomUUID();
    7. clearConfirmedLeaderInformation();
    8. if (LOG.isDebugEnabled()) {
    9. LOG.debug(
    10. "Grant leadership to contender {} with session ID {}.",
    11. leaderContender.getDescription(),
    12. issuedLeaderSessionID);
    13. }
    14. /*
    15. TODO 有4中竞选者类型,LeaderContender有4中情况
    16. 1.Dispatcher = DefaultDispatcherRunner
    17. 2.JobMaster = JobManagerRunnerImpl
    18. 3.ResourceManager = ResourceManager
    19. 4.WebMonitorEndpoint = WebMonitorEndpoint
    20. */
    21. leaderContender.grantLeadership(issuedLeaderSessionID);
    22. } else {
    23. if (LOG.isDebugEnabled()) {
    24. LOG.debug(
    25. "Ignoring the grant leadership notification since the {} has "
    26. + "already been closed.",
    27. leaderElectionDriver);
    28. }
    29. }
    30. }
    31. }

    我们再进入leaderContender.grantLeadership方法,因为当前是WebMonitorEndpoint的选举,所以我们进入WebMonitorEndpoint的实现

    1. @Override
    2. public void grantLeadership(final UUID leaderSessionID) {
    3. log.info(
    4. "{} was granted leadership with leaderSessionID={}",
    5. getRestBaseUrl(),
    6. leaderSessionID);
    7. leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
    8. }

    没什么好说的,我们再进入leaderElectionService.confirmLeadership方法,选择DefaultLeaderElectionService实现

    1. @Override
    2. public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
    3. if (LOG.isDebugEnabled()) {
    4. LOG.debug(
    5. "Confirm leader session ID {} for leader {}.", leaderSessionID, leaderAddress);
    6. }
    7. checkNotNull(leaderSessionID);
    8. synchronized (lock) {
    9. if (hasLeadership(leaderSessionID)) {
    10. if (running) {
    11. // TODO 确认Leader信息,并将节点信息写入zk
    12. confirmLeaderInformation(leaderSessionID, leaderAddress);
    13. } else {
    14. if (LOG.isDebugEnabled()) {
    15. LOG.debug(
    16. "Ignoring the leader session Id {} confirmation, since the "
    17. + "LeaderElectionService has already been stopped.",
    18. leaderSessionID);
    19. }
    20. }
    21. } else {
    22. // Received an old confirmation call
    23. if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
    24. if (LOG.isDebugEnabled()) {
    25. LOG.debug(
    26. "Receive an old confirmation call of leader session ID {}, "
    27. + "current issued session ID is {}",
    28. leaderSessionID,
    29. issuedLeaderSessionID);
    30. }
    31. } else {
    32. LOG.warn(
    33. "The leader session ID {} was confirmed even though the "
    34. + "corresponding JobManager was not elected as the leader.",
    35. leaderSessionID);
    36. }
    37. }
    38. }
    39. }

    也没有什么内容,我们直接进入confirmLeaderInformation方法里

    1. @GuardedBy("lock")
    2. private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
    3. confirmedLeaderSessionID = leaderSessionID;
    4. confirmedLeaderAddress = leaderAddress;
    5. leaderElectionDriver.writeLeaderInformation(
    6. LeaderInformation.known(confirmedLeaderSessionID, confirmedLeaderAddress));
    7. }

    可以看到,WebMonitorEndpoint在选举Leader成功后,并没有做什么,只是将自己的信息写入zookeeper。

    在信息写入完毕后,WebMonitorEndpoint就算是启动完成了。

    总结

    WebMonitorEndpoint的启动流程并不复杂,总结一下就是做了以下这些工作:

    1. 初始化一堆Handler
    2. 启动Netty服务,注册Handler
    3. 启动内部服务: 执行竞选,WebMonitorEndpoint本身就是一个LeaderContender角色
    4. 竞选成功,其实只是把WebMonitorEndpoint的address以及和zk的sessionId写入znode中

    在下一篇中,我们继续来看ResourceManager的启动流程

  • 相关阅读:
    【Leetcode60天带刷】day01——704.二分查找、27.移除元素
    Linear Regression in mojo with NDBuffer
    时间序列(三):多变量回归
    ModuleNotFoundError: No module named ‘apt_pkg‘
    微服务项目雪崩的解决思路
    Android 13.0 Launcher3定制之双层改单层(去掉抽屉式三)
    SpringCloud Alibaba学习笔记,记重点!!
    Beacon帧
    学会Spring MVC文件上传、下载和JRebel的使用
    基于 PowerMax 架构的银行双活数据中心实践分享
  • 原文地址:https://blog.csdn.net/EdwardWong_/article/details/126533445