• Flink Yarn Per Job - Starting the TaskManager, Registering with the ResourceManager, and Slot Allocation by the ResourceManager


    Starting the TaskManager

    YarnTaskExecutorRunner

    public static void main(String[] args) {
      EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
      SignalHandler.register(LOG);
      JvmShutdownSafeguard.installAsShutdownHook(LOG);
    
      runTaskManagerSecurely(args);
    }
    
    private static void runTaskManagerSecurely(String[] args) {
      try {
    ... ...
        TaskManagerRunner.runTaskManagerSecurely(configuration);
      }
    ... ...
    }
    

    TaskManagerRunner

    public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
      replaceGracefulExitWithHaltIfConfigured(configuration);
      final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
      FileSystem.initialize(configuration, pluginManager);
    
      SecurityUtils.install(new SecurityConfiguration(configuration));
    
      SecurityUtils.getInstalledContext().runSecured(() -> {
        runTaskManager(configuration, pluginManager);
        return null;
      });
    }
    
    public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
      final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
    
      taskManagerRunner.start();
    }
    
    public void start() throws Exception {
      taskExecutorService.start();
    }
    

    TaskExecutorToServiceAdapter

    implements TaskManagerRunner.TaskExecutorService
    
    public void start() {
      // Start the TaskExecutor through the RPC service; continue at its onStart() method
      taskExecutor.start();
    }
    
    

    The TaskExecutor is started through the RPC service, so the next place to look is its onStart() method.

    RpcEndpoint

    public final void start() {
      // Starting the endpoint is actually carried out by its own gateway (the RpcServer)
      rpcServer.start();
    }
    

    Starting the endpoint is actually carried out by its own gateway (the RpcServer).
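
The delegation described above can be sketched without any Flink dependency. This is only an illustration under stated assumptions: `SketchRpcServer`, `SketchRpcEndpoint` and `SketchTaskExecutor` are hypothetical names, and a single-threaded executor stands in for the endpoint's main thread. The real RpcServer does considerably more than this.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the RpcServer: it owns the endpoint's single main thread.
final class SketchRpcServer {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private final Runnable onStartHook;

    SketchRpcServer(Runnable onStartHook) {
        this.onStartHook = onStartHook;
    }

    // Starting the server is what eventually runs the endpoint's onStart() hook.
    void start() {
        mainThread.execute(onStartHook);
    }

    // Lets queued work finish, then stops the main thread.
    void stop() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical endpoint base class: start() does nothing but delegate to the server.
abstract class SketchRpcEndpoint {
    private final SketchRpcServer rpcServer = new SketchRpcServer(this::onStart);

    final void start() {
        rpcServer.start();
    }

    final void stop() {
        rpcServer.stop();
    }

    protected abstract void onStart();
}

// Hypothetical endpoint whose onStart() plays the role of TaskExecutor.onStart().
final class SketchTaskExecutor extends SketchRpcEndpoint {
    volatile boolean servicesStarted;

    @Override
    protected void onStart() {
        servicesStarted = true;
    }
}
```

Calling `start()` on a `SketchTaskExecutor` never invokes `onStart()` directly; the hook runs on the server's thread, which is why the trace has to continue in `onStart()` rather than in `start()`.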

    TaskExecutor

    public void onStart() throws Exception {
      try {
        // Start the TaskExecutor services
        startTaskExecutorServices();
      } catch (Throwable t) {
        final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);
        onFatalError(exception);
        throw exception;
      }
    
      startRegistrationTimeout();
    }
    

    Registering with the ResourceManager

    private void startTaskExecutorServices() throws Exception {
      try {
        // start by connecting to the ResourceManager (RM)
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    
        // tell the task slot table who's responsible for the task slot actions
        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
    
        // start the job leader service
        jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
    
        fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
      } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
      }
    }
    

    StandaloneLeaderRetrievalService

    public void start(LeaderRetrievalListener listener) {
      checkNotNull(listener, "Listener must not be null.");
    
      synchronized (startStopLock) {
        checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
        started = true;
    
        // directly notify the listener, because we already know the leading JobManager's address
        listener.notifyLeaderAddress(leaderAddress, leaderId);
      }
    }
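
The behaviour of this service can be reproduced in a few lines: the leader address is fixed at construction time, so `start` just hands it to the listener once. The sketch below is a simplified stand-in and not the Flink class itself; `SketchLeaderListener` and `SketchStandaloneLeaderRetrieval` are hypothetical names, and plain JDK checks replace `checkNotNull` and `checkState`.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical listener, mirroring the single callback used in the snippet above.
interface SketchLeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical retrieval service whose leader is known up front and never changes.
final class SketchStandaloneLeaderRetrieval {
    private final Object startStopLock = new Object();
    private final String leaderAddress;
    private final UUID leaderId;
    private boolean started = false;

    SketchStandaloneLeaderRetrieval(String leaderAddress, UUID leaderId) {
        this.leaderAddress = Objects.requireNonNull(leaderAddress);
        this.leaderId = Objects.requireNonNull(leaderId);
    }

    void start(SketchLeaderListener listener) {
        Objects.requireNonNull(listener, "Listener must not be null.");
        synchronized (startStopLock) {
            if (started) {
                throw new IllegalStateException("The service can only be started once.");
            }
            started = true;
            // No election to wait for: notify the listener immediately.
            listener.notifyLeaderAddress(leaderAddress, leaderId);
        }
    }
}
```

Because the notification happens inside `start`, the listener callback (here, the ResourceManagerLeaderListener) fires before `start` returns.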
    

    TaskExecutor

    private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
    
      @Override
      public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        runAsync(
          () -> notifyOfNewResourceManagerLeader(
            leaderAddress,
            ResourceManagerId.fromUuidOrNull(leaderSessionID)));
      }
    }
    

    The intermediate tracing steps are omitted here; we jump straight to the start method in RegisteredRpcConnection.

    RegisteredRpcConnection

    public void start() {
      checkState(!closed, "The RPC connection is already closed");
      checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
    
      // Create the registration object
      final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
    
      if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
        // Start the registration; once it succeeds, onRegistrationSuccess() is called
        newRegistration.startRegistration();
      } else {
        // concurrent start operation
        newRegistration.cancel();
      }
    }
    
    • Create the registration object

    • Start the registration; once it succeeds, onRegistrationSuccess() is called
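
The compare-and-set guard in `start()` can be tried in isolation. The following is a minimal sketch, assuming hypothetical `SketchRegistration` and `SketchConnection` classes; it leaves out the `checkState` calls on purpose so that the losing branch of the CAS can be reached from a single thread.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical registration object that only records what happened to it.
final class SketchRegistration {
    volatile boolean started = false;
    volatile boolean canceled = false;

    void startRegistration() {
        started = true;
    }

    void cancel() {
        canceled = true;
    }
}

// Hypothetical connection: at most one registration is ever installed as pending.
final class SketchConnection {
    private static final AtomicReferenceFieldUpdater<SketchConnection, SketchRegistration> UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(
            SketchConnection.class, SketchRegistration.class, "pendingRegistration");

    // Must be volatile for AtomicReferenceFieldUpdater to accept it.
    private volatile SketchRegistration pendingRegistration;

    SketchRegistration start() {
        SketchRegistration newRegistration = new SketchRegistration();
        if (UPDATER.compareAndSet(this, null, newRegistration)) {
            // This caller won the race: its registration becomes the pending one.
            newRegistration.startRegistration();
        } else {
            // Another start already installed a registration: discard this one.
            newRegistration.cancel();
        }
        return newRegistration;
    }
}
```

Only the first call finds the field still `null`, so only its registration is started; any later or concurrent call ends up cancelling the object it just created.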

    ResourceManagerRegistrationListener
    

    in TaskExecutor

    @Override
    public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
      final ResourceID resourceManagerId = success.getResourceManagerId();
      final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
      final ClusterInformation clusterInformation = success.getClusterInformation();
      final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
    
      runAsync(
        () -> {
          // filter out outdated connections
          //noinspection ObjectEquality
          if (resourceManagerConnection == connection) {
            try {
              establishResourceManagerConnection(
                resourceManagerGateway,
                resourceManagerId,
                taskExecutorRegistrationId,
                clusterInformation);
            } catch (Throwable t) {
              log.error("Establishing Resource Manager connection in Task Executor failed", t);
            }
          }
        });
    }
    
    private void establishResourceManagerConnection(
        ResourceManagerGateway resourceManagerGateway,
        ResourceID resourceManagerResourceId,
        InstanceID taskExecutorRegistrationId,
        ClusterInformation clusterInformation) {
    
      final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
        getResourceID(),
        taskExecutorRegistrationId,
        taskSlotTable.createSlotReport(getResourceID()),
        taskManagerConfiguration.getTimeout());
    ... ...
      stopRegistrationTimeout();
    }
    

    sendSlotReport reports the TaskManager's slots to the ResourceManager

    ResourceManager

    public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
      final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
    
      if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
        if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
          onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
      } else {
        return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
      }
    }
    

    SlotManagerImpl

    /**
     * Registers a new task manager at the slot manager. This will make the task managers slots
     * known and, thus, available for allocation.
     *
     * @param taskExecutorConnection for the new task manager
     * @param initialSlotReport for the new task manager
     * @return True if the task manager has not been registered before and is registered successfully; otherwise false
     */
    @Override
    public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
      checkInit();
    
      LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID().getStringWithMetadata(), taskExecutorConnection.getInstanceID());
    
      // we identify task managers by their instance id
      if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
        reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
        return false;
      } else {
        if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
          LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
          resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
          return false;
        }
    
        // first register the TaskManager
        ArrayList<SlotID> reportedSlots = new ArrayList<>();
    
        for (SlotStatus slotStatus : initialSlotReport) {
          reportedSlots.add(slotStatus.getSlotID());
        }
    
        TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
          taskExecutorConnection,
          reportedSlots);
    
        taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
    
        // next register the new slots
        for (SlotStatus slotStatus : initialSlotReport) {
          registerSlot(
            slotStatus.getSlotID(),
            slotStatus.getAllocationID(),
            slotStatus.getJobID(),
            slotStatus.getResourceProfile(),
            taskExecutorConnection);
        }
        return true;
      }
    }
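
The three outcomes of `registerTaskManager` (already known, over the slot limit, newly registered) can be modelled with two maps. This is a rough sketch under simplifying assumptions: `SketchSlotManager` is a hypothetical class, plain strings stand in for `InstanceID` and `SlotID`, and the limit check only counts registered slots, which is not necessarily how `isMaxSlotNumExceededAfterRegistration` computes it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified slot manager keyed by plain strings.
final class SketchSlotManager {
    private final int maxSlotNum;
    // instance id -> slot ids reported at registration time
    private final Map<String, List<String>> taskManagerRegistrations = new HashMap<>();
    // slot id -> owning instance id
    private final Map<String, String> slots = new HashMap<>();

    SketchSlotManager(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    boolean registerTaskManager(String instanceId, List<String> reportedSlotIds) {
        if (taskManagerRegistrations.containsKey(instanceId)) {
            // Already known: the code above only refreshes the slot status here.
            return false;
        }
        if (slots.size() + reportedSlotIds.size() > maxSlotNum) {
            // Assumed limit check: registering would exceed the configured maximum.
            return false;
        }
        // First register the TaskManager, then each of its slots.
        taskManagerRegistrations.put(instanceId, new ArrayList<>(reportedSlotIds));
        for (String slotId : reportedSlotIds) {
            slots.put(slotId, instanceId);
        }
        return true;
    }

    int registeredSlotCount() {
        return slots.size();
    }
}
```

With a limit of 3, registering a two-slot TaskManager succeeds, registering it again returns `false`, and a second two-slot TaskManager is rejected because it would bring the total to 4.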
    

    Slot allocation by the ResourceManager

    SlotManagerImpl

    /**
     * Registers a slot for the given task manager at the slot manager. The slot is identified by
     * the given slot id. The given resource profile defines the available resources for the slot.
     * The task manager connection can be used to communicate with the task manager.
     *
     * @param slotId identifying the slot on the task manager
     * @param allocationId which is currently deployed in the slot
     * @param resourceProfile of the slot
     * @param taskManagerConnection to communicate with the remote task manager
     */
    private void registerSlot(
        SlotID slotId,
        AllocationID allocationId,
        JobID jobId,
        ResourceProfile resourceProfile,
        TaskExecutorConnection taskManagerConnection) {
    
      if (slots.containsKey(slotId)) {
        // remove the old slot first
        removeSlot(
          slotId,
          new SlotManagerException(
            String.format(
              "Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.",
              slotId)));
      }
    
      // Create and register the new slot
      final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
    
      final PendingTaskManagerSlot pendingTaskManagerSlot;
    
      if (allocationId == null) {
        pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
      } else {
        pendingTaskManagerSlot = null;
      }
    
      if (pendingTaskManagerSlot == null) {
        updateSlot(slotId, allocationId, jobId);
      } else {
        pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
        final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();
    
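        // Allocate the slot
        if (assignedPendingSlotRequest == null) {
          // All pending requests have already been fulfilled, so this slot stays free for now
          handleFreeSlot(slot);
        } else {
          // This slot is to be assigned to a pending request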
          assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
          allocateSlot(slot, assignedPendingSlotRequest);
        }
      }
    }
    
    
    private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
      Preconditions.checkState(taskManagerSlot.getState() == SlotState.FREE);
    ... ...
      // RPC call to the task manager
      // After the allocation, ask the TaskManager to offer the slot to the JobMaster
      CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
        slotId,
        pendingSlotRequest.getJobId(),
        allocationId,
        pendingSlotRequest.getResourceProfile(),
        pendingSlotRequest.getTargetAddress(),
        resourceManagerId,
        taskManagerRequestTimeout);
    
      ... ...
    }
    
    • Remove the old slot

    • Create and register the new slot

    • Allocate the slot

    • After the allocation, ask the TaskManager to offer the slot to the JobMaster
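
The branching inside `registerSlot` comes down to a three-way decision, which the sketch below isolates. It is an illustration only: `SlotAction`, `SketchPendingTaskManagerSlot` and `SketchRegisterSlot` are hypothetical names, resource profiles are compared as plain strings, and the real method also mutates slot state, which is not modelled here.

```java
import java.util.List;

// Hypothetical labels for the three branches taken in registerSlot.
enum SlotAction { UPDATE_SLOT, HANDLE_FREE_SLOT, ALLOCATE_TO_REQUEST }

// Hypothetical pending slot: a profile plus the request waiting on it, if any.
final class SketchPendingTaskManagerSlot {
    final String resourceProfile;
    final String assignedRequestId; // null when no request is assigned

    SketchPendingTaskManagerSlot(String resourceProfile, String assignedRequestId) {
        this.resourceProfile = resourceProfile;
        this.assignedRequestId = assignedRequestId;
    }
}

final class SketchRegisterSlot {
    // Decides what happens to a newly registered slot; pendingSlots must be mutable.
    static SlotAction decide(
            String allocationId,
            String resourceProfile,
            List<SketchPendingTaskManagerSlot> pendingSlots) {

        SketchPendingTaskManagerSlot match = null;
        if (allocationId == null) {
            // Only an unallocated slot may replace a pending slot with the same profile.
            for (SketchPendingTaskManagerSlot pending : pendingSlots) {
                if (pending.resourceProfile.equals(resourceProfile)) {
                    match = pending;
                    break;
                }
            }
        }
        if (match == null) {
            return SlotAction.UPDATE_SLOT;
        }
        pendingSlots.remove(match);
        return match.assignedRequestId == null
            ? SlotAction.HANDLE_FREE_SLOT
            : SlotAction.ALLOCATE_TO_REQUEST;
    }
}
```

A slot that arrives with an allocation ID always takes the `UPDATE_SLOT` path; a free slot is handed to a request only if a matching pending slot exists and a request is already assigned to it.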


    The TaskManager offers slots

    TaskExecutor

    public CompletableFuture<Acknowledge> requestSlot(
    ... ...
      try {
        // Allocate a slot on the TaskManager as instructed by the ResourceManager
        allocateSlot(
          slotId,
          jobId,
          allocationId,
          resourceProfile);
      } catch (SlotAllocationException sae) {
        return FutureUtils.completedExceptionally(sae);
      }
    ... ...
      if (job.isConnected()) {
        // The job is connected: offer the slot to the JobManager
        offerSlotsToJobManager(jobId);
      }
    
      return CompletableFuture.completedFuture(Acknowledge.get());
    
    }
    
    private void offerSlotsToJobManager(final JobID jobId) {
      jobTable
        .getConnection(jobId)
        .ifPresent(this::internalOfferSlotsToJobManager);
    }
    
    private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
    ... ...
        CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
          getResourceID(),
          reservedSlots,
          taskManagerConfiguration.getTimeout());
    ... ...
    }
    
    • Allocate a slot on the TaskManager as instructed by the ResourceManager

    • Once the job is connected, offer the slot to the JobManager

    JobMaster

    public CompletableFuture<Collection<SlotOffer>> offerSlots(
    ... ...
      return CompletableFuture.completedFuture(
        slotPool.offerSlots(
          taskManagerLocation,
          rpcTaskManagerGateway,
          slots));
    }
    

    SlotPoolImpl

    public Collection<SlotOffer> offerSlots(
        TaskManagerLocation taskManagerLocation,
        TaskManagerGateway taskManagerGateway,
        Collection<SlotOffer> offers) {
    
      ArrayList<SlotOffer> result = new ArrayList<>(offers.size());
    
      for (SlotOffer offer : offers) {
        if (offerSlot(
          taskManagerLocation,
          taskManagerGateway,
          offer)) {
    
          result.add(offer);
        }
      }
      return result;
    }
    
    /**
     * Slot offering by TaskExecutor with AllocationID. The AllocationID is originally generated by this pool and
     * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
     * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
     * request waiting for this slot (maybe fulfilled by some other returned slot).
     *
     * @param taskManagerLocation location from where the offer comes from
     * @param taskManagerGateway TaskManager gateway
     * @param slotOffer the offered slot
     * @return True if we accept the offering
     */
    boolean offerSlot(
        final TaskManagerLocation taskManagerLocation,
        final TaskManagerGateway taskManagerGateway,
        final SlotOffer slotOffer) {
    
      componentMainThreadExecutor.assertRunningInMainThread();
    
      // check if this TaskManager is valid
      final ResourceID resourceID = taskManagerLocation.getResourceID();
      final AllocationID allocationID = slotOffer.getAllocationId();
    
      if (!registeredTaskManagers.contains(resourceID)) {
        log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
            slotOffer.getAllocationId(), taskManagerLocation);
        return false;
      }
    
      // check whether we have already using this slot
      AllocatedSlot existingSlot;
      if ((existingSlot = allocatedSlots.get(allocationID)) != null ||
        (existingSlot = availableSlots.get(allocationID)) != null) {
    
        // we need to figure out if this is a repeated offer for the exact same slot,
        // or another offer that comes from a different TaskManager after the ResourceManager
        // re-tried the request
    
        // we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of
        // the actual slots on the TaskManagers
        // Note: The slotOffer should have the SlotID
        final SlotID existingSlotId = existingSlot.getSlotId();
        final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());
    
        if (existingSlotId.equals(newSlotId)) {
          log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);
    
          // return true here so that the sender will get a positive acknowledgement to the retry
          // and mark the offering as a success
          return true;
        } else {
          // the allocation has been fulfilled by another slot, reject the offer so the task executor
          // will offer the slot to the resource manager
          return false;
        }
      }
    
      final AllocatedSlot allocatedSlot = new AllocatedSlot(
        allocationID,
        taskManagerLocation,
        slotOffer.getSlotIndex(),
        slotOffer.getResourceProfile(),
        taskManagerGateway);
    
      // use the slot to fulfill pending request, in requested order
      tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
    
      // we accepted the request in any case. slot will be released after it idled for
      // too long and timed out
      return true;
    }
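
The accept/reject rules in `offerSlot` can be summarised in a small stand-alone model. This is a sketch under assumptions: `SketchSlotPool` is a hypothetical class, strings replace `ResourceID`, `SlotID` and `AllocationID`, and a single map stands in for both `allocatedSlots` and `availableSlots`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified slot pool that only tracks which slot fulfils which allocation.
final class SketchSlotPool {
    private final Set<String> registeredTaskManagers = new HashSet<>();
    // allocation id -> slot id ("<resourceId>_<slotIndex>")
    private final Map<String, String> slotsByAllocation = new HashMap<>();

    void registerTaskManager(String resourceId) {
        registeredTaskManagers.add(resourceId);
    }

    boolean offerSlot(String resourceId, int slotIndex, String allocationId) {
        if (!registeredTaskManagers.contains(resourceId)) {
            // Outdated offer from a TaskManager this pool does not know.
            return false;
        }
        String newSlotId = resourceId + "_" + slotIndex;
        String existingSlotId = slotsByAllocation.get(allocationId);
        if (existingSlotId != null) {
            // Repeated offer for the same slot: acknowledge it again.
            // Same allocation from a different slot: reject it.
            return existingSlotId.equals(newSlotId);
        }
        slotsByAllocation.put(allocationId, newSlotId);
        return true;
    }
}
```

Accepting the repeated offer matters because the sender may be retrying after a lost acknowledgement, while the rejected offer lets the TaskExecutor hand the slot back to the ResourceManager.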
    

  • Original article: https://blog.csdn.net/hyunbar/article/details/126165327