• (二开)Flink 修改源码拓展 SQL 语法


    1、Flink 扩展 calcite 中的语法解析
    1)定义需要的 SqlNode 节点类-以 SqlShowCatalogs 为例
    a)类位置

    flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java

    在这里插入图片描述

    核心方法

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
            writer.keyword("SHOW CATALOGS");
        }
    
    • 1
    • 2
    • 3
    • 4
    b)类血缘

    在这里插入图片描述

    2)修改 includes 目录下的 .ftl 文件,在 parserImpls.ftl 文件中添加语法逻辑
    a)文件位置

    在这里插入图片描述

    b)语法示例
    /**
    * Parse a "Show Catalogs" metadata query command.
    */
    SqlShowCatalogs SqlShowCatalogs() :
    {
    }
    {
         
        {
            return new SqlShowCatalogs(getPos());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    3)将 Calcite 源码中的 config.fmpp 文件复制到项目的 src/main/codegen 目录下,修改内容,来声明扩展的部分
    a)文件位置

    在这里插入图片描述

    b)config.fmpp 内容
    data: {
    	# 解析器文件路径
      parser: tdd(../data/Parser.tdd)
    }
    
    # 扩展文件的目录
    freemarkerLinks: {
      includes: includes/
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    c)Parser.tdd 部分内容
    # 生成的解析器包路径
    package: "org.apache.flink.sql.parser.impl",
    # 解析器名称
    class: "FlinkSqlParserImpl",
    # 引入的依赖类
    "org.apache.flink.sql.parser.dql.SqlShowCatalogs"
    # 新的关键字
    keywords: [
        "CATALOGS"
      ]
    # 新增的语法解析方法
    statementParserMethods: [
        "SqlShowCatalogs()"
      ]
    # 包含的扩展语法文件
    implementationFiles: [
        "parserImpls.ftl"
      ]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    4)编译模板文件和语法文件

    在这里插入图片描述

    5)配置扩展的解析器类
    withParserFactory(FlinkSqlParserImpl.FACTORY)
    
    • 1
    2、自定义扩展 Flink 的 Parser 语法
    1)定义 SqlNode 类
    package org.apache.flink.sql.parser.dql;
    
    import org.apache.calcite.sql.SqlCall;
    import org.apache.calcite.sql.SqlKind;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.SqlOperator;
    import org.apache.calcite.sql.SqlSpecialOperator;
    import org.apache.calcite.sql.SqlWriter;
    import org.apache.calcite.sql.parser.SqlParserPos;
    
    import java.util.Collections;
    import java.util.List;
    
    /** XSHOW CATALOGS sql call. */
    public class SqlXShowCatalogs extends SqlCall {
        public static final SqlSpecialOperator OPERATOR =
                new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);
    
        public SqlXShowCatalogs(SqlParserPos pos) {
            super(pos);
        }
    
        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }
    
        @Override
        public List getOperandList() {
            return Collections.emptyList();
        }
    
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
            writer.keyword("XSHOW CATALOGS");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    2)修改 includes 目录下的 parserImpls.ftl 文件
    /**
    * Parse a "XShow Catalogs" metadata query command.
    */
    SqlXShowCatalogs SqlXShowCatalogs() :
    {
    }
    {
         
        {
           return new SqlXShowCatalogs(getPos());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    3)修改 Parser.tdd 文件,新增-声明拓展的部分
    imports:
    
    "org.apache.flink.sql.parser.dql.SqlXShowCatalogs"
    
    keywords:
    
    "XSHOW"
    
    statementParserMethods:
    
    "SqlXShowCatalogs()"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    4)重新编译
     mvn generate-resources
    
    • 1
    5)执行测试用例

    可以看到,自定义 SQL 的报错,由解析失败,变为了校验失败。

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    public class CustomFlinkSql {
        public static void main(String[] args) throws Exception {
    
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .build());
    				
    				// 拓展自定义语法 xshow catalogs 前
            // SQL parse failed. Non-query expression encountered in illegal context
            tEnv.executeSql("xshow catalogs").print();
    
            // 拓展自定义语法 xshow catalogs 后
            // SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    6)查看生成的扩展解析器类

    可以看到,在 FlinkSqlParserImpl 中,自定义的解析语法已经生成了。

    在这里插入图片描述

    3、validate 概述

    在向 Flink 中添加完自定义的解析规则后,报错信息如下:

    SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
    
    • 1
    修改 validate 部分的代码
    1)FlinkPlannerImpl#validate

    作用:校验 SqlNode ,如果是 show catalogs 语法时直接返回。

    在这里插入图片描述

    sqlNode.isInstanceOf[SqlXShowCatalogs]
    
    • 1
    2)SqlToOperationConverter#convert

    作用:将校验过的 SqlNode 转换为 Operator。

    在这里插入图片描述

    else if (validated instanceof SqlXShowCatalogs) {
                return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
    }
    
    • 1
    • 2
    • 3
    3)SqlToOperationConverter#convertXShowCatalogs
    /** Convert SHOW CATALOGS statement. */
    private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {
         return new XShowCatalogsOperation();
    }
    
    • 1
    • 2
    • 3
    • 4
    4)XShowCatalogsOperation
    package org.apache.flink.table.operations;
    
    public class XShowCatalogsOperation implements ShowOperation {
        @Override
        public String asSummaryString() {
            return "SHOW CATALOGS";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    4、执行测试用例
    package org.apache.flink.table.examples.java.custom;
    
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    public class CustomFlinkSql {
        public static void main(String[] args) throws Exception {
    
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .build());
    
    				// FlinkSQL原本支持的语法
            tEnv.executeSql("show catalogs").print();
            
            // 自定义语法
            tEnv.executeSql("xshow catalogs").print();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这里插入图片描述

    5、总结-FlinkSQL 的执行流程
    1、对 SQL 进行校验
    
    final SqlNode validated = flinkPlanner.validate(sqlNode);
    
    2、预校验重写 Insert 语句
    
    3、调用 SqlNode.validate() 进行校验
    
    	1)如果是:ExtendedSqlNode【SqlCreateHiveTable、SqlCreateTable、SqlTableLike】
    	2)如果是:SqlKind.DDL、SqlKind.INSERT 等,无需校验,直接返回 SqlNode
    	3)如果是:SqlRichExplain
    	4)其它:validator.validate(sqlNode)
    		
    			1.校验作用域和表达式:validateScopedExpression(topNode, scope)
    					a)将 SqlNode 进行规范化重写
              b)如果SQL是【TOP_LEVEL = concat(QUERY, DML, DDL)】,则在父作用域中注册查询
              c)校验 validateQuery 
              	i)validateFeature
              	ii)validateNamespace
              	iii)validateModality
              	iv)validateAccess
              	v)validateSnapshot
              d)如果SQL不是【TOP_LEVEL = concat(QUERY, DML, DDL)】进行类型推导
           
           2.获取校验之后的节点类型
    
    2、将 SQLNode 转换为 Operation
    
    converter.convertSqlQuery(validated)
    
    	1)生成逻辑执行计划 RelNode
    	RelRoot relational = planner.rel(validated);
    		
    		1.对查询进行转换
    		sqlToRelConverter.convertQuery(validatedSqlNode)
    		
    	2)创建 PlannerQueryOperation
    	new PlannerQueryOperation(relational.project());
    	
    3、将 Operation 转换为 List>
    List> transformations = planner.translate(Collections.singletonList(modifyOperation));
    
    	1)对 RelNode 逻辑执行计划进行优化,获取 optimizedRelNodes
    	val optimizedRelNodes = optimize(relNodes)
    	
    	2)将 optimizedRelNodes 转换为 execGraph
    	val execGraph = translateToExecNodeGraph(optimizedRelNodes)
    	
    	3)将 execGraph 转换为 transformations
    	
    		1.使用代码生成技术生成Function,后续可以反射调用
    		val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
  • 相关阅读:
    QT 第六天 人脸识别系统
    一款快速从数据库中提取信息工具
    AUTOSAR AP硬核知识点梳理(1)
    【LeetCode-简单题】110. 平衡二叉树
    Frida使用与解题
    使用LangChain构建问答聊天机器人案例实战(三)
    云桌面 node_modules 切换艰辛历程记录 rebuild失败记录
    【微服务部署】四、Jenkins一键打包部署NodeJS(Vue)前端项目步骤详解
    优维低代码:Placeholders 占位符
    基于SpringBoot实现自动装配返回属性
  • 原文地址:https://blog.csdn.net/m0_50186249/article/details/134032900