• Flink 1.13 源码解析——JobManager启动流程之ResourceManager启动


    ​点击这里查看 Flink 1.13 源码解析 目录汇总

    点击查看相关章节:Flink 1.13 源码解析——JobManager启动流程概览

    点击查看相关章节:Flink 1.13 源码解析——JobManager启动流程 WebMonitorEndpoint启动

    点击查看相关章节:Flink 1.13 源码解析——JobManager启动流程之Dispatcher启动

    目录

    一、前言

    二、ResourceManager的启动

    2.1、触发Onstart回调

    2.2、Leader竞选,完成后进行isLeader的回调

    2.3、两个心跳以及两个定时任务

    2.3.1、两个心跳

    2.3.2、两个定时

    三、总结:


    一、前言

    在开始解析ResourceManager之前,我们先来复习一下Flink主节点中的一些重要概念:

            关于Flink的主节点JobManager,他只是一个逻辑上的主节点,针对不同的部署模式,主节点的实现类也不同。

            JobManager(逻辑)有三大核心内容,分别为ResourceManager、Dispatcher和WebmonitorEndpoin:

    ResourceManager:

            Flink集群的资源管理器,只有一个,关于Slot的管理和申请等工作,都有它负责

    Dispatcher:

            1、负责接收用户提交的JobGraph,然后启动一个JobMaster,类似于Yarn中的AppMaster和Spark中的Driver。

            2、内有一个持久服务:JobGraphStore,负责存储JobGraph。当构建执行图或物理执行图时主节点宕机并恢复,则可以从这里重新拉取作业JobGraph

    WebMonitorEndpoint:

            Rest服务,内部有一个Netty服务,客户端的所有请求都由该组件接收处理

    用一个例子来描述这三个组件的功能:

            当Client提交一个Job到集群时(Client会把Job构建成一个JobGraph),主节点接收到提交的job的Rest请求后,WebMonitorEndpoint 会通过Router进行解析找到对应的Handler来执行处理,处理完毕后交由Dispatcher,Dispatcher负责大气JobMaster来负责这个Job内部的Task的部署执行,执行Task所需的资源,JobMaster向ResourceManager申请。 

    ResourceManager在Flink中扮演的角色就是一个资源管理器,负责Slot的管理和申请等工作。

    下面我们来看ResourceManager的启动代码。

    二、ResourceManager的启动

    首先回到dispatcherResourceManagerComponentFactory.create()方法,在完成了WebMonitorEndpoint的创建和启动之后,将进行ResourceManager的启动,我们来看代码:

    1. resourceManager =
    2. resourceManagerFactory.createResourceManager(
    3. configuration,
    4. ResourceID.generate(),
    5. rpcService,
    6. highAvailabilityServices,
    7. heartbeatServices,
    8. fatalErrorHandler,
    9. new ClusterInformation(hostname, blobServer.getPort()),
    10. webMonitorEndpoint.getRestBaseUrl(),
    11. metricRegistry,
    12. hostname,
    13. ioExecutor);
    14. resourceManager.start();

    在这里首先初始化了ResourceManager实例,然后调用了start方法启动ResourceManager,我们来看start方法

    1. @Override
    2. public void start() {
    3. // 向自己发送消息,告知已启动
    4. rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    5. }

    ResourceManager是一个RpcEndpoint(Actor),在start方法里,知识是向自己发送了一条消息,告知已启动的状态。

    那么我们回头去看ResourceManager的构建过程,我们去看resourceManagerFactory.createResourceManager方法。

    1. // TODO 构建ResourceManagerRuntimeServices,加载配置
    2. final ResourceManagerRuntimeServices resourceManagerRuntimeServices =
    3. createResourceManagerRuntimeServices(
    4. effectiveResourceManagerAndRuntimeServicesConfig,
    5. rpcService,
    6. highAvailabilityServices,
    7. slotManagerMetricGroup);
    8. // TODO 构建ResourceManager
    9. return createResourceManager(
    10. getEffectiveConfigurationForResourceManager(
    11. effectiveResourceManagerAndRuntimeServicesConfig),
    12. resourceId,
    13. rpcService,
    14. highAvailabilityServices,
    15. heartbeatServices,
    16. fatalErrorHandler,
    17. clusterInformation,
    18. webInterfaceUrl,
    19. resourceManagerMetricGroup,
    20. resourceManagerRuntimeServices,
    21. ioExecutor);

    在方法里, 主要做了两件事:

    1、构建ResourceManagerRuntimeServices并加载配置

    2、真正构建ResourceManager

    2.1、触发Onstart回调

    我们进入createResourceManager方法里,选择StandaloneResourceManagerFactory的实现,可以看到:

    1. @Override
    2. protected ResourceManager createResourceManager(
    3. Configuration configuration,
    4. ResourceID resourceId,
    5. RpcService rpcService,
    6. HighAvailabilityServices highAvailabilityServices,
    7. HeartbeatServices heartbeatServices,
    8. FatalErrorHandler fatalErrorHandler,
    9. ClusterInformation clusterInformation,
    10. @Nullable String webInterfaceUrl,
    11. ResourceManagerMetricGroup resourceManagerMetricGroup,
    12. ResourceManagerRuntimeServices resourceManagerRuntimeServices,
    13. Executor ioExecutor) {
    14. // TODO ResourceManager启动超时时间: 从启动,到有TaskManager汇报的时间,
    15. // TODO 可以通过resourcemanager.standalone.start-up-time进行设置,如果没有设置则默认等于Slot申请超时时间
    16. final Time standaloneClusterStartupPeriodTime =
    17. ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);
    18. return new StandaloneResourceManager(
    19. rpcService,
    20. resourceId,
    21. highAvailabilityServices,
    22. heartbeatServices,
    23. resourceManagerRuntimeServices.getSlotManager(),
    24. ResourceManagerPartitionTrackerImpl::new,
    25. resourceManagerRuntimeServices.getJobLeaderIdService(),
    26. clusterInformation,
    27. fatalErrorHandler,
    28. resourceManagerMetricGroup,
    29. standaloneClusterStartupPeriodTime,
    30. AkkaUtils.getTimeoutAsTime(configuration),
    31. ioExecutor);
    32. }

    在这段代码里,主要完成了两件事:

    1、配置ResourceManager启动超时时间,所谓超时时间,是指从ResourceManager启动,一直到有TaskManager向ResourceManager注册的时间长度,当超过配置的时间还没有TaskManager来向当前ResourceManager来注册,则认为当前ResourceManager启动超时,改参数可以通过resourcemanager.standalone.start-up-time进行设置,如果没有设置则默认等于Slot申请超时时间

    2、构建了一个StandaloneResourceManager实例

    我们继续看StandaloneResourceManager的构建过程,进入StandaloneResourceManager的构造方法,一直追溯到ResourceManager的构造方法,可以看到ResourceManager继承了RpcEndpoint,所以他一定有一个onStart方法,在构建完成之后会被回调,所以我们直接去找ResourceManager的onStart方法:

    1. // ------------------------------------------------------------------------
    2. // RPC lifecycle methods
    3. // ------------------------------------------------------------------------
    4. @Override
    5. public final void onStart() throws Exception {
    6. try {
    7. log.info("Starting the resource manager.");
    8. // TODO 启动ResourceManager 的基础服务
    9. startResourceManagerServices();
    10. } catch (Throwable t) {
    11. final ResourceManagerException exception =
    12. new ResourceManagerException(
    13. String.format("Could not start the ResourceManager %s", getAddress()),
    14. t);
    15. onFatalError(exception);
    16. throw exception;
    17. }
    18. }

    可以看到,在方法里调用了startResourceManagerServices()方法来启动ResourceManager的基础服务,我们进入startResourceManagerServices()方法:

    1. private void startResourceManagerServices() throws Exception {
    2. try {
    3. // TODO 获取选举服务
    4. leaderElectionService =
    5. highAvailabilityServices.getResourceManagerLeaderElectionService();
    6. // 在Standalone模式下没有做任何操作
    7. initialize();
    8. // TODO 开始竞选
    9. leaderElectionService.start(this);
    10. jobLeaderIdService.start(new JobLeaderIdActionsImpl());
    11. registerMetrics();
    12. } catch (Exception e) {
    13. handleStartResourceManagerServicesException(e);
    14. }
    15. }

    又看到了我们熟悉的操作,Leader竞选。

    2.2、Leader竞选,完成后进行isLeader的回调

    我们直接来看leaderElectionService.start方法

    1. @Override
    2. public final void start(LeaderContender contender) throws Exception {
    3. checkNotNull(contender, "Contender must not be null.");
    4. Preconditions.checkState(leaderContender == null, "Contender was already set.");
    5. synchronized (lock) {
    6. /*
    7. TODO 在WebMonitorEndpoint中调用时,此contender为DispatcherRestEndPoint
    8. 在ResourceManager中调用时,contender为ResourceManager
    9. 在DispatcherRunner中调用时,contender为DispatcherRunner
    10. */
    11. leaderContender = contender;
    12. // TODO 此处创建选举对象 leaderElectionDriver
    13. leaderElectionDriver =
    14. leaderElectionDriverFactory.createLeaderElectionDriver(
    15. this,
    16. new LeaderElectionFatalErrorHandler(),
    17. leaderContender.getDescription());
    18. LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
    19. running = true;
    20. }
    21. }

    哦豁,这不就是上一章WebMonitorEndpoint启动时调用过的方法嘛,只不过由于此处是ResourceManager的选举,当前contender为ResourceManager。老规矩,我们直接去看createLeaderElectionDriver方法。由于是Standalone模式,我们选择ZooKeeperLeaderElectionDriverFactory的实现:

    1. @Override
    2. public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
    3. LeaderElectionEventHandler leaderEventHandler,
    4. FatalErrorHandler fatalErrorHandler,
    5. String leaderContenderDescription)
    6. throws Exception {
    7. return new ZooKeeperLeaderElectionDriver(
    8. client,
    9. latchPath,
    10. leaderPath,
    11. leaderEventHandler,
    12. fatalErrorHandler,
    13. leaderContenderDescription);
    14. }

    可以看到这里返回了一个zk的选举驱动,我们在点进ZooKeeperLeaderElectionDriver类

    1. public ZooKeeperLeaderElectionDriver(
    2. CuratorFramework client,
    3. String latchPath,
    4. String leaderPath,
    5. LeaderElectionEventHandler leaderElectionEventHandler,
    6. FatalErrorHandler fatalErrorHandler,
    7. String leaderContenderDescription)
    8. throws Exception {
    9. this.client = checkNotNull(client);
    10. this.leaderPath = checkNotNull(leaderPath);
    11. this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    12. this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    13. this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
    14. leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
    15. cache = new NodeCache(client, leaderPath);
    16. client.getUnhandledErrorListenable().addListener(this);
    17. running = true;
    18. // TODO 开始选举
    19. leaderLatch.addListener(this);
    20. leaderLatch.start();
    21. /*
    22. TODO 选举开始后,不就会接收到响应:
    23. 1.如果竞选成功,则回调该类的isLeader方法
    24. 2.如果竞选失败,则回调该类的notLeader方法
    25. 每一个竞选者对应一个竞选Driver
    26. */
    27. cache.getListenable().addListener(this);
    28. cache.start();
    29. client.getConnectionStateListenable().addListener(listener);
    30. }

    又来到了这个选举方法,在上节里我们讲到,在完成选举之后会回调isLeader方法或notLeader方法,我们这里直接去看isLeader方法

    1. /*
    2. 选举成功
    3. */
    4. @Override
    5. public void isLeader() {
    6. leaderElectionEventHandler.onGrantLeadership();
    7. }

    在进入leaderElectionEventHandler.onGrantLeadership()方法:

    1. @Override
    2. @GuardedBy("lock")
    3. public void onGrantLeadership() {
    4. synchronized (lock) {
    5. if (running) {
    6. issuedLeaderSessionID = UUID.randomUUID();
    7. clearConfirmedLeaderInformation();
    8. if (LOG.isDebugEnabled()) {
    9. LOG.debug(
    10. "Grant leadership to contender {} with session ID {}.",
    11. leaderContender.getDescription(),
    12. issuedLeaderSessionID);
    13. }
    14. /*
    15. TODO 有4中竞选者类型,LeaderContender有4中情况
    16. 1.Dispatcher = DefaultDispatcherRunner
    17. 2.JobMaster = JobManagerRunnerImpl
    18. 3.ResourceManager = ResourceManager
    19. 4.WebMonitorEndpoint = WebMonitorEndpoint
    20. */
    21. leaderContender.grantLeadership(issuedLeaderSessionID);
    22. } else {
    23. if (LOG.isDebugEnabled()) {
    24. LOG.debug(
    25. "Ignoring the grant leadership notification since the {} has "
    26. + "already been closed.",
    27. leaderElectionDriver);
    28. }
    29. }
    30. }
    31. }

    再进入leaderContender.grantLeadership方法,选择ResourceManager的实现:

    1. // ------------------------------------------------------------------------
    2. // Leader Contender
    3. // ------------------------------------------------------------------------
    4. /**
    5. * Callback method when current resourceManager is granted leadership.
    6. *
    7. * @param newLeaderSessionID unique leadershipID
    8. */
    9. @Override
    10. public void grantLeadership(final UUID newLeaderSessionID) {
    11. final CompletableFuture acceptLeadershipFuture =
    12. clearStateFuture.thenComposeAsync(
    13. // TODO 选举成功后执行回调函数
    14. (ignored) -> tryAcceptLeadership(newLeaderSessionID),
    15. getUnfencedMainThreadExecutor());
    16. final CompletableFuture confirmationFuture =
    17. acceptLeadershipFuture.thenAcceptAsync(
    18. (acceptLeadership) -> {
    19. if (acceptLeadership) {
    20. // confirming the leader session ID might be blocking,
    21. leaderElectionService.confirmLeadership(
    22. newLeaderSessionID, getAddress());
    23. }
    24. },
    25. ioExecutor);
    26. confirmationFuture.whenComplete(
    27. (Void ignored, Throwable throwable) -> {
    28. if (throwable != null) {
    29. onFatalError(ExceptionUtils.stripCompletionException(throwable));
    30. }
    31. });
    32. }

    在这个方法内部,ResourceManager调用了一个tryAcceptLeadership()方法,我们进入这个方法

    1. private CompletableFuture tryAcceptLeadership(final UUID newLeaderSessionID) {
    2. if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
    3. final ResourceManagerId newResourceManagerId =
    4. ResourceManagerId.fromUuid(newLeaderSessionID);
    5. log.info(
    6. "ResourceManager {} was granted leadership with fencing token {}",
    7. getAddress(),
    8. newResourceManagerId);
    9. // clear the state if we've been the leader before
    10. if (getFencingToken() != null) {
    11. clearStateInternal();
    12. }
    13. setFencingToken(newResourceManagerId);
    14. /*
    15. TODO 启动服务
    16. 1.启动两个心跳服务
    17. 2.启动slotManager服务启动两个定时任务
    18. */
    19. startServicesOnLeadership();
    20. return prepareLeadershipAsync().thenApply(ignored -> hasLeadership = true);
    21. } else {
    22. return CompletableFuture.completedFuture(false);
    23. }
    24. }

    2.3、两个心跳以及两个定时任务

    在这里通过调用startServicesOnLeadership方法,启动了两个心跳服务和两个定时任务,我们进入这个方法一探究竟:

    1. private void startServicesOnLeadership() {
    2. // TODO 启动两个心跳服务
    3. startHeartbeatServices();
    4. // TODO 启动两个定时服务
    5. // TODO SlotManager是存在于ResourceManager中用来管理所有TaskManager汇报和注册的Slot的工作的
    6. slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
    7. onLeadership();
    8. }

    2.3.1、两个心跳

    我们先来看启动的两个心跳服务,进入startHeartbeatServices()方法:

    1. private void startHeartbeatServices() {
    2. // TODO ResourceManager(主节点)维持和从节点的心跳
    3. // TODO ResourceManager(逻辑JobManager)维持和TaskExecutor(TaskManager)的心跳
    4. taskManagerHeartbeatManager =
    5. heartbeatServices.createHeartbeatManagerSender(
    6. resourceId,
    7. new TaskManagerHeartbeatListener(),
    8. getMainThreadExecutor(),
    9. log);
    10. // TODO ResourceManager维持和JobMaster(主控程序)的心跳
    11. jobManagerHeartbeatManager =
    12. heartbeatServices.createHeartbeatManagerSender(
    13. resourceId,
    14. new JobManagerHeartbeatListener(),
    15. getMainThreadExecutor(),
    16. log);
    17. }

    可以看到到这两个心跳分别为:

    1、ResourceManager维持和TaskManager的心跳

    2、ResourceManager维持和主控程序(JobMaster)的心跳

    我们再来看这个心跳是如何实现的,我们进入createHeartbeatManagerSender方法,在进入HeartbeatManagerSenderImpl,再进入this,可以看到:

    1. HeartbeatManagerSenderImpl(
    2. long heartbeatPeriod,
    3. long heartbeatTimeout,
    4. ResourceID ownResourceID,
    5. HeartbeatListener heartbeatListener,
    6. ScheduledExecutor mainThreadExecutor,
    7. Logger log,
    8. HeartbeatMonitor.Factory heartbeatMonitorFactory) {
    9. super(
    10. heartbeatTimeout,
    11. ownResourceID,
    12. heartbeatListener,
    13. mainThreadExecutor,
    14. log,
    15. heartbeatMonitorFactory);
    16. this.heartbeatPeriod = heartbeatPeriod;
    17. // TODO 线程池定时调用this的run方法,由于delay为0L,立即执行
    18. mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
    19. }

    在这里构建了一个延时线程池,不过延迟时间为0,则当代码执行到这里时会立即调用this的run方法,我们去看run方法:

    1. @Override
    2. public void run() {
    3. if (!stopped) {
    4. log.debug("Trigger heartbeat request.");
    5. // 详细说明待后面解析完从节点后在介绍
    6. for (HeartbeatMonitor heartbeatMonitor : getHeartbeatTargets().values()) {
    7. // TODO 发送心跳
    8. requestHeartbeat(heartbeatMonitor);
    9. }
    10. //等heartbeatPeriod=10s之后,再次执行this的run方法,来控制上面的for循环每隔10s执行一次,实现心跳的无限循环
    11. getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
    12. }
    13. }

    可以看到,当代码第一次执行到这里是,会先调用依次发送心跳的方法,关于这个for循环,我们会在后面的章节里详细解析,这里就先不细说。在发送完心跳之后又出现了一个延时线程池,在heartbeatPeriod(10秒)延时后,会再次触发this的run方法,也就是当前方法,到此就会进入无限的心跳循环,也是在这里构建完成无限心跳。

    2.3.2、两个定时

    分析完心跳的实现,我们回去看那两个定时服务是什么:

    首先进入slotManager.start方法并选择SlotManagerImpl实现:

    1. @Override
    2. public void start(
    3. ResourceManagerId newResourceManagerId,
    4. Executor newMainThreadExecutor,
    5. ResourceActions newResourceActions) {
    6. LOG.info("Starting the SlotManager.");
    7. this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    8. mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    9. resourceActions = Preconditions.checkNotNull(newResourceActions);
    10. started = true;
    11. /*
    12. TODO 定时任务checkTaskManagerTimeoutsAndRedundancy
    13. 每隔30秒检查一次闲置的TaskManager
    14. */
    15. taskManagerTimeoutsAndRedundancyCheck =
    16. scheduledExecutor.scheduleWithFixedDelay(
    17. () ->
    18. mainThreadExecutor.execute(
    19. () -> checkTaskManagerTimeoutsAndRedundancy()),
    20. 0L,
    21. taskManagerTimeout.toMilliseconds(),
    22. TimeUnit.MILLISECONDS);
    23. /*
    24. TODO 定时任务 checkSlotRequestTimeouts
    25. Slot在申请中是状态为PendingRequest, 这个定时任务就是来检测那些已经超过5分钟的pendingRequest
    26. 也就是超时的Slot
    27. */
    28. slotRequestTimeoutCheck =
    29. scheduledExecutor.scheduleWithFixedDelay(
    30. () -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),
    31. 0L,
    32. slotRequestTimeout.toMilliseconds(),
    33. TimeUnit.MILLISECONDS);
    34. registerSlotManagerMetrics();
    35. }

    可以看到,这两个定时任务分别为:

    1、闲置TaskManager的定时检查,这里当我们是yarn-session模式时,会定时(30秒)检查一次闲置的TaskManager,当有闲置时间超过30秒的Taskmanager是,回去将该从节点回收,并释放资源。

    2、定时检查超时的Slot申请,Slot在申请中是状态为PendingRequest, 这个定时任务就是来检测那些已经超过5分钟的pendingRequest 也就是超时的Slot

    我们进入超时slot的检查方法checkSlotRequestTimeouts:

    1. private void checkSlotRequestTimeouts() {
    2. if (!pendingSlotRequests.isEmpty()) {
    3. long currentTime = System.currentTimeMillis();
    4. Iterator> slotRequestIterator =
    5. pendingSlotRequests.entrySet().iterator();
    6. // TODO 遍历SlotRequest列表
    7. while (slotRequestIterator.hasNext()) {
    8. PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();
    9. // 判断已超时的slotRequest
    10. if (currentTime - slotRequest.getCreationTimestamp()
    11. >= slotRequestTimeout.toMilliseconds()) {
    12. // 移除掉已超时的slotRequest
    13. slotRequestIterator.remove();
    14. // TODO ResourceManager已经分配给某个Job的Slot,但是该Slot还处于pendingRequest状态
    15. if (slotRequest.isAssigned()) {
    16. // 取消
    17. cancelPendingSlotRequest(slotRequest);
    18. }
    19. // TODO 通知失败
    20. resourceActions.notifyAllocationFailure(
    21. slotRequest.getJobId(),
    22. slotRequest.getAllocationId(),
    23. new TimeoutException("The allocation could not be fulfilled in time."));
    24. }
    25. }
    26. }
    27. }

    检查流程为:

    1、遍历所有slot请求列表

    2、判断已超时的slotRequest

    3、移除超时SlotRequest

    4、 如果ResourceManager已经分配给某个Job的Slot,但是该Slot还处于pendingRequest状态

    5、则先取消当前的slot分配

    6、再通知该slot分配失败

    到此为止,ResourceManager就已经启动完毕,最后我们总结一下ResourceManager的启动工作:

    三、总结:

    ResourceManager的启动要点有以下几点:

    1、ResourceManager是一个RpcEndpoint(Actor),当构建好对象后启动时会触发onStart(Actor的perStart生命周期方法)方法
    2、ResourceManager也是一个LeaderContendr,也会执行竞选, 会执行竞选结果方法
    3、ResourceManagerService 具有两个心跳服务和两个定时服务:
             a、两个心跳服务:    
                      ⅰ、从节点和主节点之间的心跳
                      ⅱ、Job的主控程序和主节点之间的心跳
             b、两个定时服务:
                      ⅰ、TaskManager 的超时检查服务
                      ⅱ、Slot申请的 超时检查服务

    在下一章中,我们继续介绍Dispatcher的构建和启动过程!

  • 相关阅读:
    一、MyBatis-Plus(未完成)
    JVM类加载机制详解
    外汇天眼:如果你想成为前5%的交易者
    Selenium的WebDriver操作页面的超时或者元素重叠引起的ElementClickInterceptedException
    国产加速度传感器QMA6100P
    数组模拟散列表
    1704. 判断字符串的两半是否相似
    C#面:死锁的必要条件是什么?怎么克服?
    JavaWeb的监控系统
    什么是js的闭包,它是如何产生的
  • 原文地址:https://blog.csdn.net/EdwardWong_/article/details/126551127