• 【calcite】calcite实现SQL列级数据血缘 data lineage 查询


    一、背景

    大数据场景下的数据血缘,内部实现十分复杂,一般需要依赖框架。Calcite 作为 Apache 顶级项目,属于 Java 生态,被多个项目所使用,如 Flink、Hive、Drill、Kylin 等。Calcite 对 MySQL、Oracle、PostgreSQL 和其他大数据平台支持较好;对 SQL Server 支持较差:虽然有 Lex.SQL_SERVER、SqlConformanceEnum.SQL_SERVER_2008 等词法/语法一致性配置,但 SQL Server 方言函数的支持很少。
    另外,Python 生态推荐使用 sqlglot(DataHub 即采用了 sqlglot)。

    calcite官方文档

    二、 实现方式

    gradle添加依赖:

    dependencies {
        // calcite-core provides the SQL parser, validator, planner and RelMetadataQuery used below.
        // NOTE(review): the Scala code below also uses org.apache.commons.dbcp2.BasicDataSource,
        // the MySQL driver (com.mysql.cj.jdbc.Driver) and ScalaTest (AnyFunSuite); declare those
        // artifacts as well if they are not already pulled in transitively.
        testImplementation('org.apache.calcite:calcite-core:1.32.0')
    }
    

    以下代码均由 Scala 实现,并在 MySQL 5.7 上测试通过:

    -- Test fixtures: st01/st02 are the lineage sources, st03 is the INSERT target of MYSQL_SQL2.
    -- Drop all three first so the script can be re-run (`create table ... like` fails if the table exists).
    drop table if exists test.st01;
    drop table if exists test.st02;
    drop table if exists test.st03;
    CREATE TABLE test.st01(
    s_id BIGINT comment '主键',
    s_name VARCHAR(20)  comment '姓名',
    s_age INT comment '年龄',
    s_sex VARCHAR(10) comment '性别',
    s_part  VARCHAR(10) comment '分区字段',
    ts TIMESTAMP comment '创建时间'
    );
    -- NOTE: a plain TIMESTAMP column has no fractional seconds, so the ".666" part is not stored.
    insert into test.st01 values(1,'zhangsan',10,'male','student','2020-01-01 18:01:01.666');
    insert into test.st01 values(2,'lisi',66,'female','teacher','2020-01-01 10:01:01.666');
    insert into test.st01 values(3,'sunlirong',50,'male','student','2020-01-01 10:01:01.666');
    insert into test.st01 values(4,'laoliu',38,'female','teacher','2020-01-01 10:01:01.666');
    
    create table test.st02 like test.st01;
    insert into test.st02 values(2,'wangwu',66,'male','teacher','2020-01-01 10:01:01.666');
    insert into test.st02 values(3,'zhaoliu',66,'female','student','2020-01-01 10:01:01.666');
    
    create table test.st03 like test.st01;
    

    先是设置好两个sql语句:

      /**
       * Simple test: a plain SELECT over st01.
       *
       * The table is qualified as `db`.`tbl` because the planner resolves tables from the root
       * SchemaPlus, in which the MySQL database is registered as a named sub-schema; an
       * unqualified `st01` cannot be found. Trimmed for consistency with MYSQL_SQL2.
       */
      val MYSQL_SQL1 =
        """
          |select * from `test`.`st01` where 1=1
          |""".stripMargin.trim
    
      /**
       * Test covering: 1) INSERT INTO, 2) the non-standard MySQL function CONCAT, 3) JOIN, 4) WHERE.
       *
       * Expected lineage of the target columns: s_id and s_sex come from st01; s_name (CONCAT)
       * and s_age (a.s_age + b.s_age) come from both st01 and st02; s_part ('none') and
       * ts (current_timestamp) are computed without reading any source column.
       */
      val MYSQL_SQL2 =
        """
          |insert into `test`.`st03`
          |select s_id,combined_name s_name,s_age,s_sex,s_part,ts from (
          |select
          |a.s_id as s_id
          |,CONCAT(a.s_name,'-',b.s_name) as combined_name
          |,a.s_age+b.s_age as s_age
          |,a.s_sex as s_sex
          |,'none' as s_part
          |,current_timestamp as ts
          |from `test`.`st01` a inner join `test`.`st02` b on a.s_id=b.s_id
          |where a.s_sex='male'
          |) t0 order by ts limit 2
          |""".stripMargin.trim
    

    初始化数据库连接参数:

      // Connection settings for the MySQL instance holding the test tables.
      // The database name stays fixed: it is also the Calcite schema name and is hard-coded as
      // `test` in the SQL strings. User, password and host:port can be overridden through
      // environment variables so real credentials do not have to live in source; the literals
      // below are only fallbacks.
      val MYSQL_DATABASE = "test"
      val MYSQL_USERNAME = sys.env.getOrElse("MYSQL_USERNAME", "root")
      val MYSQL_PASSWORD = sys.env.getOrElse("MYSQL_PASSWORD", "你的密码")
      val MYSQL_HOST_PORT = sys.env.getOrElse("MYSQL_HOST_PORT", "192.168.100.100:3306")
      val MYSQL_JDBC_URL = s"jdbc:mysql://${MYSQL_HOST_PORT}/${MYSQL_DATABASE}?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true"
    

    依赖:

    import org.apache.calcite.config.{Lex}
    import org.apache.calcite.sql.{SqlBasicCall, SqlIdentifier, SqlNode, SqlNumericLiteral,  SqlSelect}
    import org.apache.calcite.sql.parser.SqlParser
    import org.apache.calcite.adapter.jdbc.JdbcSchema
    import org.apache.calcite.jdbc.{CalciteConnection}
    import org.apache.calcite.plan.RelOptTable
    import org.apache.calcite.rel.{ RelRoot}
    import org.apache.calcite.rel.metadata.{RelColumnOrigin, RelMetadataQuery}
    import org.apache.calcite.schema.SchemaPlus
    import org.apache.calcite.server.CalciteServerStatement
    import org.apache.calcite.sql.fun.{SqlLibrary, SqlLibraryOperatorTableFactory}
    import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidator}
    import org.apache.calcite.tools.{FrameworkConfig, Frameworks, Planner}
    import org.apache.commons.dbcp2.BasicDataSource
    
    import java.sql.{Connection, DriverManager, ResultSet}
    import org.scalatest.funsuite.AnyFunSuite
    
    import java.util
    import scala.collection.JavaConversions._
    import scala.collection.mutable
    
    // Row printer whose column layout (positions 1-6) is hard-coded to the st01/st02/st03 schema.
      /**
       * Prints each remaining row of `rs` as one "Get result ..." line.
       *
       * @param rs result set positioned before its first row; consumed by this call
       */
      def printStandardRs(rs: ResultSet): Unit =
        Iterator
          .continually(rs.next())
          .takeWhile(hasRow => hasRow)
          .foreach { _ =>
            // Columns are read by position, in table-definition order.
            val id = rs.getLong(1)
            val name = rs.getString(2)
            val age = rs.getInt(3)
            val sex = rs.getString(4)
            val part = rs.getString(5)
            val createdAt = rs.getTimestamp(6)
            println(s"Get result s_id:${id},s_name:${name},s_age:${age},s_sex:${sex},s_part:${part},ts:${createdAt}")
          }
    // Prints the column-level lineage of every output field contained in a RelRoot.
      /**
       * For each output field of `relRoot`, asks Calcite's [[RelMetadataQuery]] for the column
       * origins and prints one line `dest <- db.tbl.col,db.tbl.col`. Fields whose origin set is
       * null or empty (e.g. literals, `current_timestamp`) are skipped.
       *
       * @param relRoot the root returned by `Planner.rel` for a validated statement
       */
      def printRelRoot(relRoot: RelRoot): Unit = {
        val rel = relRoot.rel
        val mq: RelMetadataQuery = rel.getCluster.getMetadataQuery
        for (field <- relRoot.fields) {
          val destFieldName = field.getValue
          // `fields` pairs each output name with an ordinal of `rel`; that ordinal is not
          // guaranteed to equal the position in `fields`, so look the origins up by ordinal.
          val relOrdinal: Int = field.getKey
          val origins: util.Set[RelColumnOrigin] = mq.getColumnOrigins(rel, relOrdinal)
          if (origins != null && !origins.isEmpty) {
            val oriStr = origins.toSeq
              .map { ori =>
                val depTbl: RelOptTable = ori.getOriginTable
                val depFldName: String = depTbl.getRowType.getFieldNames.get(ori.getOriginColumnOrdinal)
                // qualified table name (e.g. [db, tbl]) followed by the column name
                (depTbl.getQualifiedName :+ depFldName).mkString(".")
              }
              .distinct
              // Set iteration order is unspecified; sort for stable, diff-friendly output.
              .sorted
              .mkString(",")
            println(s"${destFieldName} <- ${oriStr}")
          }
        }
      }
    

    本文均使用 Calcite 的 RelMetadataQuery 类提供的血缘信息查询能力(getColumnOrigins)。
    需要注意的是:
    (1)SQL 中的表都需要写成“库名.表名”的形式,否则无法找到表,因为 Calcite 是从 SchemaPlus 提供的元数据信息中查找表的;
    (2)CREATE TABLE ... AS SELECT 无法被 planner 识别。如果需要分析 CTAS 的血缘,可能需要自己用正则拆分 SQL,把其中的 SELECT 部分单独提取出来,再进行识别。

    2.1 通用版本

        val sql = MYSQL_SQL2
        // Connection properties: see org.apache.calcite.config.CalciteConnectionProperty
        // SQL function libraries selectable via `fun=`: see org.apache.calcite.sql.fun.SqlLibraryOperators
        val conn = DriverManager.getConnection("jdbc:calcite:fun=mysql;lex=MYSQL;model=inline:" + getDefaultMysqlConnConfig)
        try {
          // Optional: execute the statement through Calcite and print the result rows.
          // NOTE(review): MYSQL_SQL2 is an INSERT, so this step may write to test.st03 or fail;
          // failures are printed and ignored because only the lineage analysis below matters.
          val stmt = conn.createStatement()
          try {
            val rs = stmt.executeQuery(sql)
            try printStandardRs(rs) finally rs.close()
          } catch {
            case scala.util.control.NonFatal(ex) => println(ex.getMessage)
          } finally {
            stmt.close()
          }

          // A Calcite statement gives access to the connection's root schema and type factory.
          val prepareStmt = conn.createStatement()
          try {
            val cxt = prepareStmt.unwrap(classOf[CalciteServerStatement]).createPrepareContext()

            val mysqlValidateConfig: SqlValidator.Config = SqlValidator.Config.DEFAULT.withConformance(SqlConformanceEnum.MYSQL_5)

            // Parser config, used by planner.parse
            val mysqlParseConfig = SqlParser.config()
              .withLex(Lex.MYSQL)
              .withConformance(SqlConformanceEnum.MYSQL_5)

            // Operator table (the set of operators), used by planner.validate.
            // Option 1: contains no SQL functions -- unusable
            // val calciteCatalogReader=new CalciteCatalogReader(cxt.getRootSchema,List(MYSQL_DATABASE),cxt.getTypeFactory,CalciteConnectionConfig.DEFAULT)
            // Option 2: every built-in SQL function -- not recommended
            // val sqlFuncs: Seq[SqlFunction] = classOf[SqlLibraryOperators].getFields.toSeq.map(f => f.get(null)).filter(v => v.isInstanceOf[SqlFunction]).map(f=>f.asInstanceOf[SqlFunction])
            // val sqlOperatorTable=SqlOperatorTables.of(sqlFuncs)
            // Option 3 (recommended): load operators by scanning the annotations on the members of SqlLibraryOperators.
            // SqlLibrary.STANDARD is required, otherwise not even "=" can be resolved.
            val mysqlOperatorTable = SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(util.EnumSet.of(SqlLibrary.STANDARD, SqlLibrary.MYSQL))

            val frameworkConfig = Frameworks.newConfigBuilder()
              .parserConfig(mysqlParseConfig)
              .defaultSchema(cxt.getRootSchema.plus())
              .sqlValidatorConfig(mysqlValidateConfig)
              .operatorTable(mysqlOperatorTable)
              .build()

            val planner = Frameworks.getPlanner(frameworkConfig)
            try {
              val parsedNode = planner.parse(sql)
              val validatedNode = planner.validate(parsedNode)
              val relRoot: RelRoot = planner.rel(validatedNode)
              // println(s"get RelNode:${relRoot}")
              printRelRoot(relRoot)
            } finally {
              planner.close()
            }
          } finally {
            prepareStmt.close()
          }
        } finally {
          conn.close()
        }
    

    2.2 代码版本

    与通用版本的区别在于:此处通过代码构建 DataSource 和 SchemaPlus,而不是在 JDBC URL 中使用 inline model 配置。

      /** A Calcite JDBC connection together with the root schema the planner resolves tables against. */
      case class CalciteConn(conn: Connection, schema: SchemaPlus)

      /**
       * Opens a Calcite connection (MySQL function library and lexical rules) whose root schema
       * exposes the MySQL database `MYSQL_DATABASE` as a sub-schema of the same name, backed by a
       * DBCP2 `BasicDataSource`.
       *
       * @return the unwrapped Calcite connection and its root schema; the caller owns the connection
       */
      def getCalciteMysqlConn(): CalciteConn = {
        Class.forName("com.mysql.cj.jdbc.Driver")
        val mysqlPool = {
          val ds = new BasicDataSource
          ds.setUrl(MYSQL_JDBC_URL)
          ds.setUsername(MYSQL_USERNAME)
          ds.setPassword(MYSQL_PASSWORD)
          ds
        }
        Class.forName("org.apache.calcite.jdbc.Driver")
        val calcite = DriverManager
          .getConnection("jdbc:calcite:fun=mysql;lex=MYSQL")
          .unwrap(classOf[CalciteConnection])
        val root: SchemaPlus = calcite.getRootSchema
        // Register the MySQL database under its own name so `db`.`tbl` references resolve.
        root.add(MYSQL_DATABASE, JdbcSchema.create(root, MYSQL_DATABASE, mysqlPool, null, MYSQL_DATABASE))
        CalciteConn(calcite, root)
      }
    
        
        val targetSql = MYSQL_SQL2
        println(s"get sql:\n${targetSql}")
        // Parse with MySQL lexical rules and MySQL 5 conformance.
        val parserConfig: SqlParser.Config = SqlParser.config()
          .withLex(Lex.MYSQL)
          .withConformance(SqlConformanceEnum.MYSQL_5)

        val calciteConn = getCalciteMysqlConn()
        try {
          val rootSchema = calciteConn.schema

          // STANDARD is required (otherwise not even "=" resolves); MYSQL adds MySQL functions such as CONCAT.
          val mysqlOperatorTable = SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(SqlLibrary.STANDARD, SqlLibrary.MYSQL)

          // Lenient operator lookup: see SqlValidator.Config#lenientOperatorLookup for how
          // operators missing from the operator table are treated.
          val mysqlConfig: SqlValidator.Config = SqlValidator.Config.DEFAULT
            .withConformance(SqlConformanceEnum.MYSQL_5)
            .withLenientOperatorLookup(true)

          val frameworkConfig: FrameworkConfig = Frameworks.newConfigBuilder
            .defaultSchema(rootSchema)
            .operatorTable(mysqlOperatorTable)
            .parserConfig(parserConfig)
            .sqlValidatorConfig(mysqlConfig)
            .build()

          val planner = Frameworks.getPlanner(frameworkConfig)
          try {
            val sqlNode: SqlNode = planner.parse(targetSql)
            val validatedNode: SqlNode = planner.validate(sqlNode)
            val relRoot: RelRoot = planner.rel(validatedNode)
            printRelRoot(relRoot)
          } finally {
            planner.close()
          }
        } finally {
          // Release the Calcite connection opened by getCalciteMysqlConn().
          calciteConn.conn.close()
        }
    

    最终打印结果(s_part 和 ts 分别由常量 'none' 和 current_timestamp 生成,没有来源列,因此不会打印):

    s_id <- test.st01.s_id
    s_name <- test.st01.s_name,test.st02.s_name
    s_age <- test.st01.s_age,test.st02.s_age
    s_sex <- test.st01.s_sex
    

    参考文章:

    基于Calcite解析Flink SQL列级数据血缘

  • 相关阅读:
    SpringMVC学习|JSON讲解、Controller返回JSON数据、Jackson、JSON乱码处理、FastJson
    2022年前端技术发展趋势
    jdbc回顾
    项目实战之安装依赖npm install
    Python神经网络入门与实战,神经网络算法python实现
    【PostgreSQL】PostgreSQL 15移除了Stats Collector
    es6-promise对象详解
    3-3主机发现-四层发现
    运输层(计算机网络谢希仁第八版)——学习笔记五
    LeetCode-775. 全局倒置与局部倒置【最小后缀,归纳】
  • 原文地址:https://blog.csdn.net/lisacumt/article/details/138530814