第七章就来学习一下Flink SQL的解析提交流程。
问题整理:
1. Flink中的Calcite是什么?
2. Flink SQL的提交流程是怎样的?
Apache Calcite是一个动态数据管理框架 ,它具备很多典型数据库管理系统的功能,如SQL解析、SQL校验、SQL查询优化等,又省略了一些功能,如不存储相关数据,也不完全包含相关处理数据等。
Flink中的sql解析、sql校验和sql优化便是依赖calcite来完成的。
梳理一下Calcite SQL执行的几个阶段:
先整体对Flink SQL 提交流程进行一个描述,再从源码角度进行详细解释。
总共包括两大阶段:
1. sql到operation的转换
SQL解析:调用parser方法,将SQL转为未经校验的AST抽象语法树,也就是SqlNode,它主要会用到词法解析和语法解析。词法解析就是将Sql语句转为一组token,而语法解析就是将token进行递归下降词法分析
SQL校验:就是将未经校验的抽象语法树校验成已经校验的抽象语法树,在校验阶段主要校验两部分:1)校验表名,字段名,函数名是否正确 2)校验特殊的类型是否正确,如join操作,groupby是否有嵌套等
调用rel()方法:将抽象语法树SqlNode转为关系代数树RelNode(关系代数表达式)和RexNode行表达式,在这个过程中,DDL它是不执行rel()方法的,因为DDL实际是对元素区的修改,不涉及复杂查询
调用convert()方法:最终会将RelNode转化为operation,operation它包括多种类型,但最终都会生成根节点modify operation
2. operation到transformations的转换
TableEnvironmentImpl是sql执行的入口类,TableEnvironmentImpl中提供了executeSql,sqlQuery等方法用来执行DDL和DML等sql,sql执行时会对sql进行解析,ParserImpl是flink调用sql解析的实现类,ParserImpl#parse()
方法中通过调用包装器对象CalciteParser#parse()
方法并创建并调用使用javacc生成的sql解析器(FlinkSqlParserImpl)中的parseSqlStmtEof方法完成sql解析,并返回SqlNode对象
核心代码:
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
//TODO 在这里调用使用javacc生成的分析器,将sql语句解析成sqlNode
SqlNode parsed = parser.parse(statement);
//TODO 将sqlNode转换为Operation
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
其中parser.parse(…)方法,将sql语句解析成sqlNode。对应的表名、列名、with属性参数、主键、唯一键、分区键、水印、表注释、表操作(create table、alter table、drop table)都放到SqlNode对象的对应属性中,SqlNode是一个树形结构也就是AST。
sql解析完成后执行sql校验,flink sql中增加了SqlNode转换为Operation的过程,sql校验是在这个过程中完成。在SqlToOperationConverter#convert()
方法中完成这个过程的转换,之间会通过FlinkPlannerInpm#validate()
方法对表、函数、字段等完成校验并基于生成的validated SqlNode生成对应的Opeation。
不同的sql经过convert处理后返回不同的Operation,最后会根据不同的Operation有不同的处理行为。
Blink中并没有直接使用Calcite的优化器,而是通过规则组合和Calcite优化组合分别为batch和stream实现了自定义的优化器。
优化执行前会先将SqlNode转为RelNode,基于RelNode调用PlannerBase#optimize()
并执行StreamCommonSubGraphBasedOptimizer#doOptimize()
方法完成优化
在完成Sql到RelNode的转换后,会执行executeOperation(…)操作,在这里先将sqlNode转换成RelNode。然后进行优化操作。
然后根据传入的sql语句类型,选择不同的操作。包含有Modify、CreateTable、DropTable等。
在这里,有进行转换和优化操作,重点是在translate方法中,最终调用的是PlannerBase里的translate(...)
方法
override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
if (modifyOperations.isEmpty) {
return List.empty[Transformation[_]]
}
// prepare the execEnv before translating
getExecEnv.configure(
getTableConfig.getConfiguration,
Thread.currentThread().getContextClassLoader)
overrideEnvParallelism()
// TODO 在这里完成转换 SqlNode转换为RelNode
val relNodes = modifyOperations.map(translateToRel)
// TODO 在这里完成优化
val optimizedRelNodes = optimize(relNodes)
val execNodes = translateToExecNodePlan(optimizedRelNodes)
translateToPlan(execNodes)
}
最终由translateToPlan方法将ExecNode转换成Transfomation列表
基于生成的Transformation对象调用StreamExecutor#createPipeline()方法生成StreamGraph便可以执行任务了。
参考: