[Flink Source Code] The Flink Submission Process, Seen from StreamExecutionEnvironment.execute


    env.execute("Order Count");
    

    Everyone is familiar with this line of code: it runs a Flink program and acts as its master switch.
    It is hard to imagine that an architecture as complex as Flink's, running programs as complex as Flink jobs, can be started with this one simple call. What exactly happens behind it?
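    For context, here is a minimal sketch of the kind of program that line sits in (class name and data are illustrative, not from the original article):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class OrderCountJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Building the pipeline only records transformations; nothing runs yet.
            env.fromElements("order-1", "order-2", "order-1")
                    .map(order -> "seen: " + order)
                    .print();

            // The "master switch": everything discussed below starts here.
            env.execute("Order Count");
        }
    }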


    execute and Flink's Execution Internals

    StreamExecutionEnvironment.java

    public JobExecutionResult execute() throws Exception {
        return execute((String) null);
    }
    

    The execute method takes a jobName parameter; the no-argument overload above simply passes null.

    /**
     * Triggers the program execution. The environment will execute all parts of the program that
     * have resulted in a "sink" operation. Sink operations are for example printing results or
     * forwarding them to a message queue.
     *
     * <p>The program execution will be logged and displayed with the provided name
     *
     * @param jobName Desired name of the job
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception which occurs during job execution.
     */
    public JobExecutionResult execute(String jobName) throws Exception {
        final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
        StreamGraph streamGraph = getStreamGraph();
        if (jobName != null) {
            streamGraph.setJobName(jobName);
        }

        try {
            return execute(streamGraph);
        } catch (Throwable t) {
            Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                    ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
            if (!clusterDatasetCorruptedException.isPresent()) {
                throw t;
            }

            // Retry without cache if it is caused by corrupted cluster dataset.
            invalidateCacheTransformations(originalTransformations);
            streamGraph = getStreamGraph(originalTransformations);
            return execute(streamGraph);
        }
    }


    The gist of this long comment: it triggers program execution, and the environment executes every part of the program that results in a sink operation.
    The method first obtains a StreamGraph object via getStreamGraph.

    public StreamGraph getStreamGraph() {
        return getStreamGraph(true);
    }
    
    public StreamGraph getStreamGraph(boolean clearTransformations) {
        final StreamGraph streamGraph = getStreamGraph(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return streamGraph;
    }
    

    From the source we can see that this method builds the stream's execution graph and, if the parameter clearTransformations is true (the default), clears transformations afterwards.
    Here transformations is a List<Transformation<?>> holding the stream's sequence of transformation operations. Transformation itself is an abstract class that describes the step from an input stream to an output stream; the operators we use every day, such as map and filter, are all backed by a tree of Transformations. Any Flink program with stream input and output owns such a Transformation tree.
    Flink converts this list of Transformations into a StreamGraph; clearing transformations afterwards simply discards the tree once the conversion to the StreamGraph is done. A sketch of how the list is built up follows below.
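    As a small illustration of how the tree is built up (a sketch; the exact Transformation subclasses involved differ between Flink versions):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransformationTreeSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> source = env.fromElements("a", "bb");   // records a source Transformation
            DataStream<Integer> lengths = source.map(String::length);  // records a OneInputTransformation
            lengths.print();                                           // records a sink Transformation

            // getStreamGraph() consumes this list and, since clearTransformations
            // defaults to true, clears it afterwards.
        }
    }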
    Let's continue into the getStreamGraph(List<Transformation<?>> transformations) being called:

    private StreamGraph getStreamGraph(List<Transformation<?>> transformations) {
        synchronizeClusterDatasetStatus();
        return getStreamGraphGenerator(transformations).generate();
    }
    
    private void synchronizeClusterDatasetStatus() {
        if (cachedTransformations.isEmpty()) {
            return;
        }
        Set<AbstractID> completedClusterDatasets =
                listCompletedClusterDatasets().stream()
                        .map(AbstractID::new)
                        .collect(Collectors.toSet());
        cachedTransformations.forEach(
                (id, transformation) -> {
                    transformation.setCached(completedClusterDatasets.contains(id));
                });
    }
    

    synchronizeClusterDatasetStatus does exactly what its name says: it synchronizes the status of the cluster's datasets.
    Here cachedTransformations is a Map<AbstractID, CacheTransformation<?>> that tracks the cached transformations of the datasets held in the cluster. The method marks each cached transformation as available or not by checking its id against the set of cluster datasets that have actually been completed.
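    For reference, cachedTransformations gets populated when a stream is explicitly cached. A hedged sketch (DataStream.cache() comes from FLIP-205, available in recent Flink versions, and it only takes effect in batch execution mode):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.datastream.CachedDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CacheSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH); // caching only applies in batch mode

            DataStream<Long> expensive = env.fromSequence(1, 1_000_000).map(x -> x * 2);

            // cache() registers an entry in cachedTransformations; the first execute()
            // materializes the cluster dataset, and later jobs on the same
            // environment can reuse it instead of recomputing.
            CachedDataStream<Long> cached = expensive.cache();

            cached.print();
            env.execute("first run materializes the cache");
        }
    }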
    Next, getStreamGraphGenerator is called to generate the graph:

    public StreamGraph generateStreamGraph(List<Transformation<?>> transformations) {
        return getStreamGraphGenerator(transformations).generate();
    }
    
    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
        if (transformations.size() <= 0) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
    
        // We copy the transformation so that newly added transformations cannot intervene with the
        // stream graph generation.
        return new StreamGraphGenerator(
                        new ArrayList<>(transformations), config, checkpointCfg, configuration)
                .setStateBackend(defaultStateBackend)
                .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
                .setSavepointDir(defaultSavepointDirectory)
                .setChaining(isChainingEnabled)
                .setUserArtifacts(cacheFile)
                .setTimeCharacteristic(timeCharacteristic)
                .setDefaultBufferTimeout(bufferTimeout)
                .setSlotSharingGroupResource(slotSharingGroupResources);
    }
    

    At this point we have finally found the class that actually generates the graph: StreamGraphGenerator. We will come back to it later.
    If by now you have forgotten where we started, let's return to the original execute() method.

    public JobExecutionResult execute(String jobName) throws Exception {
        final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
        StreamGraph streamGraph = getStreamGraph();
        if (jobName != null) {
            streamGraph.setJobName(jobName);
        }
    
        try {
            return execute(streamGraph);
        } catch (Throwable t) {
            Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                    ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
            if (!clusterDatasetCorruptedException.isPresent()) {
                throw t;
            }
    
            // Retry without cache if it is caused by corrupted cluster dataset.
            invalidateCacheTransformations(originalTransformations);
            streamGraph = getStreamGraph(originalTransformations);
            return execute(streamGraph);
        }
    }
    

    Once the graph has been generated, execute(streamGraph) hands it over for execution; if execution fails because of a corrupted cluster dataset, the cache is invalidated, the graph is rebuilt from the original transformations, and execution is retried.
    Next, let's look at execute(streamGraph):

    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        final JobClient jobClient = executeAsync(streamGraph);
    
        try {
            final JobExecutionResult jobExecutionResult;
    
            if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                jobExecutionResult = jobClient.getJobExecutionResult().get();
            } else {
                jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }
    
            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
    
            return jobExecutionResult;
        } catch (Throwable t) {
            // get() on the JobExecutionResult Future will throw an ExecutionException. This
            // behaviour was largely not there in Flink versions before the PipelineExecutor
            // refactoring so we should strip that exception.
            Throwable strippedException = ExceptionUtils.stripExecutionException(t);
    
            jobListeners.forEach(
                    jobListener -> {
                        jobListener.onJobExecuted(null, strippedException);
                    });
            ExceptionUtils.rethrowException(strippedException);
    
            // never reached, only make javac happy
            return null;
        }
    }
    

    This method does two things:

    • it calls executeAsync(streamGraph), the method that does the real work, and gets back a JobClient
    • it fetches the execution result via jobClient.getJobExecutionResult().get() in attached mode, or wraps the job id in a DetachedJobExecutionResult in detached mode

    The JobClient interface deserves a special mention here: it is the starting point of job execution, responsible for accepting the user's program, building the dataflow, and submitting it to the JobManager for further execution, returning the result to the user once the job completes. In this method it is used to retrieve the JobExecutionResult object.
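    A short sketch of driving the JobClient directly via executeAsync (the same handle execute uses internally; job name and data are illustrative):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JobClientSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).print();

            // executeAsync returns as soon as the job has been submitted...
            JobClient jobClient = env.executeAsync("Order Count");

            // ...and the JobClient is our handle to it from then on.
            System.out.println("job id: " + jobClient.getJobID());
            System.out.println("status: " + jobClient.getJobStatus().get());

            // Blocking on the result is exactly what execute() does in attached mode.
            JobExecutionResult result = jobClient.getJobExecutionResult().get();
            System.out.println("runtime: " + result.getNetRuntime() + " ms");
        }
    }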

    Did you notice that once the job completes, a forEach runs over jobListeners? jobListeners is a List<JobListener> field.
    The source comment on the JobListener interface reads:

    /**
     * A listener that is notified on specific job status changed, which should be firstly registered by
     * {@code #registerJobListener} of execution environments.
     *
     * <p>It is highly recommended NOT to perform any blocking operation inside the callbacks. If you
     * block the thread the invoker of environment execute methods is possibly blocked.
     */
    @PublicEvolving
    public interface JobListener {


    In short, it is a listener that gets notified on specific job status changes, registered with the StreamExecutionEnvironment via the registerJobListener method:

    public void registerJobListener(JobListener jobListener) {
        checkNotNull(jobListener, "JobListener cannot be null");
        jobListeners.add(jobListener);
    }
    

    Note that when the job finishes successfully, onJobExecuted is invoked with a null Throwable, signalling success; on failure, the result argument is null and the stripped exception is passed instead.
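    A sketch of a listener wired into these callbacks (the print statements are illustrative):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.core.execution.JobListener;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ListenerSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.registerJobListener(new JobListener() {
                @Override
                public void onJobSubmitted(JobClient jobClient, Throwable t) {
                    // invoked from executeAsync; t == null means submission succeeded
                    if (t == null) {
                        System.out.println("submitted " + jobClient.getJobID());
                    }
                }

                @Override
                public void onJobExecuted(JobExecutionResult result, Throwable t) {
                    // invoked from execute after the job finishes; t == null means success
                    if (t == null) {
                        System.out.println("finished in " + result.getNetRuntime() + " ms");
                    }
                }
            });

            env.fromElements(1, 2, 3).print();
            env.execute("listener demo");
        }
    }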
    Next, let's look at executeAsync(StreamGraph streamGraph), the method where the real execution work of execute happens:

    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        final PipelineExecutor executor = getPipelineExecutor();
    
        CompletableFuture<JobClient> jobClientFuture =
                executor.execute(streamGraph, configuration, userClassloader);
    
        try {
            JobClient jobClient = jobClientFuture.get();
            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
            collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
            collectIterators.clear();
            return jobClient;
        } catch (ExecutionException executionException) {
            final Throwable strippedException =
                    ExceptionUtils.stripExecutionException(executionException);
            jobListeners.forEach(
                    jobListener -> jobListener.onJobSubmitted(null, strippedException));
    
            throw new FlinkException(
                    String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                    strippedException);
        }
    }
    

    After a long detour we have finally arrived at the method that really performs the execution, an asynchronous one.
    According to the source comments, a PipelineExecutor is the entity responsible for executing a user job. It is produced by a PipelineExecutorFactory selected according to the deployment target in the configuration: yarn, standalone, per-job, local, and so on. This is confirmed by the getPipelineExecutor method:

    private PipelineExecutor getPipelineExecutor() throws Exception {
        checkNotNull(
                configuration.get(DeploymentOptions.TARGET),
                "No execution.target specified in your configuration file.");
    
        final PipelineExecutorFactory executorFactory =
                executorServiceLoader.getExecutorFactory(configuration);
    
        checkNotNull(
                executorFactory,
                "Cannot find compatible factory for specified execution.target (=%s)",
                configuration.get(DeploymentOptions.TARGET));
    
        return executorFactory.getExecutor(configuration);
    }
    

    Having obtained the PipelineExecutor for the current environment, executeAsync calls its execute method, passing in the graph, the configuration, and the user class loader. The executor choice is driven by the execution.target option, as the sketch below shows.
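    For illustration, the deployment target can also be set programmatically; a sketch (the string values mirror the execution.target option, e.g. "local", "remote", "yarn-session", "kubernetes-session"):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TargetSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Equivalent to "execution.target: local" in the config file;
            // getPipelineExecutor will then pick the factory that produces LocalExecutor.
            conf.set(DeploymentOptions.TARGET, "local");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }
    }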
    To dig further into the execution logic, we have to follow PipelineExecutor.execute.
    In the official docs, the classes implementing the PipelineExecutor interface are AbstractJobClusterExecutor, AbstractSessionClusterExecutor, EmbeddedExecutor, KubernetesSessionClusterExecutor, LocalExecutor, RemoteExecutor, YarnJobClusterExecutor, and YarnSessionClusterExecutor.
    Let's take local execution as an example and study how LocalExecutor works:

    public CompletableFuture<JobClient> execute(
                Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
                throws Exception {
        checkNotNull(pipeline);
        checkNotNull(configuration);
    
        Configuration effectiveConfig = new Configuration();
        effectiveConfig.addAll(this.configuration);
        effectiveConfig.addAll(configuration);
    
        // we only support attached execution with the local executor.
        checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
    
        final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
    
        return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
                .submitJob(jobGraph, userCodeClassloader);
    }
    

    The method proceeds as follows:

    • merge the extra configuration added when the LocalExecutor was instantiated with the user configuration into one effective Configuration
    • create the JobGraph, the executable job graph
    • submit the job via createWithFactory and submitJob

    The JobGraph here is translated from the StreamGraph; the translation lives in the getJobGraph method.
    We will leave the details of obtaining the JobGraph for later; the sketch below shows where the translation ultimately lands.
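    As a teaser, a hedged sketch of where that translation ends up (StreamGraph.getJobGraph() is internally backed by StreamingJobGraphGenerator; these are internal APIs whose signatures vary across Flink versions):

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    public class JobGraphSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).print();

            StreamGraph streamGraph = env.getStreamGraph();
            // Delegates to StreamingJobGraphGenerator.createJobGraph(...) internally.
            JobGraph jobGraph = streamGraph.getJobGraph();
            System.out.println("job vertices: " + jobGraph.getNumberOfVertices());
        }
    }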
    PerJobMiniClusterFactory.createWithFactory creates a PerJobMiniClusterFactory object, and submitJob starts a MiniCluster and submits the job to it. The code is as follows:

    public CompletableFuture<JobClient> submitJob(
                JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
        MiniClusterConfiguration miniClusterConfig =
                getMiniClusterConfig(jobGraph.getMaximumParallelism());
        MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
        miniCluster.start();
    
        return miniCluster
                .submitJob(jobGraph)
                .thenApplyAsync(
                        FunctionUtils.uncheckedFunction(
                                submissionResult -> {
                                    org.apache.flink.client.ClientUtils
                                            .waitUntilJobInitializationFinished(
                                                    () ->
                                                            miniCluster
                                                                    .getJobStatus(
                                                                            submissionResult
                                                                                    .getJobID())
                                                                    .get(),
                                                    () ->
                                                            miniCluster
                                                                    .requestJobResult(
                                                                            submissionResult
                                                                                    .getJobID())
                                                                    .get(),
                                                    userCodeClassloader);
                                    return submissionResult;
                                }))
                .thenApply(
                        result ->
                                new MiniClusterJobClient(
                                        result.getJobID(),
                                        miniCluster,
                                        userCodeClassloader,
                                        MiniClusterJobClient.JobFinalizationBehavior
                                                .SHUTDOWN_CLUSTER))
                .whenComplete(
                        (ignored, throwable) -> {
                            if (throwable != null) {
                                // We failed to create the JobClient and must shutdown to ensure
                                // cleanup.
                                shutDownCluster(miniCluster);
                            }
                        })
                .thenApply(Function.identity());
    }
    

    So at long last we understand: at the end of the chain, execute starts a MiniCluster and submits the JobGraph to it as a job.
    The official docs describe MiniCluster as a mini cluster that executes Flink jobs locally.
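    To make that concrete, here is a hedged sketch of driving a MiniCluster by hand, roughly what PerJobMiniClusterFactory does for us (these are runtime-internal classes and the details differ between versions):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MiniClusterSketch {
        public static void main(String[] args) throws Exception {
            // Build a JobGraph the same way the article traced above.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).print();
            JobGraph jobGraph = env.getStreamGraph().getJobGraph();

            MiniClusterConfiguration cfg =
                    new MiniClusterConfiguration.Builder()
                            .setConfiguration(new Configuration())
                            .setNumTaskManagers(1)
                            .setNumSlotsPerTaskManager(2)
                            .build();

            // JobManager and TaskManager(s) in a single JVM.
            try (MiniCluster miniCluster = new MiniCluster(cfg)) {
                miniCluster.start();
                // Completes once the submission is acknowledged, not when the job finishes.
                miniCluster.submitJob(jobGraph).get();
            }
        }
    }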

    Summary: the execution process of execute

    • convert the Transformations into a StreamGraph
    • supply the extra configuration, listeners, and other hooks needed for execution
    • convert the StreamGraph into an executable JobGraph
    • create a different executor depending on the runtime environment
    • in a local environment, start a MiniCluster and submit the JobGraph for execution
    Original article: https://blog.csdn.net/wwb44444/article/details/127722714