在具体执行环节中,考虑并行子任务的分配、数据在任务间的传输,以及合并算子链的优化,将逻辑流图转换为物理数据流图。
StreamGraph——JobGraph——ExecutionGraph

一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)【map、filter、flatMap】的直通 (forwarding)模式,也可以是打乱的重分区(redistributing)模式
算子链:并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task)【类似于spark中的窄依赖】
算子链接成task是非常有效的优化,可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。
DataStream 的子类包括 SingleOutputStreamOperator、 DataStreamSource KeyedStream 、IterativeStream, SplitStream(已弃用)。
每一个 DataStream 的底层都有对应的一个StreamTransformation。在 DataStream 上面通过 map 等算子不断进行转换,根据不同的操作生成不同的 StreamTransformation,并将其加入StreamExecutionEnvironment 的List
>transformations 来保留生成 DataStream 的所有转换。
StreamGraphGenerator 会基于 transformations 列表来生成 StreamGraph。
遍历 List生成 StreamGraph 的时候,会递归调用StreamGraphGenerator#transform方法。对于每一个 StreamTransformation, 确保当前其上游已经完成转换。StreamTransformations 被转换为 StreamGraph 中的节点 StreamNode,并为上下游节点添加边 StreamEdge。
在 StreamGraph 中,每一个算子(Operator) 对应了图中的一个节点(StreamNode)
将多个符合条件的节点串联(Chain) 在一起形成一个节点,从而减少数据在不同节点之间流动所产生的序列化、反序列化、网络传输的开销。多个算子被 chain 在一起的形成的节点在 JobGraph 中对应的就是 JobVertex。
JobGraph 的关键在于将多个 StreamNode 优化为一个 JobVertex, 对应的 StreamEdge 则转化为 JobEdge, 并且 JobVertex 和 JobEdge 之间通过 IntermediateDataSet 形成一个生产者和消费者的连接关系。
在 StramGraph 中,StreamNode 之间是通过 StreamEdge 建立连接的。在 JobEdge 中,对应的是 JobEdge 。
StreamEdge 中同时保留了源节点和目标节点 (sourceId 和 targetId), JobEdge 中只有源节点信息。由于 JobVertex 中保存了所有输入的 JobEdge 的信息,因而同样可以在两个节点之间建立连接。更确切地说,JobEdge 是和节点的输出结果相关联的。
JobVertex 产生的数据被抽象为 IntermediateDataSet(中间数据集)。JobEdge 是和节点的输出结果相关联的,其实就是指可以把 JobEdge 看作是 IntermediateDataSet 的消费者,那么 JobVertex 自然就是生产者了。
从 StreamGraph 到 JobGraph 的转换入口在StreamingJobGraphGenerator 中。
通过 DFS 遍历所有 StreamNode, 按照 chainable 的条件将可以串联的stream operator 放在同一个 operator chain 中。每一个 StreamNode 的配置信息都会被序列化到对应的 StreamConfig 中。
只有 operator chain 的头部节点会生成对应的 JobVertex ,一个 operator chain
的所有内部节点都会以序列化的形式写入头部节点的 CHAINED_TASK_CONFIG 配置项中。
每一个 operator chain 都会为所有的实际输出边创建对应的 JobEdge,并和 JobVertex 连接
StreamGraph, JobGraph 这两个执行图都是在 client 端生成的,ExecutionGraph 是在 JobManager 中生成的。Client 向 JobManager 提交 JobGraph 后, JobManager 就会根据 JobGraph 来创建对应的 ExecutionGraph,并以此来调度任务。
ExecutionGraph 中,节点对应的类是 ExecutionJobVertex
ExexutionJobVertex 的成员变量中包含一个 ExecutionVertex 数组。
ExecutionVertex 是并行任务的一个子任务,算子的并行度是多少,那么就会有多少个 ExecutionVertex。
Execution 是对 ExecutionVertex 的一次执行,通过 ExecutionAttemptId 来唯一标识。
在 JobGraph 中用 IntermediateDataSet(中间数据集) 表示 JobVertex 的对外输出,一个 JobGraph 可能有 n(n >=0) 个输出。在 ExecutionGraph 中,与此对应的就是 IntermediateResult。
ExecutionEdge 表示 ExecutionVertex 的输入,通过 ExecutionEdge 将 ExecutionVertex 和 IntermediateResultPartition 连接起来,进而在不同的 ExecutionVertex 之间建立联系。
创建 ExecutionGraph 的入口在 ExecutionGraphBuilder#buildGraph() 中。
Summary:
streamGraph 是最原始的,更贴近用户逻辑的 DAG 执行图;JobGraph 是对 StreamGraph 的进一步优化,将能够合并的算子合并为一个节点以降低运行时数据传输的开销;ExecutionGraph 则是作业运行是用来调度的执行图,可以看作是并行化版本的 JobGraph,将 DAG 拆分到基本的调度单元。