env.execute("Order Count");
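Anyone who has written a Flink job will recognize this line: it launches the Flink program and acts as the master switch.
It is remarkable that an architecture as complex as Flink's, running programs that are just as complex, can be started with one simple call. What actually happens behind it?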
StreamExecutionEnvironment.java
public JobExecutionResult execute() throws Exception {
    return execute((String) null);
}
The parameter of execute is jobName; this no-argument overload passes null when no name is specified.
/**
 * Triggers the program execution. The environment will execute all parts of the program that
 * have resulted in a "sink" operation. Sink operations are for example printing results or
 * forwarding them to a message queue.
 *
 * The program execution will be logged and displayed with the provided name
 *
 * @param jobName Desired name of the job
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception which occurs during job execution.
 */
public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }
    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
        if (!clusterDatasetCorruptedException.isPresent()) {
            throw t;
        }
        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}
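In short, the Javadoc says that this method triggers program execution, and that the environment executes every part of the program that leads to a sink operation.
The method first obtains a StreamGraph object through getStreamGraph.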
public StreamGraph getStreamGraph() {
    return getStreamGraph(true);
}
public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraph(transformations);
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}
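As the source shows, this method obtains the stream's execution graph. If clearTransformations is true (which is what the no-argument overload passes), it then clears transformations.
Here transformations is a List<Transformation<?>> holding the stream's transformation operations. Transformation itself is an abstract class that describes how an input stream is converted into an output stream: each call to a common operator such as map or filter creates a Transformation that references its input, so together they form a Transformation tree. Any Flink program that has stream input and output has such a tree.
Flink converts this Transformation list into a StreamGraph.
Clearing transformations therefore discards the tree, and this happens only after the conversion to StreamGraph is complete.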
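To make the "record first, build the graph later, then clear" behaviour concrete, here is a minimal sketch in plain Java. MiniEnvironment, addOperator, and pendingCount are hypothetical names invented for illustration; they are not Flink classes, and the "graph" is just a copied list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an execution environment; NOT Flink's real class.
final class MiniEnvironment {
    // Each operator call only records a description here; nothing runs yet.
    private final List<String> transformations = new ArrayList<>();

    MiniEnvironment addOperator(String name) {
        transformations.add(name);
        return this;
    }

    // Builds a "graph" (here simply a copy of the list) and optionally clears
    // the recorded operators, mirroring the shape of getStreamGraph(boolean).
    List<String> getGraph(boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined. Cannot execute.");
        }
        // Copy first, so operators added later cannot affect this graph.
        List<String> graph = new ArrayList<>(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    int pendingCount() {
        return transformations.size();
    }

    public static void main(String[] args) {
        MiniEnvironment env = new MiniEnvironment();
        env.addOperator("source").addOperator("map").addOperator("sink");
        System.out.println(env.getGraph(true));
        System.out.println(env.pendingCount());
    }
}
```

One consequence of clearing is visible in the sketch: asking for the graph a second time without adding new operators fails, because nothing is left to execute.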
Next, let's look at the getStreamGraph(List<Transformation<?>> transformations) overload that it calls:
private StreamGraph getStreamGraph(List<Transformation<?>> transformations) {
    synchronizeClusterDatasetStatus();
    return getStreamGraphGenerator(transformations).generate();
}
private void synchronizeClusterDatasetStatus() {
    if (cachedTransformations.isEmpty()) {
        return;
    }
    Set<AbstractID> completedClusterDatasets =
            listCompletedClusterDatasets().stream()
                    .map(AbstractID::new)
                    .collect(Collectors.toSet());
    cachedTransformations.forEach(
            (id, transformation) -> {
                transformation.setCached(completedClusterDatasets.contains(id));
            });
}
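As its name suggests, synchronizeClusterDatasetStatus synchronizes the status of cluster datasets: each cached transformation is marked as cached only if its ID appears among the completed cluster datasets.
Here cachedTransformations is a Map keyed by AbstractID.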
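The synchronization step boils down to a set-membership check per map entry. The following sketch shows that idea with plain collections; CacheStatusSync and CachedNode are hypothetical names, and String IDs stand in for AbstractID.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical model of the status sync; String IDs stand in for AbstractID.
final class CacheStatusSync {
    static final class CachedNode {
        boolean cached;
    }

    // Marks a node as cached only if its ID is among the completed datasets.
    static void sync(Map<String, CachedNode> nodes, Set<String> completedIds) {
        if (nodes.isEmpty()) {
            return; // nothing to synchronize, so skip the lookup entirely
        }
        nodes.forEach((id, node) -> node.cached = completedIds.contains(id));
    }
}
```

Note that the flag is overwritten on every call, so a node whose dataset has disappeared since the last run is reset to "not cached".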
Next, getStreamGraphGenerator is called to build the generator that produces the execution graph.
public StreamGraph generateStreamGraph(List<Transformation<?>> transformations) {
    return getStreamGraphGenerator(transformations).generate();
}
private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }
    // We copy the transformation so that newly added transformations cannot intervene with the
    // stream graph generation.
    return new StreamGraphGenerator(
                    new ArrayList<>(transformations), config, checkpointCfg, configuration)
            .setStateBackend(defaultStateBackend)
            .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
            .setSavepointDir(defaultSavepointDirectory)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout)
            .setSlotSharingGroupResource(slotSharingGroupResources);
}
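At this point we have finally found the class that actually generates the execution graph, StreamGraphGenerator. We will come back to it later.
By now you may have forgotten where we started, so let's return to the original execute() method.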
public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }
    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
        if (!clusterDatasetCorruptedException.isPresent()) {
            throw t;
        }
        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}
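Once the execution graph has been generated, execute(streamGraph) is called to run it. If that call fails and the cause chain contains a ClusterDatasetCorruptedException, the cached transformations are invalidated, the graph is rebuilt from the original transformations, and execution is retried once. Any other failure is rethrown as is.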
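The retry logic in execute(String jobName) follows a general pattern: search the cause chain for one specific exception type, fall back once if it is found, and rethrow otherwise. Below is a minimal sketch of that pattern in plain Java. RetryWithoutCache, CorruptedCacheException, and executeWithFallback are hypothetical names, and for brevity the sketch catches RuntimeException rather than Throwable.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class RetryWithoutCache {
    // Hypothetical stand-in for ClusterDatasetCorruptedException.
    static final class CorruptedCacheException extends RuntimeException {
        CorruptedCacheException(String message) {
            super(message);
        }
    }

    // Walks the cause chain looking for an exception of the given type.
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
        }
        return Optional.empty();
    }

    // Runs the cached variant first; falls back to the uncached variant only
    // when the failure was caused by a corrupted cache.
    static <R> R executeWithFallback(Supplier<R> withCache, Supplier<R> withoutCache) {
        try {
            return withCache.get();
        } catch (RuntimeException e) {
            if (!findThrowable(e, CorruptedCacheException.class).isPresent()) {
                throw e; // unrelated failure: do not retry
            }
            return withoutCache.get();
        }
    }
}
```

The fallback is attempted exactly once; if the uncached run also fails, its exception propagates to the caller.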
Next, let's look at execute(streamGraph):
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    final JobClient jobClient = executeAsync(streamGraph);
    try {
        final JobExecutionResult jobExecutionResult;
        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            jobExecutionResult = jobClient.getJobExecutionResult().get();
        } else {
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }
        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
        return jobExecutionResult;
    } catch (Throwable t) {
        // get() on the JobExecutionResult Future will throw an ExecutionException. This
        // behaviour was largely not there in Flink versions before the PipelineExecutor
        // refactoring so we should strip that exception.
        Throwable strippedException = ExceptionUtils.stripExecutionException(t);
        jobListeners.forEach(
                jobListener -> {
                    jobListener.onJobExecuted(null, strippedException);
                });
        ExceptionUtils.rethrowException(strippedException);
        // never reached, only make javac happy
        return null;
    }
}
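This method does two things:
1. It submits the job by calling executeAsync(streamGraph), which returns a JobClient.
2. It obtains the JobExecutionResult (waiting for the job to finish in attached mode, or returning a DetachedJobExecutionResult immediately otherwise) and notifies the registered job listeners.
A note on JobClient: in this code path it is the handle to the job that executeAsync has just submitted. The job ID is read from it, and in attached mode the JobExecutionResult object is fetched through it once the job has finished.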
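The attached/detached branch, together with the stripping of ExecutionException, can be sketched with a plain CompletableFuture. AttachedOrDetached and Result are hypothetical names for illustration; a runtime of -1 is an invented marker for "unknown because we did not wait".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class AttachedOrDetached {
    // Hypothetical result type: a job ID plus a runtime in ms (-1 when unknown).
    static final class Result {
        final String jobId;
        final long runtimeMs;

        Result(String jobId, long runtimeMs) {
            this.jobId = jobId;
            this.runtimeMs = runtimeMs;
        }
    }

    static Result awaitOrDetach(
            String jobId, CompletableFuture<Result> resultFuture, boolean attached) {
        if (!attached) {
            // Detached: return immediately with only the job ID.
            return new Result(jobId, -1L);
        }
        try {
            // Attached: block until the job has finished.
            return resultFuture.get();
        } catch (ExecutionException e) {
            // get() wraps the real failure; unwrap it so callers see the cause.
            Throwable cause = e.getCause();
            throw cause instanceof RuntimeException
                    ? (RuntimeException) cause
                    : new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In detached mode the future is never touched, which is why the call returns even if the job is still running.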
You may have noticed that a forEach over jobListeners runs once the job completes. jobListeners is a List<JobListener>.
The source comment on the JobListener interface reads:
/**
 * A listener that is notified on specific job status changed, which should be firstly registered by
 * {@code #registerJobListener} of execution environments.
 *
 * It is highly recommended NOT to perform any blocking operation inside the callbacks. If you
 * block the thread the invoker of environment execute methods is possibly blocked.
 */
@PublicEvolving
public interface JobListener {
In short, it is a listener that is notified of specific job status changes, and it is registered on StreamExecutionEnvironment through the registerJobListener method:
public void registerJobListener(JobListener jobListener) {
    checkNotNull(jobListener, "JobListener cannot be null");
    jobListeners.add(jobListener);
}
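When the job finishes successfully, onJobExecuted receives the result together with a null Throwable; when it fails, onJobExecuted receives a null result and the stripped exception instead.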
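The "exactly one of the two arguments is null" convention of onJobExecuted can be illustrated with a small plain-Java model. JobNotifier, Listener, and run are hypothetical names; only the callback shape is borrowed from the source above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

final class JobNotifier {
    // Hypothetical, simplified listener: exactly one argument is non-null.
    interface Listener {
        void onJobExecuted(String result, Throwable failure);
    }

    private final List<Listener> listeners = new ArrayList<>();

    void register(Listener listener) {
        listeners.add(Objects.requireNonNull(listener, "Listener cannot be null"));
    }

    // Runs the job and tells every listener how it ended.
    String run(Supplier<String> job) {
        try {
            String result = job.get();
            listeners.forEach(l -> l.onJobExecuted(result, null)); // success
            return result;
        } catch (RuntimeException e) {
            listeners.forEach(l -> l.onJobExecuted(null, e)); // failure
            throw e;
        }
    }
}
```

A listener can therefore tell success from failure by checking which argument is null, and, as the Javadoc above recommends, it should avoid blocking inside the callback because it runs on the caller's thread.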
Next, let's look at executeAsync(StreamGraph streamGraph), the method that actually carries out the execution:
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    final PipelineExecutor executor = getPipelineExecutor();
    CompletableFuture<JobClient> jobClientFuture =
            executor.execute(streamGraph, configuration, userClassloader);
    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
        collectIterators.clear();
        return jobClient;
    } catch (ExecutionException executionException) {
        final Throwable strippedException =
                ExceptionUtils.stripExecutionException(executionException);
        jobListeners.forEach(
                jobListener -> jobListener.onJobSubmitted(null, strippedException));
        throw new FlinkException(
                String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                strippedException);
    }
}
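After a long detour, we have finally reached the method that actually performs the execution, and it is asynchronous.
According to the source comments, PipelineExecutor is the entity responsible for executing user jobs. A PipelineExecutorFactory produces the PipelineExecutor that matches the Flink environment determined by the configuration, such as yarn, standalone, per-job, or local. The getPipelineExecutor method confirms this: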
private PipelineExecutor getPipelineExecutor() throws Exception {
    checkNotNull(
            configuration.get(DeploymentOptions.TARGET),
            "No execution.target specified in your configuration file.");
    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);
    checkNotNull(
            executorFactory,
            "Cannot find compatible factory for specified execution.target (=%s)",
            configuration.get(DeploymentOptions.TARGET));
    return executorFactory.getExecutor(configuration);
}
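Once the PipelineExecutor for the target environment has been obtained, its execute method is called with the execution graph, the configuration, and the class loader as arguments.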
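Choosing an executor from the execution.target setting is a factory lookup. The sketch below models it in plain Java with a Map<String, String> as the configuration; ExecutorLookup, Executor, ExecutorFactory, factoryFor, and select are hypothetical, simplified names, and the rule that exactly one factory must match is an assumption of this sketch rather than a statement about Flink's loader.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

final class ExecutorLookup {
    // Hypothetical, simplified executor: reports where a pipeline would run.
    interface Executor {
        String execute(String pipeline);
    }

    // Hypothetical, simplified factory: decides from the config whether it applies.
    interface ExecutorFactory {
        boolean isCompatibleWith(Map<String, String> config);

        Executor getExecutor(Map<String, String> config);
    }

    // Creates a factory that matches a single execution.target value.
    static ExecutorFactory factoryFor(String target) {
        return new ExecutorFactory() {
            @Override
            public boolean isCompatibleWith(Map<String, String> config) {
                return target.equals(config.get("execution.target"));
            }

            @Override
            public Executor getExecutor(Map<String, String> config) {
                return pipeline -> pipeline + " on " + target;
            }
        };
    }

    static Executor select(List<ExecutorFactory> factories, Map<String, String> config) {
        Objects.requireNonNull(
                config.get("execution.target"),
                "No execution.target specified in your configuration file.");
        List<ExecutorFactory> compatible =
                factories.stream()
                        .filter(f -> f.isCompatibleWith(config))
                        .collect(Collectors.toList());
        // Assumption of this sketch: exactly one factory must match.
        if (compatible.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one compatible factory for execution.target="
                            + config.get("execution.target")
                            + ", found "
                            + compatible.size());
        }
        return compatible.get(0).getExecutor(config);
    }
}
```

With factories for "local" and "remote" registered, a configuration whose execution.target is "local" yields the local executor, while a missing or unknown target fails fast before anything is submitted.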
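To understand the execution logic further, we have to dig into PipelineExecutor.execute.
According to the official documentation, the classes that implement the PipelineExecutor interface are AbstractJobClusterExecutor, AbstractSessionClusterExecutor, EmbeddedExecutor, KubernetesSessionClusterExecutor, LocalExecutor, RemoteExecutor, YarnJobClusterExecutor, and YarnSessionClusterExecutor.
Taking local execution as the example, let's look at how LocalExecutor works: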
public CompletableFuture<JobClient> execute(
        Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
        throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);
    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);
    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
            .submitJob(jobGraph, userCodeClassloader);
}
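The method proceeds as follows:
1. Check that pipeline and configuration are not null.
2. Merge the executor's own configuration and the configuration passed in into effectiveConfig.
3. Check that execution is attached, since the local executor supports only attached execution.
4. Convert the pipeline into a JobGraph.
5. Create a PerJobMiniClusterFactory and submit the JobGraph through it.
The JobGraph here is converted from the StreamGraph; the conversion happens in getJobGraph, which we will leave for a later discussion.
PerJobMiniClusterFactory.createWithFactory creates a PerJobMiniClusterFactory object.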
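The two consecutive addAll calls in LocalExecutor.execute imply a precedence order: values added later overwrite values added earlier under the same key, so the configuration passed to execute wins over the executor's own. A minimal sketch of that merge order with plain maps follows; EffectiveConfig and merge are hypothetical names, and the keys used in the usage note are only illustrative.

```java
import java.util.HashMap;
import java.util.Map;

final class EffectiveConfig {
    // Later putAll calls overwrite earlier ones, so callerConfig wins on conflicts.
    static Map<String, String> merge(
            Map<String, String> executorConfig, Map<String, String> callerConfig) {
        Map<String, String> effective = new HashMap<>();
        effective.putAll(executorConfig);
        effective.putAll(callerConfig);
        return effective;
    }
}
```

For example, if the executor's map sets parallelism.default to 1 and the caller's map sets it to 4, the merged map contains 4, while keys that appear only in the executor's map are kept unchanged.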
submitJob starts a MiniCluster and submits the job to it. The code is as follows:
public CompletableFuture<JobClient> submitJob(
        JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
    MiniClusterConfiguration miniClusterConfig =
            getMiniClusterConfig(jobGraph.getMaximumParallelism());
    MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
    miniCluster.start();
    return miniCluster
            .submitJob(jobGraph)
            .thenApplyAsync(
                    FunctionUtils.uncheckedFunction(
                            submissionResult -> {
                                org.apache.flink.client.ClientUtils
                                        .waitUntilJobInitializationFinished(
                                                () ->
                                                        miniCluster
                                                                .getJobStatus(
                                                                        submissionResult
                                                                                .getJobID())
                                                                .get(),
                                                () ->
                                                        miniCluster
                                                                .requestJobResult(
                                                                        submissionResult
                                                                                .getJobID())
                                                                .get(),
                                                userCodeClassloader);
                                return submissionResult;
                            }))
            .thenApply(
                    result ->
                            new MiniClusterJobClient(
                                    result.getJobID(),
                                    miniCluster,
                                    userCodeClassloader,
                                    MiniClusterJobClient.JobFinalizationBehavior
                                            .SHUTDOWN_CLUSTER))
            .whenComplete(
                    (ignored, throwable) -> {
                        if (throwable != null) {
                            // We failed to create the JobClient and must shutdown to ensure
                            // cleanup.
                            shutDownCluster(miniCluster);
                        }
                    })
            .thenApply(Function.identity());
}
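At last the picture is clear: in the local case, execute ends up starting a MiniCluster and submitting the JobGraph to it.
The official documentation describes MiniCluster as a mini cluster for executing Flink jobs locally.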
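The shape of the submitJob chain (submit, wait asynchronously, wrap the result in a client, clean up if anything failed) can be reproduced with a plain CompletableFuture. In the sketch below, SubmitChain and FakeCluster are hypothetical stand-ins, and an empty job name is an invented trigger for a failed submission.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class SubmitChain {
    // Hypothetical stand-in for a local cluster that must be shut down on failure.
    static final class FakeCluster {
        final AtomicBoolean running = new AtomicBoolean(false);

        void start() {
            running.set(true);
        }

        void shutDown() {
            running.set(false);
        }

        // Completes with a job ID, or fails when the job name is empty.
        CompletableFuture<String> submit(String jobName) {
            CompletableFuture<String> future = new CompletableFuture<>();
            if (jobName.isEmpty()) {
                future.completeExceptionally(new IllegalArgumentException("empty job"));
            } else {
                future.complete("job-" + jobName);
            }
            return future;
        }
    }

    static CompletableFuture<String> submitJob(FakeCluster cluster, String jobName) {
        cluster.start();
        return cluster.submit(jobName)
                // Async stage: the place to wait until the job is initialized.
                .thenApplyAsync(jobId -> jobId)
                // Wrap the job ID in a "client" handle for the caller.
                .thenApply(jobId -> "client-for-" + jobId)
                // Runs on success and on failure; clean up only on failure.
                .whenComplete(
                        (client, throwable) -> {
                            if (throwable != null) {
                                cluster.shutDown();
                            }
                        });
    }
}
```

Because whenComplete passes the original outcome through, the caller still sees the failure after the cleanup has run, and on success the cluster stays up until the client decides to shut it down.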
Summary: