• Flink Yarn Per Job - JobManager Requests Slots



    When the JobMaster starts, it starts the SlotPool and registers with the ResourceManager.


    Starting the SlotPool

    JobMaster

    private void startJobMasterServices() throws Exception {
      // Start the heartbeat services for the TaskManager and ResourceManager
      startHeartbeatServices();
    
      // start the slot pool make sure the slot pool now accepts messages for this leader
      slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
    
      //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
      // try to reconnect to previously known leader
      reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
    
      // job is ready to go, try to establish connection with resource manager
      //   - activate leader retrieval for the resource manager
      //   - on notification of the leader, the connection will be established and
      //     the slot pool will start requesting slots
      resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }
    
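    • Start the heartbeat services for the TaskManager and ResourceManager

    • Start the SlotPool

    • Reconnect to the previously known ResourceManager

    • Establish the ResourceManager connection, after which the SlotPool starts requesting slots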


    Registering with the ResourceManager (RM)

    StandaloneLeaderRetrievalService

    @Override
    public void start(LeaderRetrievalListener listener) {
      checkNotNull(listener, "Listener must not be null.");
    
      synchronized (startStopLock) {
        checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
        started = true;
    
        // directly notify the listener, because we already know the leading JobManager's address
        listener.notifyLeaderAddress(leaderAddress, leaderId);
      }
    }
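
The start-once behavior above, where a service that already knows its leader notifies the listener immediately, can be tried out in isolation. This is a minimal sketch and not Flink's implementation: `LeaderListenerSketch` and `FixedLeaderRetrievalSketch` are hypothetical names, and plain Java exceptions stand in for `checkNotNull` and `checkState`.

```java
import java.util.UUID;

/** Hypothetical stand-in for a leader listener; not Flink's interface. */
interface LeaderListenerSketch {
    void notifyLeaderAddress(String leaderAddress, UUID leaderId);
}

/** Sketch of a retrieval service whose leader is fixed at construction time. */
class FixedLeaderRetrievalSketch {
    private final Object startStopLock = new Object();
    private final String leaderAddress;
    private final UUID leaderId;
    private boolean started;

    FixedLeaderRetrievalSketch(String leaderAddress, UUID leaderId) {
        this.leaderAddress = leaderAddress;
        this.leaderId = leaderId;
    }

    void start(LeaderListenerSketch listener) {
        if (listener == null) {
            throw new NullPointerException("Listener must not be null.");
        }
        synchronized (startStopLock) {
            if (started) {
                throw new IllegalStateException("The service can only be started once.");
            }
            started = true;
            // The leader is already known, so the listener is notified synchronously.
            listener.notifyLeaderAddress(leaderAddress, leaderId);
        }
    }
}
```

Because the notification happens inside `start`, the caller's listener runs before `start` returns, which is why the listener in the next listing hands the work over with `runAsync`.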
    
    ResourceManagerLeaderListener in TaskExecutor
    
    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
    
      @Override
      public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        runAsync(
          () -> notifyOfNewResourceManagerLeader(
            leaderAddress,
            ResourceManagerId.fromUuidOrNull(leaderSessionID)));
      }
    }
    
    private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
      resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
      reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
    }  
    
    private void reconnectToResourceManager(Exception cause) {
      closeResourceManagerConnection(cause);
      startRegistrationTimeout();
      tryConnectToResourceManager();
    }
    
    private void tryConnectToResourceManager() {
      if (resourceManagerAddress != null) {
        connectToResourceManager();
      }
    }
    
    private void connectToResourceManager() {
    ... ...
      resourceManagerConnection.start();
    }
    

    RegisteredRpcConnection

    public void start() {
      checkState(!closed, "The RPC connection is already closed");
      checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
    
      // Create the registration object
      final RetryingRegistration newRegistration = createNewRegistration();
    
      if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
        // Start registering; onRegistrationSuccess() is called once registration succeeds
        newRegistration.startRegistration();
      } else {
        // concurrent start operation
        newRegistration.cancel();
      }
    }
    
    
    private RetryingRegistration createNewRegistration() {
      RetryingRegistration newRegistration = checkNotNull(generateRegistration());
    ... ...
      return newRegistration;
    }
    
    • Create the registration object

    • Start registering; once registration succeeds, onRegistrationSuccess() is called
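
The `compareAndSet` guard in `start()` ensures that only one registration is ever installed, and that a registration losing the race is cancelled. A minimal sketch of that pattern follows. `RegistrationSketch`, `RpcConnectionSketch` and `tryStart` are hypothetical names introduced for illustration; only `AtomicReferenceFieldUpdater` is the real JDK class.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical registration handle that only records what happened to it. */
class RegistrationSketch {
    boolean started;
    boolean cancelled;

    void startRegistration() { started = true; }

    void cancel() { cancelled = true; }
}

class RpcConnectionSketch {
    private static final AtomicReferenceFieldUpdater<RpcConnectionSketch, RegistrationSketch> UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(
            RpcConnectionSketch.class, RegistrationSketch.class, "pendingRegistration");

    // The field must be volatile for the field updater to work.
    private volatile RegistrationSketch pendingRegistration;

    /** Installs the candidate if nothing is pending; otherwise cancels it. */
    boolean tryStart(RegistrationSketch candidate) {
        if (UPDATER.compareAndSet(this, null, candidate)) {
            candidate.startRegistration();
            return true;
        }
        // Another registration was installed first, so this one is discarded.
        candidate.cancel();
        return false;
    }
}
```

Unlike the original method, this sketch has no preceding `checkState`, so a second call reaches the cancel branch instead of failing fast.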


    SlotPool Requests Slots

    ResourceManagerConnection in JobMaster
    
    protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
      runAsync(() -> {
        // filter out outdated connections
        //noinspection ObjectEquality
        if (this == resourceManagerConnection) {
          establishResourceManagerConnection(success);
        }
      });
    }
    
    private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
        final ResourceManagerId resourceManagerId = success.getResourceManagerId();
    ... ...
        // The SlotPool connects to the ResourceManager and requests resources
        slotPool.connectToResourceManager(resourceManagerGateway);
    ... ...
    }
    

    The SlotPool connects to the ResourceManager and requests resources.

    SlotPoolImpl

    public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
      this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
    
      // work on all slots waiting for this connection
      for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
      }
    
      // all sent off
      waitingForResourceManager.clear();
    }
    
    private void requestSlotFromResourceManager(
        final ResourceManagerGateway resourceManagerGateway,
        final PendingRequest pendingRequest) {
    ...  ...
      CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
        jobMasterId,
        new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
        rpcTimeout);
    ... ...
    }
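
The loop over `waitingForResourceManager` implies that slot requests arriving before the connection exists are parked and then flushed on connect. A minimal sketch of that buffering follows. `SlotPoolSketch`, `GatewaySketch` and `requestNewSlot` are hypothetical names, requests are reduced to allocation-id strings, and the stash-when-disconnected branch is an assumption about the code path that fills the map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class SlotPoolSketch {
    /** Hypothetical gateway that records the allocation ids it was asked for. */
    static class GatewaySketch {
        final List<String> received = new ArrayList<>();

        void requestSlot(String allocationId) { received.add(allocationId); }
    }

    // allocationId -> requested resource profile, kept in arrival order
    private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
    private GatewaySketch gateway;

    void requestNewSlot(String allocationId, String profile) {
        if (gateway == null) {
            // Assumption: without a connection the request is parked.
            waitingForResourceManager.put(allocationId, profile);
        } else {
            gateway.requestSlot(allocationId);
        }
    }

    void connectToResourceManager(GatewaySketch newGateway) {
        this.gateway = Objects.requireNonNull(newGateway);
        // Send every request that was waiting for this connection.
        for (String allocationId : waitingForResourceManager.keySet()) {
            newGateway.requestSlot(allocationId);
        }
        waitingForResourceManager.clear();
    }

    int waitingCount() { return waitingForResourceManager.size(); }
}
```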
    

    Requesting Slots from the RM

    ResourceManager

    @Override
    public CompletableFuture<Acknowledge> requestSlot(
        JobMasterId jobMasterId,
        SlotRequest slotRequest,
        final Time timeout) {
    
      JobID jobId = slotRequest.getJobId();
      JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
    
      if (null != jobManagerRegistration) {
        if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
          log.info("Request slot with profile {} for job {} with allocation id {}.",
            slotRequest.getResourceProfile(),
            slotRequest.getJobId(),
            slotRequest.getAllocationId());
    
          try {
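            // The RM's internal SlotManager handles the request; if no registered slot matches,
            // it goes on to request resources from YARN's ResourceManager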
            slotManager.registerSlotRequest(slotRequest);
          } catch (ResourceManagerException e) {
            return FutureUtils.completedExceptionally(e);
          }
    
          return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
          return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " +
            jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
        }
    
      } else {
        return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
      }
    }
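
`requestSlot` accepts a request only when the job is registered and the caller's `JobMasterId` matches the registered one; otherwise it returns an exceptionally completed future. A minimal sketch of that check follows, with plain strings in place of Flink's id types. `SlotRequestGateSketch`, `register` and the `"ACK"` result are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

class SlotRequestGateSketch {
    // jobId -> id of the JobMaster currently registered for that job
    private final Map<String, String> registeredJobMasters = new HashMap<>();

    void register(String jobId, String jobMasterId) {
        registeredJobMasters.put(jobId, jobMasterId);
    }

    CompletableFuture<String> requestSlot(String jobMasterId, String jobId) {
        String registered = registeredJobMasters.get(jobId);
        if (registered == null) {
            return failed("Could not find registered job manager for job " + jobId + '.');
        }
        if (!Objects.equals(jobMasterId, registered)) {
            // A stale or foreign JobMaster must not obtain slots for this job.
            return failed("The job leader's id " + registered
                + " does not match the received id " + jobMasterId + '.');
        }
        return CompletableFuture.completedFuture("ACK");
    }

    private static CompletableFuture<String> failed(String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException(message));
        return future;
    }
}
```

Returning a failed future rather than throwing keeps the error on the same asynchronous path as the success case, so the caller handles both through one callback.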
    

    The SlotManager inside the RM handles the request. If no registered slot matches, it goes on to request resources from YARN's ResourceManager.

    SlotManagerImpl

    public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
      checkInit();
    ... ... 
        try {
          internalRequestSlot(pendingSlotRequest);
        } catch (ResourceManagerException e) {
          // requesting the slot failed --> remove pending slot request
          pendingSlotRequests.remove(slotRequest.getAllocationId());
    
          throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
        }
        return true;
      }
    }
    
    private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
      final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
    
      OptionalConsumer.of(findMatchingSlot(resourceProfile))
        .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
        .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
    }
    
    private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
      ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
      Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);
    
      if (!pendingTaskManagerSlotOptional.isPresent()) {
        pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
      }
    ... ...
    }
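
Taken together, `internalRequestSlot` and `fulfillPendingSlotRequestWithPendingTaskManagerSlot` form a three-step fallback: use a free registered slot, else reuse a slot of a worker that has already been requested, else request a new worker. The sketch below models only that ordering. It assumes all slots are interchangeable (resource profiles are ignored), and `SlotMatchingSketch`, `Outcome` and `UNFULFILLED` are hypothetical; what the real code does when no worker can be started is elided in the listing and not modeled here.

```java
class SlotMatchingSketch {
    enum Outcome { ALLOCATED_FREE_SLOT, ASSIGNED_PENDING_SLOT, REQUESTED_NEW_WORKER, UNFULFILLED }

    private int freeSlots;
    private int unassignedPendingSlots;
    private final int slotsPerWorker;
    private final boolean canStartWorkers;

    SlotMatchingSketch(int freeSlots, int slotsPerWorker, boolean canStartWorkers) {
        this.freeSlots = freeSlots;
        this.slotsPerWorker = slotsPerWorker;
        this.canStartWorkers = canStartWorkers;
    }

    Outcome requestSlot() {
        // Step 1: a registered TaskManager has a free slot.
        if (freeSlots > 0) {
            freeSlots--;
            return Outcome.ALLOCATED_FREE_SLOT;
        }
        // Step 2: a worker was already requested and still has an unassigned slot.
        if (unassignedPendingSlots > 0) {
            unassignedPendingSlots--;
            return Outcome.ASSIGNED_PENDING_SLOT;
        }
        // Step 3: request a new worker; one of its slots is taken by this request.
        if (canStartWorkers) {
            unassignedPendingSlots += slotsPerWorker - 1;
            return Outcome.REQUESTED_NEW_WORKER;
        }
        return Outcome.UNFULFILLED;
    }
}
```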
    


    Flink's RM Requests Resources from YARN's RM

    SlotManagerImpl

    private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
    ... ...
      if (!resourceActions.allocateResource(defaultWorkerResourceSpec)) {
        // resource cannot be allocated
        return Optional.empty();
      }
    ... ...
    }
    
    ResourceActionsImpl in ResourceManager
    
    public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
      validateRunsInMainThread();
      return startNewWorker(workerResourceSpec);
    }
    

    ActiveResourceManager

    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
      requestNewWorker(workerResourceSpec);
      return true;
    }
    
    // Build the TaskExecutor process spec from the configuration
    private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
      final TaskExecutorProcessSpec taskExecutorProcessSpec =
          TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
      final int pendingCount = pendingWorkerCounter.increaseAndGet(workerResourceSpec);
    
      log.info("Requesting new worker with resource spec {}, current pending count: {}.",
          workerResourceSpec,
          pendingCount);
      // Request the resource
      CompletableFuture<WorkerType> requestResourceFuture = resourceManagerDriver.requestResource(taskExecutorProcessSpec);
      FutureUtils.assertNoException(
          requestResourceFuture.handle((worker, exception) -> {
            if (exception != null) {
              final int count = pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
              log.warn("Failed requesting worker with resource spec {}, current pending count: {}, exception: {}",
                  workerResourceSpec,
                  count,
                  exception);
              requestWorkerIfRequired();
            } else {
              final ResourceID resourceId = worker.getResourceID();
              workerNodeMap.put(resourceId, worker);
              currentAttemptUnregisteredWorkers.put(resourceId, workerResourceSpec);
              log.info("Requested worker {} with resource spec {}.",
                  resourceId.getStringWithMetadata(),
                  workerResourceSpec);
            }
            return null;
          }));
    }
    
    • Build the TaskExecutor process spec from the configuration

    • Request the resource
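
The bookkeeping in `requestNewWorker` can be hard to follow inside the callback: the pending count is increased before the request is sent and is only decreased again if the request future fails, while a successful request just records the worker. A minimal sketch of that accounting follows. `WorkerRequestSketch` is a hypothetical name, worker specs and ids are plain strings, the driver is modeled as a `Function`, and the retry via `requestWorkerIfRequired()` is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class WorkerRequestSketch {
    private final Map<String, Integer> pendingWorkers = new HashMap<>();
    private final Map<String, String> requestedWorkers = new HashMap<>(); // workerId -> spec
    private final Function<String, CompletableFuture<String>> driver;

    WorkerRequestSketch(Function<String, CompletableFuture<String>> driver) {
        this.driver = driver;
    }

    void requestNewWorker(String spec) {
        // Count the worker as pending before the request goes out.
        pendingWorkers.merge(spec, 1, Integer::sum);
        driver.apply(spec).handle((workerId, failure) -> {
            if (failure != null) {
                // The request failed, so the worker is no longer pending.
                pendingWorkers.merge(spec, -1, Integer::sum);
            } else {
                // The worker was granted but has not registered yet; it stays pending.
                requestedWorkers.put(workerId, spec);
            }
            return null;
        });
    }

    int pendingCount(String spec) { return pendingWorkers.getOrDefault(spec, 0); }

    boolean isRequested(String workerId) { return requestedWorkers.containsKey(workerId); }
}
```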

    YarnResourceManagerDriver

    public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
      checkInitialized();
    
      final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();
    
      final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource> priorityAndResourceOpt =
        taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);
    
      if (!priorityAndResourceOpt.isPresent()) {
        requestResourceFuture.completeExceptionally(
          new ResourceManagerException(
            String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s. " +
                "This usually indicates the requested resource is larger than Yarn's max container resource limit.",
              taskExecutorProcessSpec)));
      } else {
        final Priority priority = priorityAndResourceOpt.get().getPriority();
        final Resource resource = priorityAndResourceOpt.get().getResource();
        resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));
    
        // make sure we transmit the request fast and receive fast news of granted allocations
        resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
    
        requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new LinkedList<>()).add(requestResourceFuture);
    
        log.info("Requesting new TaskExecutor container with resource {}, priority {}.", taskExecutorProcessSpec, priority);
      }
    
      return requestResourceFuture;
    }
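
`requestResource` returns a future that is still incomplete and parks it in `requestResourceFutures`, keyed by the process spec. The listing does not show where those futures are completed. The sketch below assumes a callback that fires when YARN grants a container and completes the oldest waiting future for that spec. `ContainerRequestQueueSketch` and `onContainerAllocated` are hypothetical names, and specs and containers are plain strings.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class ContainerRequestQueueSketch {
    // spec -> futures of requests that are still waiting for a container
    private final Map<String, Queue<CompletableFuture<String>>> requestResourceFutures = new HashMap<>();

    CompletableFuture<String> requestResource(String spec) {
        CompletableFuture<String> future = new CompletableFuture<>();
        requestResourceFutures.computeIfAbsent(spec, ignore -> new LinkedList<>()).add(future);
        return future;
    }

    /** Assumed callback: returns false when nobody is waiting for this container. */
    boolean onContainerAllocated(String spec, String containerId) {
        Queue<CompletableFuture<String>> queue = requestResourceFutures.get(spec);
        if (queue == null || queue.isEmpty()) {
            return false;
        }
        // Requests for the same spec are served in arrival order.
        queue.poll().complete(containerId);
        if (queue.isEmpty()) {
            requestResourceFutures.remove(spec);
        }
        return true;
    }
}
```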
    

  • Original article: https://blog.csdn.net/hyunbar/article/details/126165320