• Flink 1.15 Source Code Reading — Building the JobGraph from User Code


    Continuing from the previous article, Flink 1.15 Source Code Reading — StreamGraph.

    Let's keep going and see how the JobGraph is generated, and which transformations it goes through.

    To be clear up front: this walkthrough is based on per-job mode. Although that mode has since been marked as deprecated, it is still useful as a reference.
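
    Before diving into the source, here is a minimal sketch of the kind of user program that drives this whole path (the class and job names are made up for illustration): env.execute() builds the StreamGraph and then hands it to the execute(getStreamGraph()) call we look at next.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JobGraphDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("flink", "jobgraph")
               .map(new MapFunction<String, String>() {   // a simple chained operator
                   @Override
                   public String map(String value) {
                       return value.toUpperCase();
                   }
               })
               .print();

            // builds the StreamGraph and calls execute(getStreamGraph()) under the hood
            env.execute("jobgraph-demo");
        }
    }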

    execute(getStreamGraph())

    The code above in turn calls execute(getStreamGraph()).

    @Internal
        public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
            // Another layer of wrapping: the real submission happens in executeAsync, which is where we go next
            final JobClient jobClient = executeAsync(streamGraph);
    
            // What follows just handles the result returned via the JobClient; no need to dwell on it
            try {
                final JobExecutionResult jobExecutionResult;
    
                if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                    jobExecutionResult = jobClient.getJobExecutionResult().get();
                } else {
                    jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
                }
    
                jobListeners.forEach(
                        jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
    
                return jobExecutionResult;
            } catch (Throwable t) {
                // get() on the JobExecutionResult Future will throw an ExecutionException. This
                // behaviour was largely not there in Flink versions before the PipelineExecutor
                // refactoring so we should strip that exception.
                Throwable strippedException = ExceptionUtils.stripExecutionException(t);
    
                jobListeners.forEach(
                        jobListener -> {
                            jobListener.onJobExecuted(null, strippedException);
                        });
                ExceptionUtils.rethrowException(strippedException);
    
                // never reached, only make javac happy
                return null;
            }
        }
    
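    A side note on the attached-vs-detached branch above: it is controlled by the execution.attached flag (DeploymentOptions.ATTACHED). Below is a hedged sketch of flipping it programmatically (class and job names are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DetachedDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // execution.attached = false -> the else branch above returns a DetachedJobExecutionResult
            conf.set(DeploymentOptions.ATTACHED, false);

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.fromElements(1, 2, 3).print();
            env.execute("detached-demo");
        }
    }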

    executeAsync(streamGraph)

    Now let's step into the executeAsync(streamGraph) method.

    @Internal
        public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
            // sanity checks
            checkNotNull(streamGraph, "StreamGraph cannot be null.");
            checkNotNull(
                    configuration.get(DeploymentOptions.TARGET),
                    "No execution.target specified in your configuration file.");
    
            // Look up the PipelineExecutorFactory matching the configured execution target
            final PipelineExecutorFactory executorFactory =
                    executorServiceLoader.getExecutorFactory(configuration);
    
            // sanity check
            checkNotNull(
                    executorFactory,
                    "Cannot find compatible factory for specified execution.target (=%s)",
                    configuration.get(DeploymentOptions.TARGET));
    
            // Hand the StreamGraph, configuration and userClassloader to the executor and submit asynchronously
            CompletableFuture<JobClient> jobClientFuture =
                    executorFactory
                            .getExecutor(configuration)
                            .execute(streamGraph, configuration, userClassloader);
    
            // Obtain the JobClient from the returned future
            try {
                JobClient jobClient = jobClientFuture.get();
                jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
                return jobClient;
            } catch (ExecutionException executionException) {
                final Throwable strippedException =
                        ExceptionUtils.stripExecutionException(executionException);
                jobListeners.forEach(
                        jobListener -> jobListener.onJobSubmitted(null, strippedException));
    
                throw new FlinkException(
                        String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                        strippedException);
            }
        }
    
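    The executorServiceLoader picks the PipelineExecutorFactory whose name matches execution.target. In practice this option is usually set by the CLI (for example flink run -t yarn-per-job ...); below is a hedged sketch of setting it programmatically (the class name is illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;

    public class ExecutionTargetDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // "yarn-per-job" is the per-job target used in this walkthrough; other values
            // include "local", "remote", "yarn-session" and "kubernetes-session"
            conf.set(DeploymentOptions.TARGET, "yarn-per-job");
            System.out.println(conf.get(DeploymentOptions.TARGET));
        }
    }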

    executorFactory.getExecutor(configuration).execute(streamGraph, configuration, userClassloader)

       CompletableFuture<JobClient> execute(
                final Pipeline pipeline,
                final Configuration configuration,
                final ClassLoader userCodeClassloader)
                throws Exception;
    

    Find the implementation of this interface; on the per-job path it is AbstractJobClusterExecutor.

    @Override
        public CompletableFuture<JobClient> execute(
                @Nonnull final Pipeline pipeline,
                @Nonnull final Configuration configuration,
                @Nonnull final ClassLoader userCodeClassloader)
                throws Exception {
        /* Translate the StreamGraph into a JobGraph; step in here to see how that conversion happens */
            final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    
        // Obtain the cluster descriptor: creates and starts the YarnClient and carries YARN/Flink configuration and environment info
            try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                    clusterClientFactory.createClusterDescriptor(configuration)) {
                final ExecutionConfigAccessor configAccessor =
                        ExecutionConfigAccessor.fromConfiguration(configuration);
    
            // Cluster resource settings: JobManager memory, TaskManager memory, slots per TaskManager
                final ClusterSpecification clusterSpecification =
                        clusterClientFactory.getClusterSpecification(configuration);
    
                final ClusterClientProvider<ClusterID> clusterClientProvider =
                        clusterDescriptor.deployJobCluster(
                                clusterSpecification, jobGraph, configAccessor.getDetachedMode());
                LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
    
                return CompletableFuture.completedFuture(
                        new ClusterClientJobClientAdapter<>(
                                clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
            }
        }
    
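    The ClusterSpecification mentioned in the comment above is assembled from the standard memory and slot options. A hedged sketch of the configuration keys that typically feed it (the sizes are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.configuration.TaskManagerOptions;

    public class ClusterSpecDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // jobmanager.memory.process.size
            conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1600m"));
            // taskmanager.memory.process.size
            conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1728m"));
            // taskmanager.numberOfTaskSlots
            conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        }
    }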

    PipelineExecutorUtils.getJobGraph(pipeline, configuration)

    public static JobGraph getJobGraph(
                @Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration)
                throws MalformedURLException {
            checkNotNull(pipeline);
            checkNotNull(configuration);
    
            final ExecutionConfigAccessor executionConfigAccessor =
                    ExecutionConfigAccessor.fromConfiguration(configuration);
    //   Build the JobGraph
            final JobGraph jobGraph =
                    FlinkPipelineTranslationUtil.getJobGraph(
                            pipeline, configuration, executionConfigAccessor.getParallelism());
    
            configuration
                    .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
                    .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));
    
            jobGraph.addJars(executionConfigAccessor.getJars());
            jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
            jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
    
            return jobGraph;
        }
    
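    Notice how getJobGraph() copies the user jars, classpaths and savepoint restore settings from the configuration onto the JobGraph. A hedged sketch of where those values typically come from (the option keys are real, the paths are made up):

    import org.apache.flink.configuration.Configuration;

    public class JobGraphSettingsDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // read via ExecutionConfigAccessor.getJars() and getSavepointRestoreSettings()
            conf.setString("pipeline.jars", "file:///tmp/usercode.jar");
            conf.setString("execution.savepoint.path", "hdfs:///flink/savepoints/sp-demo");
            conf.setString("execution.savepoint.ignore-unclaimed-state", "false");
        }
    }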

    FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism())

    public static JobGraph getJobGraph(
                Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
    
            FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
    
            return pipelineTranslator.translateToJobGraph(
                    pipeline, optimizerConfiguration, defaultParallelism);
        }
    

    pipelineTranslator.translateToJobGraph(pipeline, optimizerConfiguration, defaultParallelism)


    @Override
        public JobGraph translateToJobGraph(
                Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
            checkArgument(
                    pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");
    
            StreamGraph streamGraph = (StreamGraph) pipeline;
            return streamGraph.getJobGraph(null);
        }
    

    streamGraph.getJobGraph(null)

    /** Gets the assembled {@link JobGraph} with a specified {@link JobID}. */
        public JobGraph getJobGraph(@Nullable JobID jobID) {
            return StreamingJobGraphGenerator.createJobGraph(this, jobID);
        }
    

    StreamingJobGraphGenerator.createJobGraph(this, jobID)

    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
            return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
        }
    

    It really is nesting dolls all the way down, ahem!

    new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph()

    Here a StreamingJobGraphGenerator is instantiated. Recall that the StreamGraph also had its own generator, new StreamGraphGenerator(); this is the analogous object, and at last the JobGraph is about to be created.

    private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
            this.streamGraph = streamGraph;
            this.defaultStreamGraphHasher = new StreamGraphHasherV2();
            this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
    
            this.jobVertices = new HashMap<>();
            this.builtVertices = new HashSet<>();
            this.chainedConfigs = new HashMap<>();
            this.vertexConfigs = new HashMap<>();
            this.chainedNames = new HashMap<>();
            this.chainedMinResources = new HashMap<>();
            this.chainedPreferredResources = new HashMap<>();
            this.chainedInputOutputFormats = new HashMap<>();
            this.physicalEdgesInOrder = new ArrayList<>();
    
            jobGraph = new JobGraph(jobID, streamGraph.getJobName());
        }
    

    What I care about most here are probably the chain-related fields.
    There really are a lot of containers being initialized.
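
    The chain-related maps get filled later by setChaining(). As a reminder (a hedged sketch, not part of the generator itself), these are the user-facing knobs that influence chaining:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ChainingDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // env.disableOperatorChaining();  // would turn chaining off for the whole job

            env.fromElements(1, 2, 3)
               .filter(x -> x > 0).startNewChain()     // start a new chain at this operator
               .filter(x -> x > 1).disableChaining()   // keep this operator out of any chain
               .print();

            env.execute("chaining-demo");
        }
    }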

    Next, let's look at the generator's createJobGraph() method:

    private JobGraph createJobGraph() {
            preValidate();
            jobGraph.setJobType(streamGraph.getJobType());
    
            jobGraph.enableApproximateLocalRecovery(
                    streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
    
            // Generate deterministic hashes for the nodes in order to identify them across
            // submission iff they didn't change.
            Map<Integer, byte[]> hashes =
                    defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    
            // Generate legacy version hashes for backwards compatibility
            List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
            for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
                legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
            }
    
            setChaining(hashes, legacyHashes);
    
            setPhysicalEdges();
    
            markContainsSourcesOrSinks();
    
            setSlotSharingAndCoLocation();
    
            setManagedMemoryFraction(
                    Collections.unmodifiableMap(jobVertices),
                    Collections.unmodifiableMap(vertexConfigs),
                    Collections.unmodifiableMap(chainedConfigs),
                    id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
                    id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
    
            configureCheckpointing();
    
            jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
    
            final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
                    JobGraphUtils.prepareUserArtifactEntries(
                            streamGraph.getUserArtifacts().stream()
                                    .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                            jobGraph.getJobID());
    
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    distributedCacheEntries.entrySet()) {
                jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
            }
    
            // set the ExecutionConfig last when it has been finalized
            try {
                jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
            } catch (IOException e) {
                throw new IllegalConfigurationException(
                        "Could not serialize the ExecutionConfig."
                                + "This indicates that non-serializable types (like custom serializers) were registered");
            }
    
            addVertexIndexPrefixInVertexName();
    
            setVertexDescription();
    
            return jobGraph;
        }
    
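    To close the loop, several of the steps above map directly to user-level APIs: the node hashes stay stable across submissions when operators carry uid(), setSlotSharingAndCoLocation() honors slotSharingGroup(...), and configureCheckpointing() picks up enableCheckpointing(...). A hedged sketch tying them together (names are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JobGraphKnobsDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(10_000L);          // consumed by configureCheckpointing()

            env.fromElements("a", "b", "c")
               .filter(s -> !s.isEmpty())
               .uid("my-filter")                       // deterministic hash for this node across submissions
               .slotSharingGroup("group-1")            // consumed by setSlotSharingAndCoLocation()
               .print();

            env.execute("jobgraph-knobs-demo");
        }
    }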

  • Original article: https://blog.csdn.net/u010772882/article/details/126061421