//-------------------- StreamGraph to JobGraph ----------------------//

```java
package org.apache.flink.client.deployment.application.executors;

public class EmbeddedExecutor implements PipelineExecutor {
    // Inside execute(pipeline, configuration): the client-side Pipeline
    // (the StreamGraph) is translated into a JobGraph before submission.
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    // ...
}
```
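The same StreamGraph-to-JobGraph translation can be observed directly from a user program. A minimal sketch, assuming a local `StreamExecutionEnvironment` (the class name and the job itself are illustrative; `getStreamGraph()`/`getJobGraph()` are the public entry points into the translation that `PipelineExecutorUtils.getJobGraph(...)` triggers):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class StreamGraphToJobGraphDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A trivial illustrative job so the environment has transformations to translate.
        env.fromElements(1, 2, 3).map(i -> i * 2).returns(Types.INT).print();

        // getStreamGraph() assembles the StreamGraph from the registered transformations;
        // getJobGraph() then performs the chaining/translation step that EmbeddedExecutor
        // reaches via PipelineExecutorUtils.getJobGraph(pipeline, configuration).
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();
        System.out.println("Job vertices: " + jobGraph.getNumberOfVertices());
    }
}
```

Next, the Table API side: `executeInternal()` builds a Pipeline from the translated transformations and submits it asynchronously.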
```java
package org.apache.flink.table.planner.delegation;

private TableResultInternal executeInternal(
        List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
    final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);

    // We pass only the configuration to avoid reconfiguration with the rootConfiguration
    Pipeline pipeline =
            execEnv.createPipeline(
                    transformations, tableConfig.getConfiguration(), defaultJobName);
    try {
        JobClient jobClient = execEnv.executeAsync(pipeline);
        final List<Column> columns = new ArrayList<>();
        Long[] affectedRowCounts = new Long[transformations.size()];
        for (int i = 0; i < transformations.size(); ++i) {
            // use sink identifier name as field name
            columns.add(Column.physical(sinkIdentifierNames.get(i), DataTypes.BIGINT()));
            affectedRowCounts[i] = -1L;
        }

        return TableResultImpl.builder()
                .jobClient(jobClient)
                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                .schema(ResolvedSchema.of(columns))
                .resultProvider(
                        new InsertResultProvider(affectedRowCounts).setJobClient(jobClient))
                .build();
    } catch (Exception e) {
        throw new TableException("Failed to execute sql", e);
    }
}
```
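To see this insert path end to end, a minimal sketch (the table name is hypothetical; the blackhole connector simply discards rows): `executeSql()` on an INSERT statement eventually lands in the `executeInternal()` shown above, and the returned `TableResult` carries one BIGINT column per sink identifier.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class InsertIntoDemo {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical sink table; 'blackhole' discards everything it receives.
        tableEnv.executeSql(
                "CREATE TABLE sink_table (id BIGINT) WITH ('connector' = 'blackhole')");

        // Reaches executeInternal(transformations, sinkIdentifierNames) above;
        // the job name defaults to "insert-into_<sink identifiers>".
        TableResult result =
                tableEnv.executeSql("INSERT INTO sink_table SELECT CAST(1 AS BIGINT)");
        result.await(); // block until the job submitted via executeAsync() finishes
    }
}
```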
TableEnvironmentImpl
```java
package org.apache.flink.table.api.internal;

@Internal
public class TableEnvironmentImpl implements TableEnvironmentInternal {

    private TableResultInternal executeQueryOperation(QueryOperation operation) {
        CollectModifyOperation sinkOperation = new CollectModifyOperation(operation);
        List<Transformation<?>> transformations =
                translate(Collections.singletonList(sinkOperation));
        final String defaultJobName = "collect";

        // We pass only the configuration to avoid reconfiguration with the rootConfiguration
        Pipeline pipeline =
                execEnv.createPipeline(
                        transformations, tableConfig.getConfiguration(), defaultJobName);
        try {
            JobClient jobClient = execEnv.executeAsync(pipeline);
            ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
            resultProvider.setJobClient(jobClient);
            return TableResultImpl.builder()
                    .jobClient(jobClient)
                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                    .schema(operation.getResolvedSchema())
                    .resultProvider(resultProvider)
                    .setPrintStyle(
                            PrintStyle.tableauWithTypeInferredColumnWidths(
                                    // sinkOperation.getConsumedDataType() handles legacy types
                                    DataTypeUtils.expandCompositeTypeToSchema(
                                            sinkOperation.getConsumedDataType()),
                                    resultProvider.getRowDataStringConverter(),
                                    PrintStyle.DEFAULT_MAX_COLUMN_WIDTH,
                                    false,
                                    isStreamingMode))
                    .build();
        } catch (Exception e) {
            throw new TableException("Failed to execute sql", e);
        }
    }
}
```
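And the query path: a plain SELECT goes through the `executeQueryOperation()` above, where `CollectModifyOperation` installs the result provider that streams rows back to the client. A minimal sketch using the standard `executeSql(...).collect()` API (the query is illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class SelectCollectDemo {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Runs as a job named "collect"; rows come back through the
        // ResultProvider wired up by CollectModifyOperation.
        try (CloseableIterator<Row> it =
                tableEnv.executeSql("SELECT 1 AS id").collect()) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}
```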