• Flink SQL text to JobGraph

    The conversion runs in four steps: the SQL text is parsed into an Operation, the Operation is translated into a list of Transformations, the Transformations are assembled into a StreamGraph (which implements Pipeline), and the StreamGraph is finally converted into a JobGraph that is submitted to the cluster.

    operation

    // org.apache.flink.table.api.internal.TableEnvironmentInternal
    Parser parser = tableEnvironmentInternal.getParser();
    List<Operation> operations = parser.parse(sqlStatementString);
    
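    A minimal, self-contained sketch of this parse step (assuming a Flink 1.15+ classpath; the class name ParserDemo, the cast, and the demo statement are illustrative additions, not from the original article):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.internal.TableEnvironmentInternal;
    import org.apache.flink.table.delegation.Parser;
    import org.apache.flink.table.operations.Operation;

    import java.util.List;

    public class ParserDemo {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // TableEnvironmentImpl implements TableEnvironmentInternal and exposes the Parser
            Parser parser = ((TableEnvironmentInternal) tEnv).getParser();

            // Calcite-based parse + validate; every statement becomes an Operation
            List<Operation> operations = parser.parse("SELECT 1");
            System.out.println(operations.get(0).asSummaryString());
        }
    }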

    transform

    package org.apache.flink.table.api.internal;

    public class TableEnvironmentImpl implements TableEnvironmentInternal {

        // Operation -> Transformation: the actual translation is delegated to the Planner
        protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
            return planner.translate(modifyOperations);
        }
    
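    To exercise this step from user code, the parsed ModifyOperation can be handed to the internal executeInternal(List<ModifyOperation>) entry point, which in recent Flink versions calls translate(...) and therefore planner.translate(...) underneath. A sketch (TranslateDemo, the demo tables, and the built-in datagen/print connectors are assumptions, not from the article):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.internal.TableEnvironmentInternal;
    import org.apache.flink.table.operations.ModifyOperation;
    import org.apache.flink.table.operations.Operation;

    import java.util.Collections;
    import java.util.List;

    public class TranslateDemo {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            TableEnvironmentInternal internal = (TableEnvironmentInternal) tEnv;

            tEnv.executeSql("CREATE TABLE src (id BIGINT) WITH ('connector' = 'datagen', 'number-of-rows' = '5')");
            tEnv.executeSql("CREATE TABLE snk (id BIGINT) WITH ('connector' = 'print')");

            // an INSERT statement parses into a ModifyOperation
            List<Operation> ops = internal.getParser().parse("INSERT INTO snk SELECT id FROM src");
            ModifyOperation insert = (ModifyOperation) ops.get(0);

            // executeInternal(List<ModifyOperation>) -> translate(...) -> planner.translate(...)
            TableResult result = internal.executeInternal(Collections.singletonList(insert));
            result.await();
        }
    }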

    streamGraph

    // StreamGraph implements Pipeline; execEnv here is the table module's Executor
    // (DefaultExecutor), which wraps a StreamExecutionEnvironment
    Pipeline pipeline =
            execEnv.createPipeline(
                    transformations, tableConfig.getConfiguration(), defaultJobName);
    
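    For intuition, the same Transformation -> StreamGraph step can be observed directly with the DataStream API; getStreamGraph() is the public counterpart of the generateStreamGraph() call made by DefaultExecutor. A sketch (assuming Flink 1.15/1.16; StreamGraphDemo and the toy pipeline are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    public class StreamGraphDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // each API call below registers a Transformation in the environment
            env.fromElements(1, 2, 3)
               .filter(i -> i > 1)
               .print();

            // Transformations -> StreamGraph (the Pipeline produced by createPipeline above)
            StreamGraph streamGraph = env.getStreamGraph();
            System.out.println(streamGraph.getStreamingPlanAsJSON());
        }
    }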

    jobGraph

    // org.apache.flink.table.planner.delegation.DefaultExecutor#executeAsync
    StreamExecutionEnvironment executionEnvironment;
    executionEnvironment.executeAsync((StreamGraph) pipeline);

    // --------------------------------------------------------------------
    // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#executeAsync
    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);

    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    .execute(streamGraph, configuration, userClassloader);
    
    // -------------------- streamGraph to jobGraph ----------------------
    // org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
    // (the concrete PipelineExecutor depends on the deployment mode)
    public class EmbeddedExecutor implements PipelineExecutor {
        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
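
    The same StreamGraph -> JobGraph conversion can be triggered locally without a cluster; StreamGraph#getJobGraph() delegates to the StreamingJobGraphGenerator, which is ultimately what PipelineExecutorUtils.getJobGraph() relies on. A sketch (assuming Flink 1.15/1.16; JobGraphDemo is an illustrative name):

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    public class JobGraphDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("a", "b", "c").print();

            // Transformations -> StreamGraph -> JobGraph (operators are chained into JobVertices here)
            StreamGraph streamGraph = env.getStreamGraph();
            JobGraph jobGraph = streamGraph.getJobGraph();

            for (JobVertex vertex : jobGraph.getVertices()) {
                System.out.println(vertex.getName() + " (parallelism " + vertex.getParallelism() + ")");
            }
        }
    }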

    // org.apache.flink.table.api.internal.TableEnvironmentImpl
    	private TableResultInternal executeInternal(
                List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
            final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);
            // We pass only the configuration to avoid reconfiguration with the rootConfiguration
            Pipeline pipeline =
                    execEnv.createPipeline(
                            transformations, tableConfig.getConfiguration(), defaultJobName);
            try {
                JobClient jobClient = execEnv.executeAsync(pipeline);
                final List<Column> columns = new ArrayList<>();
                Long[] affectedRowCounts = new Long[transformations.size()];
                for (int i = 0; i < transformations.size(); ++i) {
                    // use sink identifier name as field name
                    columns.add(Column.physical(sinkIdentifierNames.get(i), DataTypes.BIGINT()));
                    affectedRowCounts[i] = -1L;
                }
    
                return TableResultImpl.builder()
                        .jobClient(jobClient)
                        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                        .schema(ResolvedSchema.of(columns))
                        .resultProvider(
                                new InsertResultProvider(affectedRowCounts).setJobClient(jobClient))
                        .build();
            } catch (Exception e) {
                throw new TableException("Failed to execute sql", e);
            }
        }
    
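    Putting the INSERT path together from the public API: a StatementSet with two sinks shows why executeInternal builds one affected-row-count column per sink identifier ("use sink identifier name as field name" in the snippet above). A sketch (InsertIntoDemo, the demo tables, and the built-in datagen/print connectors are assumptions):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;

    public class InsertIntoDemo {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            tEnv.executeSql("CREATE TABLE src (id BIGINT) WITH ('connector' = 'datagen', 'number-of-rows' = '5')");
            tEnv.executeSql("CREATE TABLE snk_a (id BIGINT) WITH ('connector' = 'print')");
            tEnv.executeSql("CREATE TABLE snk_b (id BIGINT) WITH ('connector' = 'print')");

            // two INSERTs -> two ModifyOperations -> one pipeline with two sinks
            StatementSet stmtSet = tEnv.createStatementSet();
            stmtSet.addInsertSql("INSERT INTO snk_a SELECT id FROM src");
            stmtSet.addInsertSql("INSERT INTO snk_b SELECT id FROM src");

            // parse -> translate -> createPipeline -> executeAsync; the result schema
            // contains one BIGINT column per sink identifier
            TableResult result = stmtSet.execute();
            result.getResolvedSchema().getColumnNames().forEach(System.out::println);
            result.await();
        }
    }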

    TableEnvironmentImpl (executeQueryOperation: the SELECT / collect path)

    package org.apache.flink.table.api.internal;
    
    @Internal
    public class TableEnvironmentImpl implements TableEnvironmentInternal {
     
     private TableResultInternal executeQueryOperation(QueryOperation operation) {
            CollectModifyOperation sinkOperation = new CollectModifyOperation(operation);
            List<Transformation<?>> transformations =
                    translate(Collections.singletonList(sinkOperation));
            final String defaultJobName = "collect";
            // We pass only the configuration to avoid reconfiguration with the rootConfiguration
            Pipeline pipeline =
                    execEnv.createPipeline(
                            transformations, tableConfig.getConfiguration(), defaultJobName);
            try {
                JobClient jobClient = execEnv.executeAsync(pipeline);
                ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
                resultProvider.setJobClient(jobClient);
                return TableResultImpl.builder()
                        .jobClient(jobClient)
                        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                        .schema(operation.getResolvedSchema())
                        .resultProvider(resultProvider)
                        .setPrintStyle(
                                PrintStyle.tableauWithTypeInferredColumnWidths(
                                        // sinkOperation.getConsumedDataType() handles legacy types
                                        DataTypeUtils.expandCompositeTypeToSchema(
                                                sinkOperation.getConsumedDataType()),
                                        resultProvider.getRowDataStringConverter(),
                                        PrintStyle.DEFAULT_MAX_COLUMN_WIDTH,
                                        false,
                                        isStreamingMode))
                        .build();
            } catch (Exception e) {
                throw new TableException("Failed to execute sql", e);
            }
        }
    
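    The user-facing entry to this path is a plain SELECT: executeSql wraps the query in a CollectModifyOperation, and the returned TableResult streams rows back through the collect sink. A sketch (CollectDemo and the datagen source table are illustrative additions):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    public class CollectDemo {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE src (id BIGINT) WITH ('connector' = 'datagen', 'number-of-rows' = '5')");

            // goes through executeQueryOperation: CollectModifyOperation -> translate -> executeAsync
            TableResult result = tEnv.executeSql("SELECT id FROM src");
            try (CloseableIterator<Row> it = result.collect()) {
                while (it.hasNext()) {
                    System.out.println(it.next());
                }
            }
        }
    }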

    DefaultExecutor

    
    package org.apache.flink.table.planner.delegation;
    
    
    /** Default implementation of {@link Executor}. */
    @Internal
    public class DefaultExecutor implements Executor {
    
        private static final String DEFAULT_JOB_NAME = "Flink Exec Table Job";
    
        private final StreamExecutionEnvironment executionEnvironment;
    
        public DefaultExecutor(StreamExecutionEnvironment executionEnvironment) {
            this.executionEnvironment = executionEnvironment;
        }
    
        public StreamExecutionEnvironment getExecutionEnvironment() {
            return executionEnvironment;
        }
    
        @Override
        public ReadableConfig getConfiguration() {
            return executionEnvironment.getConfiguration();
        }
    
        @Override
        public Pipeline createPipeline(
                List<Transformation<?>> transformations,
                ReadableConfig tableConfiguration,
                @Nullable String defaultJobName) {
    
            // reconfigure before a stream graph is generated
            executionEnvironment.configure(tableConfiguration);
    
            // create stream graph
            final RuntimeExecutionMode mode = getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
            switch (mode) {
                case BATCH:
                    configureBatchSpecificProperties();
                    break;
                case STREAMING:
                    break;
                case AUTOMATIC:
                default:
                    throw new TableException(String.format("Unsupported runtime mode: %s", mode));
            }
    
            final StreamGraph streamGraph = executionEnvironment.generateStreamGraph(transformations);
            setJobName(streamGraph, defaultJobName);
            return streamGraph;
        }
    
        @Override
        public JobExecutionResult execute(Pipeline pipeline) throws Exception {
            return executionEnvironment.execute((StreamGraph) pipeline);
        }
    
        @Override
        public JobClient executeAsync(Pipeline pipeline) throws Exception {
            return executionEnvironment.executeAsync((StreamGraph) pipeline);
        }
    
        @Override
        public boolean isCheckpointingEnabled() {
            return executionEnvironment.getCheckpointConfig().isCheckpointingEnabled();
        }
    
        private void configureBatchSpecificProperties() {
            executionEnvironment.getConfig().enableObjectReuse();
        }
    
        private void setJobName(StreamGraph streamGraph, @Nullable String defaultJobName) {
            final String adjustedDefaultJobName =
                    StringUtils.isNullOrWhitespaceOnly(defaultJobName)
                            ? DEFAULT_JOB_NAME
                            : defaultJobName;
            final String jobName =
                    getConfiguration().getOptional(PipelineOptions.NAME).orElse(adjustedDefaultJobName);
            streamGraph.setJobName(jobName);
        }
    }
    
    
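    As the switch in createPipeline shows, the Table API only accepts an explicit BATCH or STREAMING runtime mode; AUTOMATIC (which is legal for the DataStream API) falls into the default branch and raises a TableException. A small sketch of choosing the mode explicitly (RuntimeModeDemo is an illustrative name):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RuntimeModeDemo {
        public static void main(String[] args) {
            // STREAMING: DefaultExecutor#createPipeline takes the STREAMING branch
            TableEnvironment streamingEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // BATCH: the BATCH branch additionally enables object reuse
            // (configureBatchSpecificProperties)
            TableEnvironment batchEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // a runtime mode of AUTOMATIC (only reachable when set manually on the
            // configuration) would hit the default branch and throw a TableException
            System.out.println(streamingEnv + " / " + batchEnv);
        }
    }
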
  • Original article: https://blog.csdn.net/weixin_46661903/article/details/128185011