• Atguigu (尚硅谷) Big Data Training: Demystifying Flink's Four Execution Graphs: ExecutionGraph and the Physical Execution Graph


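    In the previous article, "Demystifying Flink's Four Execution Graphs (Part 1): StreamGraph and JobGraph", we walked through the source code that generates the StreamGraph and the JobGraph. This article continues with an in-depth look at how the ExecutionGraph and the physical execution graph are generated.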

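    1. The ExecutionGraph is generated in the JobManager

    After the client has generated the JobGraph, it submits it to the JobManager through submitJob, and the JobManager builds the corresponding ExecutionGraph from the JobGraph.

    The ExecutionGraph is the core data structure Flink uses for job scheduling. It contains every parallel task, every intermediate stream, and the relationships between them.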
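
To make "every parallel task" concrete, the following is a minimal sketch of how a job-level vertex with parallelism n fans out into n subtasks. It is a toy model and not Flink code: the class `ParallelExpansionSketch`, the method `expand`, and the `name#index` naming are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Flink code: every name in this class is hypothetical.
final class ParallelExpansionSketch {

    // Expands each job-level vertex (name -> parallelism) into its parallel subtasks.
    static List<String> expand(Map<String, Integer> parallelismByVertex) {
        List<String> subtasks = new ArrayList<>();
        for (Map.Entry<String, Integer> vertex : parallelismByVertex.entrySet()) {
            for (int index = 0; index < vertex.getValue(); index++) {
                subtasks.add(vertex.getKey() + "#" + index);
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("source", 2);
        job.put("map", 4);
        job.put("sink", 1);
        // One entry per parallel subtask: 2 + 4 + 1 = 7 entries in total.
        System.out.println(expand(job));
    }
}
```

In Flink the job-level node roughly corresponds to an ExecutionJobVertex and each subtask to an ExecutionVertex, as the attachJobGraph listing later in this article shows.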

    Taking per-job mode as an example, let us trace how the ExecutionGraph is generated:

    When the Dispatcher creates the JobManagerRunner, it calls createJobManagerRunner:

    => createJobManagerRunner()

    =>new JobManagerRunnerImpl()

    =>createJobMasterService()

    => new JobMaster()

    While the JobMaster is being constructed, it creates the Scheduler:

    => createScheduler()

    => createInstance()

    => new DefaultScheduler() # the scheduler

    => createAndRestoreExecutionGraph()

    => createExecutionGraph()

    => ExecutionGraphBuilder.buildGraph()

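    The detailed call path through the source code follows. Feel free to skip it and go straight to the analysis of the buildGraph() method further down: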

    Dispatcher.java

    CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
             final RpcService rpcService = getRpcService();
             return CompletableFuture.supplyAsync(
                      () -> {
                              try {
                                       JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                                                jobGraph,
                                                configuration,
                                                rpcService,
                                                highAvailabilityServices,
                                                heartbeatServices,
                                                jobManagerSharedServices,
                                                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                                                fatalErrorHandler,
                                                initializationTimestamp);
                                       // Start the JobManagerRunner
                                       runner.start();
                                       return runner;
                              }
                      ......
    }

    DefaultJobManagerRunnerFactory.java

    public JobManagerRunner createJobManagerRunner(
                      JobGraph jobGraph,
                      Configuration configuration,
                      RpcService rpcService,
                      HighAvailabilityServices highAvailabilityServices,
                      HeartbeatServices heartbeatServices,
                      JobManagerSharedServices jobManagerServices,
                      JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
                      FatalErrorHandler fatalErrorHandler,
                      long initializationTimestamp) throws Exception {
     
             ... ...
     
             return new JobManagerRunnerImpl(
                      jobGraph,
                      jobMasterFactory,
                      highAvailabilityServices,
     
             ... ...
    }

    JobManagerRunnerImpl.java

    public JobManagerRunnerImpl(
                      final JobGraph jobGraph,
                      final JobMasterServiceFactory jobMasterFactory,
                      final HighAvailabilityServices haServices,
                      final LibraryCacheManager.ClassLoaderLease classLoaderLease,
                      final Executor executor,
                      final FatalErrorHandler fatalErrorHandler,
                      long initializationTimestamp) throws Exception {
     
             ... ...
             this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader, initializationTimestamp);
    }

    DefaultJobMasterServiceFactory.java

    public JobMaster createJobMasterService(
                      JobGraph jobGraph,
                      OnCompletionActions jobCompletionActions,
                      ClassLoader userCodeClassloader,
                      long initializationTimestamp) throws Exception {
     
             return new JobMaster(
                      rpcService,
                      jobMasterConfiguration,
                      ResourceID.generate(),
                      jobGraph,
                      haServices,
                      slotPoolFactory,
                      jobManagerSharedServices,
                      heartbeatServices,
                      jobManagerJobMetricGroupFactory,
                      jobCompletionActions,
                      fatalErrorHandler,
                      userCodeClassloader,
                      schedulerNGFactory,
                      shuffleMaster,
                      lookup -> new JobMasterPartitionTrackerImpl(
                              jobGraph.getJobID(),
                              shuffleMaster,
                              lookup
                      ),
                      new DefaultExecutionDeploymentTracker(),
                      DefaultExecutionDeploymentReconciler::new,
                      initializationTimestamp);
    }

    JobMaster.java

    public JobMaster(
                      RpcService rpcService,
                      JobMasterConfiguration jobMasterConfiguration,
                      ResourceID resourceId,
                      JobGraph jobGraph,
                      HighAvailabilityServices highAvailabilityService,
                      SlotPoolFactory slotPoolFactory,
                      JobManagerSharedServices jobManagerSharedServices,
                      HeartbeatServices heartbeatServices,
                      JobManagerJobMetricGroupFactory jobMetricGroupFactory,
                      OnCompletionActions jobCompletionActions,
                      FatalErrorHandler fatalErrorHandler,
                      ClassLoader userCodeLoader,
                      SchedulerNGFactory schedulerNGFactory,
                      ShuffleMaster shuffleMaster,
                      PartitionTrackerFactory partitionTrackerFactory,
                      ExecutionDeploymentTracker executionDeploymentTracker,
                      ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
                      long initializationTimestamp) throws Exception {
     
             ... ...
             this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);
             ... ...
    }
    private SchedulerNG createScheduler(ExecutionDeploymentTracker executionDeploymentTracker,
                                        final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
             return schedulerNGFactory.createInstance(
                      log,
                      jobGraph,
                      backPressureStatsTracker,
                      scheduledExecutorService,
                      jobMasterConfiguration.getConfiguration(),
                      slotPool,
                      scheduledExecutorService,
                      userCodeLoader,
                      highAvailabilityServices.getCheckpointRecoveryFactory(),
                      rpcTimeout,
                      blobWriter,
                      jobManagerJobMetricGroup,
                      jobMasterConfiguration.getSlotRequestTimeout(),
                      shuffleMaster,
                      partitionTracker,
                      executionDeploymentTracker,
                      initializationTimestamp);
    }

    DefaultSchedulerFactory.java

    public SchedulerNG createInstance(
                      final Logger log,
                      final JobGraph jobGraph,
                      final BackPressureStatsTracker backPressureStatsTracker,
                      final Executor ioExecutor,
                      final Configuration jobMasterConfiguration,
                      final SlotPool slotPool,
                      final ScheduledExecutorService futureExecutor,
                      final ClassLoader userCodeLoader,
                      final CheckpointRecoveryFactory checkpointRecoveryFactory,
                      final Time rpcTimeout,
                      final BlobWriter blobWriter,
                      final JobManagerJobMetricGroup jobManagerJobMetricGroup,
                      final Time slotRequestTimeout,
                      final ShuffleMaster shuffleMaster,
                      final JobMasterPartitionTracker partitionTracker,
                      final ExecutionDeploymentTracker executionDeploymentTracker,
                      long initializationTimestamp) throws Exception {
     
             ... ...
     
             return new DefaultScheduler(
                      log,
                      jobGraph,
                      backPressureStatsTracker,
                      ioExecutor,
                      jobMasterConfiguration,
                      schedulerComponents.getStartUpAction(),
                      futureExecutor,
                      new ScheduledExecutorServiceAdapter(futureExecutor),
                      userCodeLoader,
                      checkpointRecoveryFactory,
                      rpcTimeout,
                      blobWriter,
                      jobManagerJobMetricGroup,
                      shuffleMaster,
                      partitionTracker,
                      schedulerComponents.getSchedulingStrategyFactory(),
                      FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
                      restartBackoffTimeStrategy,
                      new DefaultExecutionVertexOperations(),
                      new ExecutionVertexVersioner(),
                      schedulerComponents.getAllocatorFactory(),
                      executionDeploymentTracker,
                      initializationTimestamp);
    }

    SchedulerBase.java

    public SchedulerBase(
             final Logger log,
             final JobGraph jobGraph,
             final BackPressureStatsTracker backPressureStatsTracker,
             final Executor ioExecutor,
             final Configuration jobMasterConfiguration,
             final SlotProvider slotProvider,
             final ScheduledExecutorService futureExecutor,
             final ClassLoader userCodeLoader,
             final CheckpointRecoveryFactory checkpointRecoveryFactory,
             final Time rpcTimeout,
             final RestartStrategyFactory restartStrategyFactory,
             final BlobWriter blobWriter,
             final JobManagerJobMetricGroup jobManagerJobMetricGroup,
             final Time slotRequestTimeout,
             final ShuffleMaster shuffleMaster,
             final JobMasterPartitionTracker partitionTracker,
             final ExecutionVertexVersioner executionVertexVersioner,
             final ExecutionDeploymentTracker executionDeploymentTracker,
             final boolean legacyScheduling,
             long initializationTimestamp) throws Exception {
             ... ...
             this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp);
             ... ...
    }
    private ExecutionGraph createAndRestoreExecutionGraph(
             JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
             ShuffleMaster shuffleMaster,
             JobMasterPartitionTracker partitionTracker,
             ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp) throws Exception {
     
             ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp);
     
             ... ...
    }
    private ExecutionGraph createExecutionGraph(
             JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
             ShuffleMaster shuffleMaster,
             final JobMasterPartitionTracker partitionTracker,
             ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp) throws JobExecutionException, JobException {
     
             ... ...
     
             return ExecutionGraphBuilder.buildGraph(
                      null,
                      jobGraph,
                      jobMasterConfiguration,
                      futureExecutor,
                      ioExecutor,
                      slotProvider,
                      userCodeLoader,
                      checkpointRecoveryFactory,
                      rpcTimeout,
                      restartStrategy,
                      currentJobManagerJobMetricGroup,
                      blobWriter,
                      slotRequestTimeout,
                      log,
                      shuffleMaster,
                      partitionTracker,
                      failoverStrategy,
                      executionDeploymentListener,
                      executionStateUpdateListener,
                      initializationTimestamp);
    }

    Next, let us analyze the core logic that builds the ExecutionGraph:

    ExecutionGraphBuilder.java

    public static ExecutionGraph buildGraph(
             @Nullable ExecutionGraph prior,
             JobGraph jobGraph,
             Configuration jobManagerConfig,
             ScheduledExecutorService futureExecutor,
             Executor ioExecutor,
             SlotProvider slotProvider,
             ClassLoader classLoader,
             CheckpointRecoveryFactory recoveryFactory,
             Time rpcTimeout,
             RestartStrategy restartStrategy,
             MetricGroup metrics,
             BlobWriter blobWriter,
             Time allocationTimeout,
             Logger log,
             ShuffleMaster shuffleMaster,
             JobMasterPartitionTracker partitionTracker,
             FailoverStrategy.Factory failoverStrategyFactory,
             ExecutionDeploymentListener executionDeploymentListener,
             ExecutionStateUpdateListener executionStateUpdateListener,
             long initializationTimestamp) throws JobExecutionException, JobException {
     
             checkNotNull(jobGraph, "job graph cannot be null");
     
             final String jobName = jobGraph.getName();
             final JobID jobId = jobGraph.getJobID();
     
             final JobInformation jobInformation = new JobInformation(
                      jobId,
                      jobName,
                      jobGraph.getSerializedExecutionConfig(),
                      jobGraph.getJobConfiguration(),
                      jobGraph.getUserJarBlobKeys(),
                      jobGraph.getClasspaths());
     
             final int maxPriorAttemptsHistoryLength =
                              jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
     
             final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
                      PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);
     
             // create a new execution graph, if none exists so far
             final ExecutionGraph executionGraph;
             try {
                      executionGraph = (prior != null) ? prior :
                              new ExecutionGraph(
                                       jobInformation,
                                       futureExecutor,
                                       ioExecutor,
                                       rpcTimeout,
                                       restartStrategy,
                                       maxPriorAttemptsHistoryLength,
                                       failoverStrategyFactory,
                                       slotProvider,
                                       classLoader,
                                       blobWriter,
                                       allocationTimeout,
                                       partitionReleaseStrategyFactory,
                                       shuffleMaster,
                                       partitionTracker,
                                       jobGraph.getScheduleMode(),
                                       executionDeploymentListener,
                                       executionStateUpdateListener,
                                       initializationTimestamp);
             } catch (IOException e) {
                      throw new JobException("Could not create the ExecutionGraph.", e);
             }
     
             // set the basic properties
     
             try {
                      executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
             }
             catch (Throwable t) {
                      log.warn("Cannot create JSON plan for job", t);
                      // give the graph an empty plan
                      executionGraph.setJsonPlan("{}");
             }
     
             // initialize the vertices that have a master initialization hook
             // file output formats create directories here, input formats create splits
     
             final long initMasterStart = System.nanoTime();
             log.info("Running initialization on master for job {} ({}).", jobName, jobId);
     
             for (JobVertex vertex : jobGraph.getVertices()) {
                      // Defensive check: fetch the invokable class of each JobGraph vertex and fail if a vertex has none
                      String executableClass = vertex.getInvokableClassName();
                      if (executableClass == null || executableClass.isEmpty()) {
                              throw new JobSubmissionException(jobId,
                                                "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
                      }
     
                      try {
                              // Initialize the vertex on the master, passing in the class loader
                              vertex.initializeOnMaster(classLoader);
                      }
                      catch (Throwable t) {
                                       throw new JobExecutionException(jobId,
                                                         "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
                      }
             }
     
             log.info("Successfully ran initialization on master in {} ms.",
                              (System.nanoTime() - initMasterStart) / 1_000_000);
     
             // topologically sort the job vertices and attach the graph to the existing one
             // (the topological sort of the JobGraph yields the ordered list of all JobVertex instances)
             List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
             if (log.isDebugEnabled()) {
                      log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
             }
             // Core logic: attach the topologically sorted JobGraph vertices to the executionGraph data structure.
             executionGraph.attachJobGraph(sortedTopology);
     
             ... ...
    }
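
buildGraph() relies on the job vertices being sorted topologically from the sources, so that every producer is attached before its consumers. The sketch below illustrates that ordering with Kahn's algorithm on a plain adjacency map. It is an illustration of the idea only: Kahn's algorithm is a technique swapped in here for clarity and is not claimed to be how `getVerticesSortedTopologicallyFromSources()` is implemented, and `TopologicalSortSketch` and `sortFromSources` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: Kahn's algorithm on vertex names, not Flink's implementation.
final class TopologicalSortSketch {

    // edges maps each vertex to the vertices that consume its output.
    static List<String> sortFromSources(Map<String, List<String>> edges) {
        // Count the incoming edges of every vertex.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String vertex : edges.keySet()) {
            inDegree.putIfAbsent(vertex, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        // Vertices without inputs are the sources and can be emitted first.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            // A consumer becomes ready once all of its producers have been emitted.
            for (String target : edges.getOrDefault(vertex, List.of())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("graph contains a cycle");
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("source", List.of("map", "sink"));
        edges.put("map", List.of("sink"));
        System.out.println(sortFromSources(edges)); // prints [source, map, sink]
    }
}
```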

    ExecutionGraph.java

    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
             ... ...
             // ExecutionJobVertex is the node type of the execution graph
             final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
             final long createTimestamp = System.currentTimeMillis();
     
             // Iterate over the JobVertex list
             for (JobVertex jobVertex : topologiallySorted) {
     
                      if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                              this.isStoppable = false;
                      }
     
                      // create the execution job vertex and attach it to the graph
                      // One ExecutionJobVertex is instantiated per JobVertex; its constructor creates the ExecutionVertex instances
                      ExecutionJobVertex ejv = new ExecutionJobVertex(
                                       this,
                                       jobVertex,
                                       1,
                                       maxPriorAttemptsHistoryLength,
                                       rpcTimeout,
                                       globalModVersion,
                                       createTimestamp);
                      // Connect the new ExecutionJobVertex to its upstream IntermediateResults
                      ejv.connectToPredecessors(this.intermediateResults);
     
                      ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
                      if (previousTask != null) {
                              throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                                       jobVertex.getID(), ejv, previousTask));
                      }
     
                      for (IntermediateResult res : ejv.getProducedDataSets()) {
                              IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
                              if (previousDataSet != null) {
                                       throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                                                res.getId(), res, previousDataSet));
                              }
                      }
                      // The total vertex count grows by the parallelism of this ExecutionJobVertex, because the
                      // execution graph is the parallelized version of the job graph: one vertex per parallel subtask.
                      this.verticesInCreationOrder.add(ejv);
                      this.numVerticesTotal += ejv.getParallelism();
                      // Add the current execution graph node to the list of newly created vertices
                      newExecJobVertices.add(ejv);
             }
     
             // the topology assigning should happen before notifying new vertices to failoverStrategy
             executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
     
             failoverStrategy.notifyNewVertices(newExecJobVertices);
     
             partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
    }
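
Two bookkeeping steps in attachJobGraph() are easy to miss: `putIfAbsent` doubles as a duplicate-ID check, and the running vertex total grows by each vertex's parallelism rather than by one. A minimal sketch of just those two steps follows. `AttachSketch` is a hypothetical stand-in that stores a parallelism per vertex ID instead of real ExecutionJobVertex objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the bookkeeping in attachJobGraph(); not Flink code.
final class AttachSketch {
    private final Map<String, Integer> tasks = new LinkedHashMap<>();
    private int numVerticesTotal;

    void attach(String vertexId, int parallelism) {
        // putIfAbsent returns the existing value if the ID was already registered.
        Integer previous = tasks.putIfAbsent(vertexId, parallelism);
        if (previous != null) {
            throw new IllegalStateException("Encountered two job vertices with ID " + vertexId);
        }
        // Each parallel subtask counts as one vertex of the execution graph.
        numVerticesTotal += parallelism;
    }

    int numVerticesTotal() {
        return numVerticesTotal;
    }

    public static void main(String[] args) {
        AttachSketch graph = new AttachSketch();
        graph.attach("source", 2);
        graph.attach("sink", 3);
        System.out.println(graph.numVerticesTotal()); // prints 5
    }
}
```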

    ExecutionJobVertex.java

    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
             // Get the list of input JobEdges
             List<JobEdge> inputs = jobVertex.getInputs();
     
             if (LOG.isDebugEnabled()) {
                      LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
             }
             // Iterate over every JobEdge
             for (int num = 0; num 

    ExecutionVertex.java

    public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
             // The pattern is POINTWISE only for forward and rescale partitioning; in all other cases it is ALL_TO_ALL
             final DistributionPattern pattern = edge.getDistributionPattern();
             final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
     
             ExecutionEdge[] edges;
     
             switch (pattern) {
                      case POINTWISE:
                              edges = connectPointwise(sourcePartitions, inputNumber);
                              break;
     
                      case ALL_TO_ALL:
                              edges = connectAllToAll(sourcePartitions, inputNumber);
                              break;
     
                      default:
                              throw new RuntimeException("Unrecognized distribution pattern.");
     
             }
     
             inputEdges[inputNumber] = edges;
     
             // add the consumers to the source
             // for now (until the receiver initiated handshake is in place), we need to register the
             // edges as the execution graph
             // Consumers were already added to the IntermediateResult earlier; here each
             // IntermediateResultPartition gets its consumer, i.e. it is linked to the ExecutionEdge
             for (ExecutionEdge ee : edges) {
                      ee.getSource().addConsumer(ee, consumerNumber);
             }
    }
    private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
             ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
     
             for (int i = 0; i  
    

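    Before reading this method, note that the inputEdges field of ExecutionVertex is a two-dimensional array. For each input of the ExecutionVertex, it holds the list of ExecutionEdges that belong to that input.

    For example, suppose an ExecutionVertex has two different inputs, A and B, where input A has 1 partition and input B has 8 partitions. The two-dimensional inputEdges array then looks as follows (irp stands for IntermediateResultPartition):

    [ ExecutionEdge[ A.irp[0] ] ]
    [ ExecutionEdge[ B.irp[0], B.irp[1], ..., B.irp[7] ] ]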
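
The layout above can be reproduced with a small sketch that returns, for one consumer subtask, the indices of the source partitions it reads. This is a simplified model rather than Flink's code: partitions are plain `int` indices instead of ExecutionEdge objects, the method signatures differ from the real ones, and the pointwise variant assumes that the larger of the two counts is an exact multiple of the smaller one. `EdgeConnectionSketch` is a hypothetical name.

```java
import java.util.Arrays;

// Simplified model of the two distribution patterns; not Flink's implementation.
final class EdgeConnectionSketch {

    // ALL_TO_ALL: the consumer subtask reads every partition of the source.
    static int[] connectAllToAll(int numSourcePartitions) {
        int[] partitions = new int[numSourcePartitions];
        for (int i = 0; i < numSourcePartitions; i++) {
            partitions[i] = i;
        }
        return partitions;
    }

    // POINTWISE (assumption: the larger count is a multiple of the smaller one).
    static int[] connectPointwise(int numSourcePartitions, int consumerParallelism, int subtaskIndex) {
        if (numSourcePartitions >= consumerParallelism) {
            // Each consumer subtask reads a contiguous group of partitions.
            int factor = numSourcePartitions / consumerParallelism;
            int[] partitions = new int[factor];
            for (int i = 0; i < factor; i++) {
                partitions[i] = subtaskIndex * factor + i;
            }
            return partitions;
        }
        // Several consumer subtasks share one partition.
        int factor = consumerParallelism / numSourcePartitions;
        return new int[] {subtaskIndex / factor};
    }

    public static void main(String[] args) {
        // Input A has 1 partition, input B has 8; both are read all-to-all.
        int[][] inputEdges = {connectAllToAll(1), connectAllToAll(8)};
        System.out.println(Arrays.deepToString(inputEdges));
        // prints [[0], [0, 1, 2, 3, 4, 5, 6, 7]]
    }
}
```

The first array index plays the role of inputNumber, which is why connectSource() stores its result in inputEdges[inputNumber].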

    At this point, the ExecutionGraph has been fully constructed.

    2. Using Akka

    Next, let us analyze the scheduling source code:

    JobMaster.java

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
             ... ...
             // Start the JobMaster services
             startJobMasterServices();
     
             log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
             // Reset and start the scheduler
             resetAndStartScheduler();
             ... ...
    }
    private void resetAndStartScheduler() throws Exception {
             ... ...
             FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling));
    }
    private void startScheduling() {
             checkState(jobStatusListener == null);
             // register self as job status change listener
             jobStatusListener = new JobManagerJobStatusListener();
             schedulerNG.registerJobStatusListener(jobStatusListener);
     
             schedulerNG.startScheduling();
    }

    DefaultScheduler.java

    protected void startSchedulingInternal() {
             log.info("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
             prepareExecutionGraphForNgScheduling();
             schedulingStrategy.startScheduling();
    }

    PipelinedRegionSchedulingStrategy.java

    public void startScheduling() {
             final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils
                      .toStream(schedulingTopology.getAllPipelinedRegions())
                      .filter(region -> !region.getConsumedResults().iterator().hasNext())
                      .collect(Collectors.toSet());
             maybeScheduleRegions(sourceRegions);
    }
    private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
             final List<SchedulingPipelinedRegion> regionsSorted =
                      SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
             for (SchedulingPipelinedRegion region : regionsSorted) {
                      maybeScheduleRegion(region);
             }
    }
    private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
             if (!areRegionInputsAllConsumable(region)) {
                      return;
             }
     
             checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");
     
             final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
                      SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                              regionVerticesSorted.get(region),
                              id -> deploymentOption);
             schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
    }
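
startScheduling() begins with the source regions, that is, the pipelined regions that consume no results from other regions. A minimal sketch of that filter is shown below, using plain strings instead of Flink's region and result types. `SourceRegionSketch` and its map-based input are hypothetical and chosen only to keep the example self-contained.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model: a region is a name, its inputs are the names of consumed results.
final class SourceRegionSketch {

    // Returns the regions that consume nothing and can therefore be scheduled first.
    static Set<String> sourceRegions(Map<String, List<String>> consumedResultsByRegion) {
        return consumedResultsByRegion.entrySet().stream()
                .filter(entry -> entry.getValue().isEmpty())
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Map<String, List<String>> regions = new LinkedHashMap<>();
        regions.put("region-1", List.of());           // reads only from sources
        regions.put("region-2", List.of("result-a")); // waits for region-1's output
        System.out.println(sourceRegions(regions));   // prints [region-1]
    }
}
```

Downstream regions are not lost: in the listing above, maybeScheduleRegion() simply returns early while a region's inputs are not yet consumable.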

    DefaultScheduler.java

    public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
             validateDeploymentOptions(executionVertexDeploymentOptions);
     
             final Map<ExecutionVertexID, DeploymentOption> deploymentOptionsByVertex =
                      groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
     
             final List<ExecutionVertexID> verticesToDeploy = executionVertexDeploymentOptions.stream()
                      .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                      .collect(Collectors.toList());
     
             final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
                      executionVertexVersioner.recordVertexModifications(verticesToDeploy);
     
             transitionToScheduled(verticesToDeploy);
     
             final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
                      allocateSlots(executionVertexDeploymentOptions);
     
             final List<DeploymentHandle> deploymentHandles = createDeploymentHandles(
                      requiredVersionByVertex,
                      deploymentOptionsByVertex,
                      slotExecutionVertexAssignments);
     
             waitForAllSlotsAndDeploy(deploymentHandles);
    }
    private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
             FutureUtils.assertNoException(
                      assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
    }
    private BiFunction<Void, Throwable, Void> deployAll(final List<DeploymentHandle> deploymentHandles) {
             return (ignored, throwable) -> {
                      propagateIfNonNull(throwable);
                      for (final DeploymentHandle deploymentHandle : deploymentHandles) {
                              final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
                              final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
                              checkState(slotAssigned.isDone());
     
                              FutureUtils.assertNoException(
                                       slotAssigned.handle(deployOrHandleError(deploymentHandle)));
                      }
                      return null;
             };
    }
    private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
             final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
             final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
     
             return (ignored, throwable) -> {
                      if (executionVertexVersioner.isModified(requiredVertexVersion)) {
                              log.debug("Refusing to deploy execution vertex {} because this deployment was " +
                                       "superseded by another deployment", executionVertexId);
                              return null;
                      }
     
                      if (throwable == null) {
                              deployTaskSafe(executionVertexId);
                      } else {
                              handleTaskDeploymentFailure(executionVertexId, throwable);
                      }
                      return null;
             };
    }
    private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
             try {
                      // Look up the ExecutionVertex by its ID
                      final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
                      // deploy() deploys this ExecutionVertex
                      executionVertexOperations.deploy(executionVertex);
             } catch (Throwable e) {
                      handleTaskDeploymentFailure(executionVertexId, e);
             }
    }
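
The version check in deployOrHandleError() guards against stale deployments: a version is recorded when the deployment is requested, and when the slot future completes the callback deploys only if the vertex has not been modified since. The sketch below reproduces that pattern with `CompletableFuture.handle` from the JDK. Everything else is a hypothetical stand-in: `VersionedDeploymentSketch`, the string vertex IDs, and the `log` list replace Flink's ExecutionVertexVersioner, ExecutionVertexID, and real deployment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the versioned deployment pattern; not Flink code.
final class VersionedDeploymentSketch {
    private final Map<String, Long> currentVersion = new HashMap<>();
    final List<String> log = new ArrayList<>();

    // Records a modification and returns the version a deployment must still match later.
    long recordModification(String vertexId) {
        return currentVersion.merge(vertexId, 1L, Long::sum);
    }

    boolean isModified(String vertexId, long requiredVersion) {
        return currentVersion.getOrDefault(vertexId, 0L) != requiredVersion;
    }

    // Deploys once the slot future completes, unless a newer deployment superseded this one.
    void deployWhenSlotReady(String vertexId, long requiredVersion, CompletableFuture<String> slotFuture) {
        slotFuture.handle((slot, throwable) -> {
            if (isModified(vertexId, requiredVersion)) {
                log.add("skipped " + vertexId);
            } else if (throwable == null) {
                log.add("deployed " + vertexId + " to " + slot);
            } else {
                log.add("failed " + vertexId);
            }
            return null;
        });
    }

    public static void main(String[] args) {
        VersionedDeploymentSketch scheduler = new VersionedDeploymentSketch();
        CompletableFuture<String> slot = new CompletableFuture<>();
        long first = scheduler.recordModification("vertex-0");
        scheduler.deployWhenSlotReady("vertex-0", first, slot);
        // A second request for the same vertex makes the first one stale.
        long second = scheduler.recordModification("vertex-0");
        scheduler.deployWhenSlotReady("vertex-0", second, slot);
        slot.complete("slot-0");
        // One callback is skipped and one deploys; callback order is not guaranteed.
        System.out.println(scheduler.log);
    }
}
```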

    DefaultExecutionVertexOperations.java

    public void deploy(final ExecutionVertex executionVertex) throws JobException {
             executionVertex.deploy();
    }

    ExecutionVertex.java

    public void deploy() throws JobException {
             currentExecution.deploy();
    }

    Execution.java

    public void deploy() throws JobException {
             ... ...
             // This is where the ExecutionGraph is converted into the actual physical execution graph.
             // For example, an IntermediateResultPartition is turned into a ResultPartition, and an
             // ExecutionEdge into an InputChannelDeploymentDescriptor (which eventually becomes an InputGate at runtime).
                      final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
                              .fromExecutionVertex(vertex, attemptNumber)
                              .createDeploymentDescriptor(
                                       slot.getAllocationId(),
                                       slot.getPhysicalSlotNumber(),
                                       taskRestore,
                                       producedPartitions.values());
                      ... ...
                      // We run the submission in the future executor so that the serialization of large TDDs does not block
                      // the main thread and sync back to the main thread once submission is completed.
                      CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
     
             ... ...
    }
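
    The comment in Execution.deploy() describes a pattern: do the expensive submission on a separate executor, then come back to the main thread to handle the result. The following is a minimal sketch of that pattern using only the JDK. The class AsyncSubmitSketch, the submitTask stand-in and the two single-thread executors are hypothetical and are not Flink's actual classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncSubmitSketch {

    // Hypothetical stand-in for the RPC call; it only records which thread ran it.
    static String submitTask(String descriptor) {
        return descriptor + " submitted on " + Thread.currentThread().getName();
    }

    // Runs the (potentially slow) submission on ioExecutor, then handles the
    // outcome on mainThread: "submit asynchronously, sync back afterwards".
    static CompletableFuture<String> deploy(
            String descriptor, ExecutorService ioExecutor, ExecutorService mainThread) {
        return CompletableFuture
                .supplyAsync(() -> submitTask(descriptor), ioExecutor)
                .handleAsync(
                        (ack, failure) -> failure == null
                                ? ack + ", completed on " + Thread.currentThread().getName()
                                : "failed: " + failure.getMessage(),
                        mainThread);
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            System.out.println(deploy("tdd-1", io, mainThread).join());
        } finally {
            io.shutdown();
            mainThread.shutdown();
        }
    }
}
```

    Because the continuation is registered with handleAsync and an explicit executor, both the success and the failure branch run on the designated thread rather than on whichever thread finished the submission.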

    RpcTaskManagerGateway.java

    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
             return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
    }

    TaskExecutor.java

    public CompletableFuture<Acknowledge> submitTask(
                      TaskDeploymentDescriptor tdd,
                      JobMasterId jobMasterId,
                      Time timeout) {
     
             try {
             ... ...
     
                      Task task = new Task(
                              jobInformation,
                              taskInformation,
                              tdd.getExecutionAttemptId(),
                              tdd.getAllocationId(),
                              tdd.getSubtaskIndex(),
                              tdd.getAttemptNumber(),
                              tdd.getProducedPartitions(),
                              tdd.getInputGates(),
                              tdd.getTargetSlotNumber(),
                              memoryManager,
                              taskExecutorServices.getIOManager(),
                              taskExecutorServices.getShuffleEnvironment(),
                              taskExecutorServices.getKvStateService(),
                              taskExecutorServices.getBroadcastVariableManager(),
                              taskExecutorServices.getTaskEventDispatcher(),
                              externalResourceInfoProvider,
                              taskStateManager,
                              taskManagerActions,
                              inputSplitProvider,
                              checkpointResponder,
                              taskOperatorEventGateway,
                              aggregateManager,
                              classLoaderHandle,
                              fileCache,
                              taskManagerConfiguration,
                              taskMetricGroup,
                              resultPartitionConsumableNotifier,
                              partitionStateChecker,
                              getRpcService().getExecutor());
     
                      ... ...
     
                      if (taskAdded) {
                              task.startTaskThread();
     
                              ... ...
                      }
                      ... ...
    }

    Task.java

    public void startTaskThread() {
             executingThread.start();
    }

    Next, the Task's executing thread starts and calls Task.run() -> doRun():

    private void doRun() {
             ... ...
                      // now load and instantiate the task's invokable code
                      invokable = loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
     
                      ... ...
     
                      // run the invokable
                      invokable.invoke();
             ... ...
    }

    The invokable here is the task instance, a subclass of AbstractInvokable such as StreamTask, and it is created through reflection.
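
    To make the reflection step concrete, here is a minimal sketch of instantiating a task from its class name. The types Env, Invokable and MapTask are hypothetical stand-ins for Flink's Environment, AbstractInvokable and a concrete StreamTask. The sketch assumes the task class has a public constructor that takes the environment, and it does not reproduce Flink's actual loadAndInstantiateInvokable.

```java
import java.lang.reflect.Constructor;

final class InvokableLoadingSketch {

    // Hypothetical stand-in for Flink's Environment.
    static final class Env {
        final String taskName;

        Env(String taskName) {
            this.taskName = taskName;
        }
    }

    // Hypothetical stand-in for AbstractInvokable.
    abstract static class Invokable {
        protected final Env env;

        protected Invokable(Env env) {
            this.env = env;
        }

        abstract String invoke();
    }

    // Hypothetical concrete task; public so getConstructor can find its constructor.
    public static final class MapTask extends Invokable {
        public MapTask(Env env) {
            super(env);
        }

        @Override
        String invoke() {
            return "running " + env.taskName;
        }
    }

    // Loads the class by name, checks that it is an Invokable, and calls its (Env) constructor.
    static Invokable loadAndInstantiate(ClassLoader classLoader, String className, Env env) {
        try {
            Class<? extends Invokable> clazz =
                    Class.forName(className, true, classLoader).asSubclass(Invokable.class);
            Constructor<? extends Invokable> constructor = clazz.getConstructor(Env.class);
            return constructor.newInstance(env);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Invokable task = loadAndInstantiate(
                InvokableLoadingSketch.class.getClassLoader(),
                MapTask.class.getName(),
                new Env("map-1"));
        System.out.println(task.invoke());
    }
}
```

    Only the class name has to travel with the deployment descriptor; the class itself is resolved on the receiving side through the class loader passed in.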

    nameOfInvokableClass is already determined when the StreamGraph is generated; see the StreamGraph.addOperator method in section 3.1.2:

    public <IN, OUT> void addOperator(
                      Integer vertexID,
                      @Nullable String slotSharingGroup,
                      @Nullable String coLocationGroup,
                      StreamOperatorFactory<OUT> operatorFactory,
                      TypeInformation<IN> inTypeInfo,
                      TypeInformation<OUT> outTypeInfo,
                      String operatorName) {
             Class<? extends AbstractInvokable> invokableClass =
                              operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
             addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
                              outTypeInfo, operatorName, invokableClass);
    }

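    OneInputStreamTask.class here becomes the vertexClass of the generated StreamNode. The value is passed along from there: when the StreamGraph is converted into the JobGraph, it is carried over to the invokableClass of the JobVertex. When the JobGraph is then converted into the ExecutionGraph, it is stored in ExecutionJobVertex.TaskInformation.invokableClassName, and from there it travels all the way to the Task.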

    Continuing with invokable.invoke():

    StreamTask.java

    public final void invoke() throws Exception {
             try {
                      // Preparation before the task runs
                      beforeInvoke();
     
                      ... ...
     
                      // let the task do its work
                      // Key logic: run the task
                      runMailboxLoop();
     
                      ... ...
     
                      // Cleanup after the task has run
                      afterInvoke();
             }
             ... ...
             cleanUpInvoke();
    }
    public void runMailboxLoop() throws Exception {
             mailboxProcessor.runMailboxLoop();
    }

    MailboxProcessor.java

    public void runMailboxLoop() throws Exception {
     
             final TaskMailbox localMailbox = mailbox;
     
             Preconditions.checkState(
                      localMailbox.isMailboxThread(),
                      "Method must be executed by declared mailbox thread!");
     
             assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
     
             final MailboxController defaultActionContext = new MailboxController(this);
     
             // Each iteration first processes any mail in the mailbox, then runs the default action
             // (processing input records, for example through a map operator).
             while (isMailboxLoopRunning()) {
                      // The blocking `processMail` call will not return until default action is available.
                      processMail(localMailbox, false);
                      if (isMailboxLoopRunning()) {
                              mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
                      }
             }
    }
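
    The loop above alternates between draining mail and running the default action. Below is a simplified, single-threaded sketch of that control flow. MailboxLoopSketch and its DefaultAction interface are hypothetical names. Unlike Flink's MailboxProcessor, this sketch has no locking, no blocking wait and no suspension of the default action.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

final class MailboxLoopSketch {

    // Hypothetical counterpart of MailboxDefaultAction: one unit of record processing.
    interface DefaultAction {
        void run(MailboxLoopSketch controller);
    }

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final DefaultAction defaultAction;
    private boolean running = true;

    MailboxLoopSketch(DefaultAction defaultAction) {
        this.defaultAction = defaultAction;
    }

    void sendMail(Runnable mail) {
        mailbox.add(mail);
    }

    void allActionsCompleted() {
        running = false;
    }

    void runMailboxLoop() {
        while (running) {
            // Control mail (for example a checkpoint trigger) is drained first.
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // Then one step of the default action, unless a mail stopped the loop.
            if (running) {
                defaultAction.run(this);
            }
        }
    }

    // Processes three records; a mail enqueued while handling "a" runs before "b".
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Iterator<String> input = Arrays.asList("a", "b", "c").iterator();
        MailboxLoopSketch loop = new MailboxLoopSketch(controller -> {
            if (input.hasNext()) {
                String record = input.next();
                log.add("record " + record);
                if (record.equals("a")) {
                    controller.sendMail(() -> log.add("mail: checkpoint"));
                }
            } else {
                controller.allActionsCompleted();
            }
        });
        loop.sendMail(() -> log.add("mail: init"));
        loop.runMailboxLoop();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Since mail and record processing run on the same thread, a control action never interleaves with the middle of processing a record; that is the main point of the mailbox model.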

    runDefaultAction() runs the default action. Searching for its implementation (Ctrl+H) leads to line 292 of StreamTask.java:

    StreamTask.java

    protected StreamTask(
                      Environment environment,
                      @Nullable TimerService timerService,
                      Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
                      StreamTaskActionExecutor actionExecutor,
                      TaskMailbox mailbox) throws Exception {
             ... ...
             // See the MailboxProcessor constructor: its first argument is the default action
             this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
     
             ... ...
    }

    MailboxProcessor.java, the constructor:

    public MailboxProcessor(
                      MailboxDefaultAction mailboxDefaultAction,
                      TaskMailbox mailbox,
                      StreamTaskActionExecutor actionExecutor) {
             this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
             this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
             this.mailbox = Preconditions.checkNotNull(mailbox);
             this.mailboxLoopRunning = true;
             this.suspendedDefaultAction = null;
    }

    So the default action that gets executed is processInput():

    StreamTask.java

    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
             InputStatus status = inputProcessor.processInput();
             if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
                      return;
             }
             if (status == InputStatus.END_OF_INPUT) {
                      controller.allActionsCompleted();
                      return;
             }
             CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
             MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
             assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
    }
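
    processInput() makes a three-way decision: keep going, finish, or suspend the default action until input is available again. The sketch below models only that decision with a plain CompletableFuture. SuspendResumeSketch is a hypothetical class, it ignores the output-side availability check (recordWriter.isAvailable()), and it assumes everything runs on a single thread.

```java
import java.util.concurrent.CompletableFuture;

final class SuspendResumeSketch {

    // Same three states as the InputStatus values used in the listing above.
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private boolean suspended;
    private boolean finished;

    boolean isSuspended() {
        return suspended;
    }

    boolean isFinished() {
        return finished;
    }

    // Decides what the surrounding loop should do after one processing step.
    void processInput(InputStatus status, CompletableFuture<?> availability) {
        if (status == InputStatus.MORE_AVAILABLE) {
            return; // more data is ready: the loop simply calls the default action again
        }
        if (status == InputStatus.END_OF_INPUT) {
            finished = true; // corresponds to controller.allActionsCompleted()
            return;
        }
        // Nothing available: suspend, and resume once the future completes.
        suspended = true;
        availability.thenRun(() -> suspended = false);
    }

    public static void main(String[] args) {
        SuspendResumeSketch task = new SuspendResumeSketch();
        CompletableFuture<Void> dataArrived = new CompletableFuture<>();
        task.processInput(InputStatus.NOTHING_AVAILABLE, dataArrived);
        System.out.println("suspended: " + task.isSuspended());
        dataArrived.complete(null);
        System.out.println("suspended after data arrived: " + task.isSuspended());
    }
}
```

    While the default action is suspended, the task thread is not busy-polling an empty input; it is free to process mail until the availability future completes.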

    StreamOneInputProcessor.java

    public InputStatus processInput() throws Exception {
             InputStatus status = input.emitNext(output);
     
             if (status == InputStatus.END_OF_INPUT) {
                      endOfInputAware.endInput(input.getInputIndex() + 1);
             }
     
             return status;
    }

    StreamTaskNetworkInput.java

    public InputStatus emitNext(DataOutput<T> output) throws Exception {
     
             while (true) {
                      // get the stream element from the deserializer
                      if (currentRecordDeserializer != null) {
                              DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
                              if (result.isBufferConsumed()) {
                                       currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                                       currentRecordDeserializer = null;
                              }
     
                              if (result.isFullRecord()) {
                                       processElement(deserializationDelegate.getInstance(), output);
                                       return InputStatus.MORE_AVAILABLE;
                              }
                      }
     
                      Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
                      if (bufferOrEvent.isPresent()) {
                              // return to the mailbox after receiving a checkpoint barrier to avoid processing of
                              // data after the barrier before checkpoint is performed for unaligned checkpoint mode
                              if (bufferOrEvent.get().isBuffer()) {
                                       processBuffer(bufferOrEvent.get());
                              } else {
                                       processEvent(bufferOrEvent.get());
                                       return InputStatus.MORE_AVAILABLE;
                              }
                      } else {
                              if (checkpointedInputGate.isFinished()) {
                                       checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
                                       return InputStatus.END_OF_INPUT;
                              }
                              return InputStatus.NOTHING_AVAILABLE;
                      }
             }
    }
    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
             if (recordOrMark.isRecord()) {
                      output.emitRecord(recordOrMark.asRecord());
             } else if (recordOrMark.isWatermark()) {
                      statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel, output);
             } else if (recordOrMark.isLatencyMarker()) {
                      output.emitLatencyMarker(recordOrMark.asLatencyMarker());
             } else if (recordOrMark.isStreamStatus()) {
                      statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel, output);
             } else {
                      throw new UnsupportedOperationException("Unknown type of StreamElement");
             }
    }
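
    processElement() is a dispatch on the kind of stream element: data records go to the operator, while watermarks and other markers take separate paths. A minimal sketch of this dispatch follows. Element, DataRecord, Watermark and Unknown are hypothetical simplified types, and only two of the four element kinds are modelled.

```java
final class ElementDispatchSketch {

    // Hypothetical simplified element hierarchy.
    abstract static class Element {}

    static final class DataRecord extends Element {
        final String value;

        DataRecord(String value) {
            this.value = value;
        }
    }

    static final class Watermark extends Element {
        final long timestamp;

        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    static final class Unknown extends Element {}

    // Routes each element kind to its own handling path; unknown kinds are rejected.
    static String processElement(Element element) {
        if (element instanceof DataRecord) {
            return "record:" + ((DataRecord) element).value;
        } else if (element instanceof Watermark) {
            return "watermark:" + ((Watermark) element).timestamp;
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

    public static void main(String[] args) {
        System.out.println(processElement(new DataRecord("a")));
        System.out.println(processElement(new Watermark(42L)));
    }
}
```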

    For a map operator, the emitRecord that gets called is the one in OneInputStreamTask.java:

    public void emitRecord(StreamRecord<IN> record) throws Exception {
             numRecordsIn.inc();
             operator.setKeyContextElement1(record);
             operator.processElement(record);
    }

    For a map operator, the processElement that gets called is the one in StreamMap.java:

    public void processElement(StreamRecord<IN> element) throws Exception {
             // userFunction.map() is the map method of the user-defined MapFunction
             // After passing through the user-defined map function, the record is sent downstream via the collector
             output.collect(element.replace(userFunction.map(element.getValue())));
    }
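
    The last hop, from processElement to the user's map function and on to the collector, can be sketched without Flink as follows. SimpleRecord, Collector and MapOperator are hypothetical simplified stand-ins for StreamRecord, Output and StreamMap. The replace method imitates reusing the record object while keeping its timestamp, which is an assumption about the intent rather than a copy of Flink's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class StreamMapSketch {

    // Hypothetical stand-in for StreamRecord: a value plus a timestamp.
    static final class SimpleRecord<T> {
        T value;
        final long timestamp;

        SimpleRecord(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        // Reuses this record object for the new value and keeps the timestamp.
        @SuppressWarnings("unchecked")
        <X> SimpleRecord<X> replace(X newValue) {
            SimpleRecord<X> self = (SimpleRecord<X>) this;
            self.value = newValue;
            return self;
        }
    }

    // Hypothetical stand-in for the downstream output.
    interface Collector<T> {
        void collect(SimpleRecord<T> record);
    }

    // Hypothetical stand-in for StreamMap: applies the user function to each record.
    static final class MapOperator<IN, OUT> {
        private final Function<IN, OUT> userFunction;
        private final Collector<OUT> output;

        MapOperator(Function<IN, OUT> userFunction, Collector<OUT> output) {
            this.userFunction = userFunction;
            this.output = output;
        }

        void processElement(SimpleRecord<IN> element) {
            output.collect(element.replace(userFunction.apply(element.value)));
        }
    }

    static List<String> demo() {
        List<String> downstream = new ArrayList<>();
        MapOperator<Integer, String> operator = new MapOperator<>(
                i -> "v" + (i * 2),
                record -> downstream.add(record.value + "@" + record.timestamp));
        operator.processElement(new SimpleRecord<>(1, 100L));
        operator.processElement(new SimpleRecord<>(21, 200L));
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Reusing the incoming record object for the mapped value avoids one allocation per element, at the cost of an unchecked cast inside replace.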
  • Original article: https://blog.csdn.net/zjjcchina/article/details/126052379