Subplan reuse optimization finds duplicated nodes in a SQL execution plan and reuses them, so the duplicated computation runs only once. First, recall the main flow of SQL execution: parser -> validate -> logical optimize -> physical optimize -> translateToExecNodeGraph.
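Before reading the source, the effect is easy to observe from a plan explain. A minimal sketch (the table name and query are made up; assumes `table.optimizer.reuse-sub-plan-enabled`, which defaults to true):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object ExplainReuse extends App {
  val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
  // Hypothetical source table, just to give the planner something to scan.
  tEnv.executeSql(
    "CREATE TABLE src (id INT, v INT) WITH ('connector' = 'datagen')")
  // The same scan-plus-filter subtree appears on both sides of UNION ALL,
  // so the optimizer can fold the two subtrees into one.
  println(tEnv.explainSql(
    "SELECT id FROM src WHERE v > 0 UNION ALL SELECT id FROM src WHERE v > 0"))
}
```

In recent Flink versions the reused subtree shows up in the explain output with markers along the lines of `reuse_id` / `Reused(reference_id=...)`.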
The subplan reuse logic lives in this last stage:
```scala
private[flink] def translateToExecNodeGraph(
    optimizedRelNodes: Seq[RelNode],
    isCompiled: Boolean): ExecNodeGraph = {
  val nonPhysicalRel = optimizedRelNodes.filterNot(_.isInstanceOf[FlinkPhysicalRel])
  if (nonPhysicalRel.nonEmpty) {
    throw new TableException(
      "The expected optimized plan is FlinkPhysicalRel plan, " +
        s"actual plan is ${nonPhysicalRel.head.getClass.getSimpleName} plan.")
  }

  require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))

  // Rewrite same rel object to different rel objects
  // in order to get the correct dag (dag reuse is based on object not digest)
  val shuttle = new SameRelObjectShuttle()
  val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))

  // reuse subplan
  val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)

  // convert FlinkPhysicalRel DAG to ExecNodeGraph
  val generator = new ExecNodeGraphGenerator()
  val execGraph =
    generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]), isCompiled)

  // process the graph
  val context = new ProcessorContext(this)
  val processors = getExecNodeGraphProcessors
  processors.foldLeft(execGraph)((graph, processor) => processor.process(graph, context))
}
```
As the code shows, the method first verifies that every RelNode is a FlinkPhysicalRel, i.e. a node of the physical execution plan:

```scala
require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))
```
SameRelObjectShuttle
```scala
/**
 * Rewrite same rel object to different rel objects.
 *
 * e.g.
 * {{{
 *        Join                      Join
 *       /    \                    /    \
 * Filter1    Filter2  =>    Filter1    Filter2
 *       \    /                  |        |
 *        Scan                 Scan1    Scan2
 * }}}
 * After rewrote, Scan1 and Scan2 are different object but have same digest.
 */
class SameRelObjectShuttle extends DefaultRelShuttle {
  private val visitedNodes = Sets.newIdentityHashSet[RelNode]()

  override def visit(node: RelNode): RelNode = {
    val visited = !visitedNodes.add(node)
    var change = false
    val newInputs = node.getInputs.map {
      input =>
        val newInput = input.accept(this)
        change = change || (input ne newInput)
        newInput
    }
    if (change || visited) {
      node.copy(node.getTraitSet, newInputs)
    } else {
      node
    }
  }
}
```
Next comes the rel-node rewrite. A RelShuttle provides a visitor pattern for replacing certain nodes in the tree according to the implemented logic. This implementation copies a node into a new object the second time the same object is visited (note that visitedNodes is an identity hash set), and the copy keeps the same digest. What is this step for?
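Before answering that, the identity semantics deserve a quick look. A minimal sketch (plain Scala plus Guava, not Flink code) of how `Sets.newIdentityHashSet` behaves, and hence why only the second visit of the *same reference* triggers a copy:

```scala
import com.google.common.collect.Sets

object IdentitySetDemo extends App {
  // Case classes get structural equals/hashCode, which an ordinary
  // HashSet would use; an identity set ignores them entirely.
  case class Scan(table: String)

  val visited = Sets.newIdentityHashSet[Scan]()
  val scan = Scan("src")

  println(visited.add(scan))        // true:  first visit of this reference
  println(visited.add(scan))        // false: same reference again -> shuttle copies
  println(visited.add(Scan("src"))) // true:  equal content, but a different object
}
```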
Looking further ahead: when the ExecNodes are generated later, an IdentityHashMap records the visited rels, so the generated ExecNodes correspond one-to-one with RelNode objects.
```java
private final Map<FlinkPhysicalRel, ExecNode<?>> visitedRels = new IdentityHashMap<>();

private ExecNode<?> generate(FlinkPhysicalRel rel, boolean isCompiled) {
    ExecNode<?> execNode = visitedRels.get(rel);
    if (execNode != null) {
        return execNode;
    }

    if (rel instanceof CommonIntermediateTableScan) {
        throw new TableException("Intermediate RelNode can't be converted to ExecNode.");
    }

    List<ExecNode<?>> inputNodes = new ArrayList<>();
    for (RelNode input : rel.getInputs()) {
        inputNodes.add(generate((FlinkPhysicalRel) input, isCompiled));
    }

    execNode = rel.translateToExecNode(isCompiled);

    // connects the input nodes
    List<ExecEdge> inputEdges = new ArrayList<>(inputNodes.size());
    for (ExecNode<?> inputNode : inputNodes) {
        inputEdges.add(ExecEdge.builder().source(inputNode).target(execNode).build());
    }
    execNode.setInputEdges(inputEdges);

    visitedRels.put(rel, execNode);
    return execNode;
}
```
Seen from here, splitting one object into two looks even more puzzling: had it stayed a single object, it would have been reused here for free, whereas after the split it no longer is.
The intent is to first break apart nodes that are referenced multiple times as the same object, and only then decide whether to reuse them, based on matching digests plus a set of internal rules. That way the Flink engine controls which nodes may be merged.
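Because the merge is now rule-driven rather than an accident of object sharing, it can be steered by configuration. A sketch (the config keys below exist in current Flink versions): disabling source reuse keeps two scans with the same digest as separate nodes in the final plan:

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object ReuseConfigDemo extends App {
  val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
  // Toggle subplan reuse as a whole, or only source reuse, and compare
  // the explained plans before and after.
  tEnv.getConfig.getConfiguration
    .setString("table.optimizer.reuse-sub-plan-enabled", "true")
  tEnv.getConfig.getConfiguration
    .setString("table.optimizer.reuse-source-enabled", "false")
}
```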
SubplanReuseContext
Inside the context, a ReusableSubplanVisitor builds two mappings:
```scala
// mapping a relNode to its digest
private val mapRelToDigest = Maps.newIdentityHashMap[RelNode, String]()
// mapping the digest to RelNodes
private val mapDigestToReusableNodes = new util.HashMap[String, util.List[RelNode]]()
```
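A toy sketch (plain Scala, not Flink classes) of what these two maps end up holding for the Join/Filter/Scan example from the scaladoc above, assuming both scans share one digest:

```scala
import java.util.{IdentityHashMap => JIdentityHashMap}
import scala.collection.mutable

object DigestGrouping extends App {
  final class Node(val digest: String)

  val scan1 = new Node("TableScan(table=[src])")
  val scan2 = new Node("TableScan(table=[src])") // same digest, different object

  val relToDigest = new JIdentityHashMap[Node, String]()
  val digestToNodes = mutable.Map.empty[String, mutable.Buffer[Node]]

  for (n <- Seq(scan1, scan2)) {
    relToDigest.put(n, n.digest)
    digestToNodes.getOrElseUpdate(n.digest, mutable.Buffer.empty[Node]) += n
  }

  // A digest that maps to more than one node marks a group of reuse candidates.
  val reuseCandidates = digestToNodes.filter { case (_, ns) => ns.size > 1 }
  println(reuseCandidates.keys) // Set(TableScan(table=[src]))
}
```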
The logic in between is straightforward: traverse the whole tree looking for reusable nodes. How is reusability decided?
- Multiple RelNodes hang under the same digest; such a group shares the same semantics and is a candidate for reuse
- The node does not have reuse disabled:
```scala
/** Returns true if the given node is reusable disabled */
private def isNodeReusableDisabled(node: RelNode): Boolean = {
  node match {
    // TableSourceScan node can not be reused if reuse TableSource disabled
    case _: FlinkLogicalLegacyTableSourceScan | _: CommonPhysicalLegacyTableSourceScan |
        _: FlinkLogicalTableSourceScan | _: CommonPhysicalTableSourceScan =>
      !tableSourceReuseEnabled
    // Exchange node can not be reused if its input is reusable disabled
    case e: Exchange => isNodeReusableDisabled(e.getInput)
    // TableFunctionScan and sink can not be reused
    case _: TableFunctionScan | _: LegacySink | _: Sink => true
    case _ => false
  }
}
```
For example, a TableFunctionScan can never be reused (I have not fully understood the reason yet; one plausible explanation is that a table function may be non-deterministic, so two scans with identical digests are not guaranteed to produce identical results), and an Exchange can be reused only if its input can be reused.
SubplanReuseShuttle
Once the visitor above has run, we already know which nodes can be reused. Finally, a shuttle replaces the reusable nodes:
```scala
class SubplanReuseShuttle(context: SubplanReuseContext) extends DefaultRelShuttle {
  private val mapDigestToNewNode = new util.HashMap[String, RelNode]()

  override def visit(rel: RelNode): RelNode = {
    val canReuseOtherNode = context.reuseOtherNode(rel)
    val digest = context.getRelDigest(rel)
    if (canReuseOtherNode) {
      val newNode = mapDigestToNewNode.get(digest)
      if (newNode == null) {
        throw new TableException("This should not happen")
      }
      newNode
    } else {
      val newNode = visitInputs(rel)
      mapDigestToNewNode.put(digest, newNode)
      newNode
    }
  }
}
```
The implementation records the new node produced for each digest. When a node can reuse another, the RelNode already registered under that digest is returned directly, replacing the current node that has the same digest but is a different object. In this way the reusable nodes of the whole tree are merged back together.
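To replay that merge step outside Flink, here is a toy version (plain Scala, ignoring the reusable/disabled checks, so every matching digest merges). The first node seen for a digest is rewritten and registered; every later node with that digest is swapped for the registered one, which is why the real shuttle can treat a missing map entry as "should not happen":

```scala
import scala.collection.mutable

object ReuseMergeDemo extends App {
  final case class Node(digest: String, inputs: List[Node])

  val byDigest = mutable.Map.empty[String, Node]

  def visit(n: Node): Node =
    byDigest.get(n.digest) match {
      case Some(kept) => kept // reuse the node already registered for this digest
      case None =>
        val rewritten = n.copy(inputs = n.inputs.map(visit))
        byDigest(n.digest) = rewritten
        rewritten
    }

  val scan1 = Node("Scan(src)", Nil)
  val scan2 = Node("Scan(src)", Nil) // same digest, different object
  val join = Node(
    "Join",
    List(Node("Filter(a)", List(scan1)), Node("Filter(a)", List(scan2))))

  val merged = visit(join)
  // Both join inputs point at the very same object again:
  println(merged.inputs.head eq merged.inputs(1)) // true
}
```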