• Flink Yarn Per Job - RM启动SlotManager


    图片

    ResourceManager

    /**
     * Starts the ResourceManager by delegating to {@code startResourceManagerServices()}.
     *
     * <p>Any {@link Throwable} raised during startup is wrapped in a
     * {@code ResourceManagerException} that names this endpoint's address, reported through
     * {@code onFatalError}, and then rethrown to the caller.
     *
     * @throws Exception the wrapping {@code ResourceManagerException} when startup fails
     */
    public final void onStart() throws Exception {
      try {
        startResourceManagerServices();
      } catch (Throwable cause) {
        final String message =
            String.format("Could not start the ResourceManager %s", getAddress());
        final ResourceManagerException startupFailure =
            new ResourceManagerException(message, cause);

        // Report first, then propagate, so the fatal-error handler always sees the failure.
        onFatalError(startupFailure);
        throw startupFailure;
      }
    }
    
    /**
     * Starts the services the ResourceManager depends on, in this order: fetches the leader
     * election service from the high-availability services, runs {@code initialize()}, starts
     * leader election with this instance as the contender, starts the job leader id service,
     * and registers the task executor metrics.
     *
     * <p>Any {@code Exception} raised along the way is handed to
     * {@code handleStartResourceManagerServicesException}.
     *
     * @throws Exception declared by the signature; presumably rethrown by the exception
     *     handler -- confirm against its implementation
     */
    private void startResourceManagerServices() throws Exception {
      try {
        leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
    
        // In the YARN deployment traced in this article, this creates, initializes and starts
        // the YARN ResourceManager and NodeManager clients.
        initialize();
    
        // Start the ResourceManager through the leader election service.
        leaderElectionService.start(this);
    
        jobLeaderIdService.start(new JobLeaderIdActionsImpl());
    
        registerTaskExecutorMetrics();
      } catch (Exception e) {
        handleStartResourceManagerServicesException(e);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    创建了Yarn的RM和NM的客户端,初始化并启动

    通过选举服务,启动ResourceManager

    创建了Yarn的RM和NM的客户端

    ActiveResourceManager

    /**
     * Initializes the resource manager driver, handing it this instance, a freshly created
     * {@code GatewayMainThreadExecutor} and the I/O executor.
     *
     * @throws ResourceManagerException if the driver's initialization throws any exception
     */
    @Override
    protected void initialize() throws ResourceManagerException {
      try {
        // The executor is created inside the try block so that a failure here is wrapped too.
        final GatewayMainThreadExecutor gatewayExecutor = new GatewayMainThreadExecutor();
        resourceManagerDriver.initialize(this, gatewayExecutor, ioExecutor);
      } catch (Exception cause) {
        throw new ResourceManagerException("Cannot initialize resource provider.", cause);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    AbstractResourceManagerDriver

    @Override
    public final void initialize(
        ResourceEventHandler resourceEventHandler,
        ScheduledExecutor mainThreadExecutor,
        Executor ioExecutor) throws Exception {
      this.resourceEventHandler = Preconditions.checkNotNull(resourceEventHandler);
      this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
      this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
         // 下追
         initializeInternal();
    }
    
    /**
     * Implementation-specific initialization hook, invoked at the end of
     * {@code initialize(...)} after the event handler and executors have been stored.
     *
     * @throws Exception if the implementation fails to initialize
     */
    protected abstract void initializeInternal() throws Exception;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    YarnResourceManagerDriver

    @Override
    protected void initializeInternal() throws Exception {
      final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
      try {
        // 创建Yarn的ResourceManager的客户端,并且初始化和启动
        resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
          yarnHeartbeatIntervalMillis,
          yarnContainerEventHandler);
        resourceManagerClient.init(yarnConfig);
        resourceManagerClient.start();
    
        final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
        getContainersFromPreviousAttempts(registerApplicationMasterResponse);
        taskExecutorProcessSpecContainerResourcePriorityAdapter =
          new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
            registerApplicationMasterResponse.getMaximumResourceCapability(),
            ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
      } catch (Exception e) {
        throw new ResourceManagerException("Could not start resource manager client.", e);
      }
    
      // 创建yarn的 NodeManager的客户端,并且初始化和启动
      nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
      nodeManagerClient.init(yarnConfig);
      nodeManagerClient.start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    创建Yarn的ResourceManager的客户端,并且初始化和启动

    创建yarn的 NodeManager的客户端,并且初始化和启动

    启动SlotManager

    StandaloneLeaderElectionService

    /**
     * Registers the given contender and grants it leadership right away under
     * {@code HighAvailabilityServices.DEFAULT_LEADER_ID}.
     *
     * @param newContender the contender to register; must not be null
     * @throws IllegalArgumentException if a contender has already been registered
     * @throws Exception declared by the signature
     */
    @Override
    public void start(LeaderContender newContender) throws Exception {
      final boolean alreadyStarted = contender != null;
      if (alreadyStarted) {
        // A second start attempt is rejected.
        throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
      }

      final LeaderContender checkedContender = Preconditions.checkNotNull(newContender);
      contender = checkedContender;

      // Leadership is granted directly to the registered contender.
      contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    ResourceManager

    /**
     * Callback invoked when leadership is granted for the given session id.
     *
     * <p>Once {@code clearStateFuture} completes, {@code tryAcceptLeadership} is run
     * asynchronously on the unfenced main thread executor. The rest of the method is elided in
     * this excerpt.
     *
     * @param newLeaderSessionID the leader session id being granted
     */
    public void grantLeadership(final UUID newLeaderSessionID) {
     // NOTE(review): raw CompletableFuture -- tryAcceptLeadership completes with true/false, so
     // this is presumably CompletableFuture<Boolean>; confirm against the full source.
     final CompletableFuture acceptLeadershipFuture = clearStateFuture
        // Follow this call: continues in tryAcceptLeadership.
        .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
    
    ... ...
    }
    
    /**
     * Accepts leadership if the leader election service confirms it for the given session id.
     *
     * <p>On confirmation, {@code startServicesOnLeadership()} is called and the future from
     * {@code prepareLeadershipAsync()} is mapped to {@code true}. Otherwise an already
     * completed future holding {@code false} is returned. Part of the accepting branch is
     * elided in this excerpt.
     *
     * @param newLeaderSessionID the leader session id to accept
     * @return a future completing with {@code true} when leadership was accepted, else
     *     {@code false}
     */
    private CompletableFuture tryAcceptLeadership(final UUID newLeaderSessionID) {
     if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
    ... ...
        // Starts the heartbeat services and the SlotManager.
        startServicesOnLeadership();
    
        return prepareLeadershipAsync().thenApply(ignored -> true);
     } else {
        return CompletableFuture.completedFuture(false);
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    /**
     * Starts the services that run while this ResourceManager holds leadership: the heartbeat
     * services, then the SlotManager, and finally the {@code onLeadership()} hook.
     */
    private void startServicesOnLeadership() {

      // Start the heartbeat services towards the TaskManagers and the JobManagers (JobMasters).

      startHeartbeatServices();

      // Start the SlotManager with the current fencing token, the main thread executor and a
      // new ResourceActionsImpl.

      slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

      onLeadership();

    }

    启动心跳服务:TaskManager、JobMaster

    启动slotManager

    /**
     * Creates the two heartbeat manager senders of this ResourceManager: one using a
     * {@code TaskManagerHeartbeatListener} and one using a {@code JobManagerHeartbeatListener}.
     *
     * <p>Both are created for this instance's {@code resourceId}, on the main thread executor,
     * with this instance's logger.
     */
    private void startHeartbeatServices() {
      // Heartbeats towards the TaskManagers.
      final TaskManagerHeartbeatListener taskManagerListener = new TaskManagerHeartbeatListener();
      taskManagerHeartbeatManager =
          heartbeatServices.createHeartbeatManagerSender(
              resourceId, taskManagerListener, getMainThreadExecutor(), log);

      // Heartbeats towards the JobManagers.
      final JobManagerHeartbeatListener jobManagerListener = new JobManagerHeartbeatListener();
      jobManagerHeartbeatManager =
          heartbeatServices.createHeartbeatManagerSender(
              resourceId, jobManagerListener, getMainThreadExecutor(), log);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    ResourceManager 作为集群资源的管理者,需要与 TaskManager 和 JobManager(JobMaster)保持心跳通信

    SlotManagerImpl

    public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
      LOG.info("Starting the SlotManager.");
    
      this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
      mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
      resourceActions = Preconditions.checkNotNull(newResourceActions);
    
      started = true;
    
      taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
        () -> mainThreadExecutor.execute(
        // 检查超时和多余的TaskManager
          () -> checkTaskManagerTimeoutsAndRedundancy()),
        0L,
        taskManagerTimeout.toMilliseconds(),
        TimeUnit.MILLISECONDS);
    
      slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
        () -> mainThreadExecutor.execute(
          () -> checkSlotRequestTimeouts()),
        0L,
        slotRequestTimeout.toMilliseconds(),
        TimeUnit.MILLISECONDS);
    
      registerSlotManagerMetrics();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    检查超时和多余的TaskManager

    void checkTaskManagerTimeoutsAndRedundancy() {
      if (!taskManagerRegistrations.isEmpty()) {
        long currentTime = System.currentTimeMillis();
    
        ArrayList timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());
    
        // first retrieve the timed out TaskManagers
        for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
          if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
            // we collect the instance ids first in order to avoid concurrent modifications by the
            // ResourceActions.releaseResource call
            timedOutTaskManagers.add(taskManagerRegistration);
          }
        }
    
        int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size();
        if (freeSlots.size() == slots.size()) {
          // No need to keep redundant taskManagers if no job is running.
          // 如果没有job在运行,释放taskmanager
          releaseTaskExecutors(timedOutTaskManagers, timedOutTaskManagers.size());
        } else if (slotsDiff > 0) {
          // Keep enough redundant taskManagers from time to time.
          // 保证随时有足够额taskmanager
          int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
          allocateRedundantTaskManagers(requiredTaskManagers);
        } else {
          // second we trigger the release resource callback which can decide upon the resource release
          int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
          releaseTaskExecutors(timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    图片

  • 相关阅读:
    执行shell脚本插入oracle数据库中文数据乱码
    AIOS: LLM Agent Operating System
    用 strace 跟踪系统命令的调用
    快速查找swagger接口的插件
    SQL SELECT DISTINCT(选择不同) 语法
    MFC Windows 程序设计[226]之下拉式列表(附源码)
    Docker基础操作容器
    AUTOSAR从入门到精通100讲(150)-SOA架构及应用
    TCP的重传机制
    前端制作
  • 原文地址:https://blog.csdn.net/hyunbar/article/details/126165301