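The execution graph (ExecutionGraph) is generated on the JobManager, while the JobMaster is being created.
This article is based on per-job mode.
After the client has generated the JobGraph, it submits it to the JobManager via submitJob. The JobManager then builds the corresponding ExecutionGraph from that JobGraph.
The ExecutionGraph is the core data structure Flink uses when scheduling a job. It contains every parallel task, every intermediate stream, and the relationships between them.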
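The "every parallel task" part can be pictured with a small model. The sketch below expands each job vertex into one execution vertex per parallel subtask. ToyJobVertex, ToyExecutionJobVertex and ToyExecutionVertex are hypothetical stand-ins, not Flink's API. The real ExecutionJobVertex and ExecutionVertex also carry intermediate results, edges and execution attempts, and this sketch leaves all of that out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JobVertex: a name plus its parallelism.
final class ToyJobVertex {
    final String name;
    final int parallelism;

    ToyJobVertex(String name, int parallelism) {
        this.name = name;
        this.parallelism = parallelism;
    }
}

// Hypothetical stand-in for an ExecutionVertex: one parallel subtask of a job vertex.
final class ToyExecutionVertex {
    final String jobVertexName;
    final int subtaskIndex;

    ToyExecutionVertex(String jobVertexName, int subtaskIndex) {
        this.jobVertexName = jobVertexName;
        this.subtaskIndex = subtaskIndex;
    }
}

// Hypothetical stand-in for an ExecutionJobVertex: expands one job vertex into its subtasks.
final class ToyExecutionJobVertex {
    final ToyJobVertex jobVertex;
    final List<ToyExecutionVertex> taskVertices = new ArrayList<>();

    ToyExecutionJobVertex(ToyJobVertex jobVertex) {
        this.jobVertex = jobVertex;
        for (int i = 0; i < jobVertex.parallelism; i++) {
            taskVertices.add(new ToyExecutionVertex(jobVertex.name, i));
        }
    }
}

final class ToyExecutionGraphDemo {
    // Total number of parallel tasks the toy execution graph contains.
    static int totalTasks(List<ToyJobVertex> jobVertices) {
        int total = 0;
        for (ToyJobVertex jv : jobVertices) {
            total += new ToyExecutionJobVertex(jv).taskVertices.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<ToyJobVertex> job = List.of(
                new ToyJobVertex("Source", 2),
                new ToyJobVertex("Map", 4),
                new ToyJobVertex("Sink", 1));
        System.out.println(totalTasks(job)); // prints 7
    }
}
```

Take a Source(2) → Map(4) → Sink(1) job as an example. Its JobGraph has three vertices, but the expanded graph has 2 + 4 + 1 = 7 task entries.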
The JobMaster constructor contains the following code:

```java
this.schedulerNG =
        createScheduler(
                slotPoolServiceSchedulerFactory,
                executionDeploymentTracker,
                jobManagerJobMetricGroup,
                jobStatusListener);
```

JobMaster's own createScheduler method in turn delegates to the factory:

```java
final SchedulerNG scheduler =
        slotPoolServiceSchedulerFactory.createScheduler(
                log,
                jobGraph,
                ioExecutor,
                jobMasterConfiguration.getConfiguration(),
                slotPoolService,
                futureExecutor,
                userCodeLoader,
                highAvailabilityServices.getCheckpointRecoveryFactory(),
                rpcTimeout,
                blobWriter,
                jobManagerJobMetricGroup,
                jobMasterConfiguration.getSlotRequestTimeout(),
                shuffleMaster,
                partitionTracker,
                executionDeploymentTracker,
                initializationTimestamp,
                getMainThreadExecutor(),
                fatalErrorHandler,
                jobStatusListener);
return scheduler;
```
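Here the factory's createScheduler method is called. SlotPoolServiceSchedulerFactory currently has only one implementation, DefaultSlotPoolServiceSchedulerFactory.java. Its createScheduler calls yet another method and returns that result directly: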
```java
return schedulerNGFactory.createInstance(
        log,
        jobGraph,
        ioExecutor,
        configuration,
        slotPoolService,
        futureExecutor,
        userCodeLoader,
        checkpointRecoveryFactory,
        rpcTimeout,
        blobWriter,
        jobManagerJobMetricGroup,
        slotRequestTimeout,
        shuffleMaster,
        partitionTracker,
        executionDeploymentTracker,
        initializationTimestamp,
        mainThreadExecutor,
        fatalErrorHandler,
        jobStatusListener);
```
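The chain so far (JobMaster → SlotPoolServiceSchedulerFactory → SchedulerNGFactory) is plain factory delegation. The sketch below strips that shape down to a few lines. The Sketch* types are hypothetical and pass only a job name through, in place of the long argument lists above.

```java
// Hypothetical, heavily simplified versions of the types in the call chain above.
// The real methods take roughly twenty parameters; only a job name is passed here.
interface SketchScheduler {
    String describe();
}

// Plays the role of SchedulerNGFactory.
interface SketchSchedulerFactory {
    SketchScheduler createInstance(String jobName);
}

// Plays the role of the default SchedulerNGFactory implementation.
final class SketchDefaultSchedulerFactory implements SketchSchedulerFactory {
    @Override
    public SketchScheduler createInstance(String jobName) {
        return () -> "default scheduler for " + jobName;
    }
}

// Plays the role of DefaultSlotPoolServiceSchedulerFactory: it forwards the call to the
// scheduler factory it holds and returns the result directly.
final class SketchSlotPoolServiceSchedulerFactory {
    private final SketchSchedulerFactory schedulerNGFactory;

    SketchSlotPoolServiceSchedulerFactory(SketchSchedulerFactory schedulerNGFactory) {
        this.schedulerNGFactory = schedulerNGFactory;
    }

    SketchScheduler createScheduler(String jobName) {
        return schedulerNGFactory.createInstance(jobName);
    }
}

// Plays the role of JobMaster: the scheduler is created once, in the constructor.
final class SketchJobMaster {
    final SketchScheduler schedulerNG;

    SketchJobMaster(String jobName, SketchSlotPoolServiceSchedulerFactory factory) {
        this.schedulerNG = factory.createScheduler(jobName);
    }
}
```

Each layer only forwards the call. What the indirection buys is that the JobMaster-like class depends only on interfaces, so the scheduler that gets built can be swapped without touching it.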
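In version 1.11, SchedulerNGFactory had only one default implementation, DefaultSchedulerFactory. Newer versions add two more factories, AdaptiveSchedulerFactory and AdaptiveBatchSchedulerFactory, which create the adaptive scheduler and the adaptive batch scheduler respectively.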
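With several SchedulerNGFactory implementations available, one of them has to be chosen for each job. In Flink that choice is driven by configuration. The sketch below only illustrates the idea as a switch over a hypothetical ToySchedulerType enum, and the enum, the Toy* class names and the mapping are all assumptions.

```java
// Hypothetical scheduler kinds, one per SchedulerNGFactory implementation named above.
enum ToySchedulerType { DEFAULT, ADAPTIVE, ADAPTIVE_BATCH }

// Hypothetical, minimal factory interface: it only reports which scheduler it would build.
interface ToySchedulerNGFactory {
    String schedulerName();
}

final class ToyDefaultSchedulerFactory implements ToySchedulerNGFactory {
    public String schedulerName() { return "DefaultScheduler"; }
}

final class ToyAdaptiveSchedulerFactory implements ToySchedulerNGFactory {
    public String schedulerName() { return "AdaptiveScheduler"; }
}

final class ToyAdaptiveBatchSchedulerFactory implements ToySchedulerNGFactory {
    public String schedulerName() { return "AdaptiveBatchScheduler"; }
}

final class ToySchedulerFactorySelector {
    // Assumed selection rule: map the configured type straight to a factory,
    // falling back to the default factory.
    static ToySchedulerNGFactory select(ToySchedulerType type) {
        switch (type) {
            case ADAPTIVE:
                return new ToyAdaptiveSchedulerFactory();
            case ADAPTIVE_BATCH:
                return new ToyAdaptiveBatchSchedulerFactory();
            case DEFAULT:
            default:
                return new ToyDefaultSchedulerFactory();
        }
    }
}
```

Because every caller sees only the factory interface, adding a new scheduler type means adding one class and one case. The rest of the chain stays untouched.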
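Inside the default implementation's createInstance (DefaultSchedulerFactory), an ExecutionGraphFactory is created: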
```java
final ExecutionGraphFactory executionGraphFactory =
        new DefaultExecutionGraphFactory(
                jobMasterConfiguration,
                userCodeLoader,
                executionDeploymentTracker,
                futureExecutor,
                ioExecutor,
                rpcTimeout,
                jobManagerJobMetricGroup,
                blobWriter,
                shuffleMaster,
                partitionTracker);
```
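DefaultExecutionGraphFactory then calls the static DefaultExecutionGraphBuilder.buildGraph method (from its createAndRestoreExecutionGraph method):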
```java
final ExecutionGraph newExecutionGraph =
        DefaultExecutionGraphBuilder.buildGraph(
                jobGraph,
                configuration,
                futureExecutor,
                ioExecutor,
                userCodeClassLoader,
                completedCheckpointStore,
                checkpointsCleaner,
                checkpointIdCounter,
                rpcTimeout,
                blobWriter,
                log,
                shuffleMaster,
                jobMasterPartitionTracker,
                partitionLocationConstraint,
                executionDeploymentListener,
                combinedExecutionStateUpdateListener,
                initializationTimestamp,
                vertexAttemptNumberStore,
                vertexParallelismStore,
                checkpointStatsTrackerFactory,
                isDynamicGraph,
                executionJobVertexFactory);
```
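Only after all these twists and turns do we reach the method that actually builds the execution graph. The code makes heavy use of design patterns (factories delegating to other factories), but it is still dizzying to follow.
The core logic of buildGraph: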
```java
final DefaultExecutionGraph executionGraph;
try {
    executionGraph =
            new DefaultExecutionGraph(
                    jobInformation,
                    futureExecutor,
                    ioExecutor,
                    rpcTimeout,
                    executionHistorySizeLimit,
                    classLoader,
                    blobWriter,
                    partitionGroupReleaseStrategyFactory,
                    shuffleMaster,
                    partitionTracker,
                    partitionLocationConstraint,
                    executionDeploymentListener,
                    executionStateUpdateListener,
                    initializationTimestamp,
                    vertexAttemptNumberStore,
                    vertexParallelismStore,
                    isDynamicGraph,
                    executionJobVertexFactory);
}
// ... (omitted)

// Core logic: attach the topologically sorted JobGraph vertices to the executionGraph data structure
executionGraph.attachJobGraph(sortedTopology);
```
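attachJobGraph receives the vertices in topological order, so every upstream vertex is handled before its consumers. In buildGraph this list comes from jobGraph.getVerticesSortedTopologicallyFromSources(). Below is a minimal sketch of Kahn's algorithm over a hypothetical adjacency map of vertex names. It illustrates the ordering property only and is not Flink's own sorting code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ToyTopologicalSort {
    /**
     * Kahn's algorithm. {@code edges} maps each vertex to its downstream vertices;
     * vertices without outgoing edges should still appear as keys with an empty list.
     */
    static List<String> sort(Map<String, List<String>> edges) {
        // Count incoming edges per vertex (insertion-ordered for deterministic output).
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String v : edges.keySet()) {
            inDegree.putIfAbsent(v, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String t : targets) {
                inDegree.merge(t, 1, Integer::sum);
            }
        }
        // Start from the sources, i.e. vertices with no inputs.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            // Removing v may turn some consumers into new "sources".
            for (String t : edges.getOrDefault(v, List.of())) {
                if (inDegree.merge(t, -1, Integer::sum) == 0) {
                    ready.add(t);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalArgumentException("graph has a cycle");
        }
        return order;
    }
}
```

A cycle leaves some vertices with a non-zero in-degree forever, which is why the final size check rejects cyclic graphs.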
The core logic of attachJobGraph:

```java
attachJobVertices(verticesToAttach);
initializeJobVertices(verticesToInitialize);
```
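attachJobVertices walks the sorted vertices and does the following for each JobVertex:

```java
// Instantiate an execution graph node: for each JobVertex, create the corresponding ExecutionJobVertex
ExecutionJobVertex ejv =
        executionJobVertexFactory.createExecutionJobVertex(
                this, jobVertex, parallelismInfo);
// ...
// Add the current execution graph node to the graph
this.verticesInCreationOrder.add(ejv);
```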
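Attaching essentially records each new vertex in two places. One is a map keyed by vertex ID, which initializeJobVertices later reads via tasks.get(...). The other is the verticesInCreationOrder list. The sketch below assumes hypothetical String IDs and pre-built entries (in Flink, each entry is first created from a JobVertex by executionJobVertexFactory). The duplicate-ID check is an illustrative assumption, not Flink's exact error handling.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ExecutionJobVertex: just an id and a parallelism.
final class SketchJobVertexEntry {
    final String jobVertexId;
    final int parallelism;

    SketchJobVertexEntry(String jobVertexId, int parallelism) {
        this.jobVertexId = jobVertexId;
        this.parallelism = parallelism;
    }
}

final class SketchGraph {
    // Lookup by id, analogous to tasks.get(jobVertex.getID()).
    final Map<String, SketchJobVertexEntry> tasks = new LinkedHashMap<>();
    // Keeps the order in which vertices were attached.
    final List<SketchJobVertexEntry> verticesInCreationOrder = new ArrayList<>();

    void attachJobVertices(List<SketchJobVertexEntry> topologicallySorted) {
        for (SketchJobVertexEntry ejv : topologicallySorted) {
            SketchJobVertexEntry previous = tasks.putIfAbsent(ejv.jobVertexId, ejv);
            if (previous != null) {
                // Assumed behavior: refuse to attach two vertices with the same id.
                throw new IllegalStateException("duplicate job vertex id " + ejv.jobVertexId);
            }
            verticesInCreationOrder.add(ejv);
        }
    }
}
```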
Initializing the job vertices (initializeJobVertices):

```java
for (JobVertex jobVertex : topologicallySorted) {
    final ExecutionJobVertex ejv = tasks.get(jobVertex.getID());
    initializeJobVertex(ejv, createTimestamp);
}
```
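The topological order matters here because initializing a vertex connects it to intermediateResults. In the code path shown in this section, that map only holds the results of vertices initialized earlier. The sketch below makes this visible. It assumes a hypothetical InitVertex type where each vertex produces exactly one result named "<id>-result" and looks up its inputs by ID. These are simplifying assumptions, and initializing out of order makes the lookup fail.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical vertex: an id plus the ids of the upstream results it consumes.
final class InitVertex {
    final String id;
    final List<String> inputResultIds;

    InitVertex(String id, List<String> inputResultIds) {
        this.id = id;
        this.inputResultIds = inputResultIds;
    }
}

final class InitOrderSketch {
    // Stand-in for the graph's intermediateResults: result id -> producing vertex id.
    final Map<String, String> intermediateResults = new HashMap<>();

    void initializeJobVertices(List<InitVertex> topologicallySorted) {
        for (InitVertex v : topologicallySorted) {
            // Like connecting to predecessors: every input must already be registered.
            for (String input : v.inputResultIds) {
                if (!intermediateResults.containsKey(input)) {
                    throw new IllegalStateException(v.id + " has no registered input " + input);
                }
            }
            // Assumption: each vertex produces exactly one result, named "<id>-result".
            intermediateResults.put(v.id + "-result", v.id);
        }
    }
}
```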
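Each call to initializeJobVertex(ejv, createTimestamp) then runs:

```java
// Create the ExecutionVertex instances (one per parallel subtask) and the IntermediateResults this vertex produces
ejv.initialize(
        executionHistorySizeLimit,
        rpcTimeout,
        createTimestamp,
        this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()),
        coordinatorStore);
// Connect the newly created ExecutionJobVertex to its upstream IntermediateResults
ejv.connectToPredecessors(this.intermediateResults);
// ...
// Finally, register the execution vertices and the result partitions
registerExecutionVerticesAndResultPartitionsFor(ejv);
```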
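When a vertex connects to its predecessors, each consumer subtask is wired to partitions of the upstream IntermediateResult according to the edge's distribution pattern. Flink's JobGraph edges use DistributionPattern.ALL_TO_ALL or POINTWISE. The sketch below computes which upstream subtask indices one consumer subtask reads from. EdgeWiringSketch and upstreamSubtasks are hypothetical names, and the pointwise rule is a simplified approximation rather than Flink's actual wiring code.

```java
import java.util.ArrayList;
import java.util.List;

final class EdgeWiringSketch {
    // Mirrors the two values of Flink's DistributionPattern; redefined here to stay self-contained.
    enum Pattern { ALL_TO_ALL, POINTWISE }

    /** Upstream subtask indices that consumer subtask {@code consumerIndex} reads from. */
    static List<Integer> upstreamSubtasks(Pattern pattern, int producers, int consumers, int consumerIndex) {
        if (producers <= 0 || consumers <= 0 || consumerIndex < 0 || consumerIndex >= consumers) {
            throw new IllegalArgumentException("invalid parallelism or index");
        }
        List<Integer> result = new ArrayList<>();
        if (pattern == Pattern.ALL_TO_ALL) {
            // Every consumer subtask reads from every producer subtask.
            for (int p = 0; p < producers; p++) {
                result.add(p);
            }
        } else if (producers >= consumers) {
            // POINTWISE, at least as many producers: each consumer reads a contiguous,
            // non-overlapping block of producers (one-to-one when the counts are equal).
            int start = consumerIndex * producers / consumers;
            int end = (consumerIndex + 1) * producers / consumers;
            for (int p = start; p < end; p++) {
                result.add(p);
            }
        } else {
            // POINTWISE, fewer producers: several neighbouring consumers share one producer.
            result.add(consumerIndex * producers / consumers);
        }
        return result;
    }
}
```

With ALL_TO_ALL, the number of logical connections grows as producers × consumers. With POINTWISE, each consumer touches only a small neighbourhood of producers.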