• 【Spark】Spark SQL 字段血缘如何实现


    0、背景

    字段血缘是在表处理的过程中将字段的处理过程保留下来。为什么会需要字段血缘呢?

    有了字段间的血缘关系,便可以知道数据的来源去处,以及字段之间的转换关系,这样对数据的质量,治理有很大的帮助。

    Spark SQL 相对于 Hive 来说通常情况下效率会比较高,对于运行时间、资源的使用上面等都会有较大的收益。

    平台计划将 Hive 任务迁移到 Spark SQL 上,同时也需要实现字段血缘的功能。Hive的数据血缘直接Atlas支持,Spark的字段血缘如何实现呢?

    一、SparkSQL扩展

    Spark 是支持扩展的:允许用户对 Spark SQL 的 SQL 解析、逻辑计划的分析和检查、逻辑计划的优化、物理计划的形成等进行扩展,且对 Spark 的源码没有改动,代价也比较小。

    1.1 Spark可扩展的内容

    SparkSessionExtensions是比较重要的一个类,其中定义了注入规则的方法,现在支持以下内容:

    • 【Analyzer Rules】逻辑计划分析规则

    • 【Check Analysis Rules】逻辑计划检查规则

    • 【Optimizer Rules.】 逻辑计划优化规则

    • 【Planning Strategies】形成物理计划的策略

    • 【Customized Parser】自定义的sql解析器

    • 【(External) Catalog listeners  catalog】监听器

    在以上六种可以用户自定义的地方,我们选择了【Check Analysis Rules】。因为该检查规则在方法调用的时候是不需要有返回值的,也就意味着不需要对当前遍历的逻辑计划树进行修改,这正是我们需要的。

    而【Analyzer Rules】、【Optimizer Rules】则需要对当前的逻辑计划进行修改,使得我们难以迭代整个树,难以得到我们想要的结果。

    1.2 实现自己的扩展

    1. class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {
    2. override def apply(spark: SparkSessionExtensions): Unit = {
    3. //字段血缘
    4. spark.injectCheckRule(FieldLineageCheckRuleV3)
    5. //sql解析器
    6. spark.injectParser { case (_, parser) => new ExtraSparkParser(parser) }
    7. }
    8. }

    上面按照这种方式实现扩展,并在 apply 方法中把自己需要的规则注入到SparkSessionExtensions 即可,除了以上四种可以注入的以外还有其他的规则。

    要让 ExtralSparkExtension 起到作用的话我们需要在spark-default.conf下配置spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension,在启动 Spark 任务的时候即可生效。

    注意到我们也实现了一个自定义的SQL解析器,其实该解析器并没有做太多的事情。只是在判断如果该语句包含insert的时候就将 SQLText(SQL语句)设置到一个为FIELD_LINE_AGE_SQL,之所以将SQLText放到FIELD_LINE_AGE_SQL里面。因为在 DheckRule 里面是拿不到SparkPlan的我们需要对SQL再次解析拿到 SprkPlan,而FieldLineageCheckRuleV3的实现也特别简单,重要的在另一个线程实现里面。

    这里我们只关注了insert语句,因为插入语句里面有从某些个表里面输入然后写入到某个表。

    1. class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging{
    2. override def parsePlan(sqlText: String): LogicalPlan = {
    3. val lineAgeEnabled = SparkSession.getActiveSession
    4. .get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
    5. logDebug(s"SqlText: $sqlText")
    6. if(sqlText.toLowerCase().contains("insert")){
    7. if(lineAgeEnabled){
    8. if(FIELD_LINE_AGE_SQL_COULD_SET.get()){
    9. //线程本地变量在这里
    10. FIELD_LINE_AGE_SQL.set(sqlText)
    11. }
    12. FIELD_LINE_AGE_SQL_COULD_SET.remove()
    13. }
    14. }
    15. delegate.parsePlan(sqlText)
    16. }
    17. //调用原始的sqlparser
    18. override def parseExpression(sqlText: String): Expression = {
    19. delegate.parseExpression(sqlText)
    20. }
    21. //调用原始的sqlparser
    22. override def parseTableIdentifier(sqlText: String): TableIdentifier = {
    23. delegate.parseTableIdentifier(sqlText)
    24. }
    25. //调用原始的sqlparser
    26. override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
    27. delegate.parseFunctionIdentifier(sqlText)
    28. }
    29. //调用原始的sqlparser
    30. override def parseTableSchema(sqlText: String): StructType = {
    31. delegate.parseTableSchema(sqlText)
    32. }
    33. //调用原始的sqlparser
    34. override def parseDataType(sqlText: String): DataType = {
    35. delegate.parseDataType(sqlText)
    36. }
    37. }

    1.3 扩展的规则类

    1. case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {
    2. val executor: ThreadPoolExecutor =
    3. ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)
    4. override def apply(plan: LogicalPlan): Unit = {
    5. val sql = FIELD_LINE_AGE_SQL.get
    6. FIELD_LINE_AGE_SQL.remove()
    7. if(sql != null){
    8. //这里我们拿到sql然后启动一个线程做剩余的解析任务
    9. val task = new FieldLineageRunnableV3(sparkSession,sql)
    10. executor.execute(task)
    11. }
    12. }
    13. }

    很简单,我们只是拿到了 SQL 然后便启动了一个线程去得到 SparkPlan,实际逻辑在

    FieldLineageRunnableV3。

    1.4 具体的实现方法

    1.4.1 得到 SparkPlan

    我们在 run 方法中得到 SparkPlan:

    1. override def run(): Unit = {
    2. val parser = sparkSession.sessionState.sqlParser
    3. val analyzer = sparkSession.sessionState.analyzer
    4. val optimizer = sparkSession.sessionState.optimizer
    5. val planner = sparkSession.sessionState.planner
    6. ............
    7. val newPlan = parser.parsePlan(sql)
    8. PASS_TABLE_AUTH.set(true)
    9. val analyzedPlan = analyzer.executeAndCheck(newPlan)
    10. val optimizerPlan = optimizer.execute(analyzedPlan)
    11. //得到sparkPlan
    12. val sparkPlan = planner.plan(optimizerPlan).next()
    13. ...............
    14. if(targetTable != null){
    15. val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
    16. val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
    17. //projection
    18. projectionLineAge(levelProject, sparkPlan.child)
    19. //predication
    20. predicationLineAge(predicates, sparkPlan.child)
    21. ...............

    为什么要使用 SparkPlan 呢?当初我们考虑的时候,物理计划拿取字段关系的时候是比较准的,且链路比较短也更直接。

    在这里补充一下 Spark SQL 解析的过程如下:

    经过SqlParser后会得到逻辑计划,此时表名、函数等都没有解析,还不能执行;经过Analyzer会分析一些绑定信息,例如表验证、字段信息、函数信息;经过Optimizer 后逻辑计划会根据既定规则被优化,这里的规则是RBO,当然 Spark 还支持CBO的优化;经过SparkPlanner后就成了可执行的物理计划。

    我们看一个逻辑计划与物理计划对比的例子:

    一个 SQL 语句:

    1. select item_id,TYPE,v_value,imei from t1
    2. union all
    3. select item_id,TYPE,v_value,imei from t2
    4. union all
    5. select item_id,TYPE,v_value,imei from t3

    逻辑计划:

    物理计划:

    显然简化了很多。

    得到 SparkPlan 后,我们就可以根据不同的SparkPlan节点做迭代处理。

    我们将字段血缘分为两种类型:projection(select查询字段)、predication(wehre查询条件)。

    这两种是一种点对点的关系,即从原始表的字段生成目标表的字段的对应关系。

    想象一个查询是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:

    那么我们迭代查询的结果应该为

    id ->tab1.id , 

    name->tab1.name,tabb2.name,

    age→tabb2.age。

    注意到有该变量

     val levelProject = new ArrayBuffer

    [ArrayBuffer[NameExpressionHolder]](),通过projecti-onLineAge 迭代后 levelProject 存储了顶层id,name,age对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

    当然也不是简单的递归迭代,还需要考虑特殊情况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都需要特殊考虑。

    例子及效果:

    SQL:

    1. with A as (select id,name,age from tab1 where id > 100 ) ,
    2. C as (select id,name,max(age) from A group by A.id,A.name) ,
    3. B as (select id,name,age from tabb2 where age > 28)
    4. insert into tab3
    5. select C.id,concat(C.name,B.name) as name, B.age from
    6. B,C where C.id = B.id

    效果:

    1. {
    2. "edges": [
    3. {
    4. "sources": [
    5. 3
    6. ],
    7. "targets": [
    8. 0
    9. ],
    10. "expression": "id",
    11. "edgeType": "PROJECTION"
    12. },
    13. {
    14. "sources": [
    15. 4,
    16. 7
    17. ],
    18. "targets": [
    19. 1
    20. ],
    21. "expression": "name",
    22. "edgeType": "PROJECTION"
    23. },
    24. {
    25. "sources": [
    26. 5
    27. ],
    28. "targets": [
    29. 2
    30. ],
    31. "expression": "age",
    32. "edgeType": "PROJECTION"
    33. },
    34. {
    35. "sources": [
    36. 6,
    37. 3
    38. ],
    39. "targets": [
    40. 0,
    41. 1,
    42. 2
    43. ],
    44. "expression": "INNER",
    45. "edgeType": "PREDICATE"
    46. },
    47. {
    48. "sources": [
    49. 6,
    50. 5
    51. ],
    52. "targets": [
    53. 0,
    54. 1,
    55. 2
    56. ],
    57. "expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
    58. "edgeType": "PREDICATE"
    59. },
    60. {
    61. "sources": [
    62. 3
    63. ],
    64. "targets": [
    65. 0,
    66. 1,
    67. 2
    68. ],
    69. "expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
    70. "edgeType": "PREDICATE"
    71. }
    72. ],
    73. "vertices": [
    74. {
    75. "id": 0,
    76. "vertexType": "COLUMN",
    77. "vertexId": "default.tab3.id"
    78. },
    79. {
    80. "id": 1,
    81. "vertexType": "COLUMN",
    82. "vertexId": "default.tab3.name"
    83. },
    84. {
    85. "id": 2,
    86. "vertexType": "COLUMN",
    87. "vertexId": "default.tab3.age"
    88. },
    89. {
    90. "id": 3,
    91. "vertexType": "COLUMN",
    92. "vertexId": "default.tab1.id"
    93. },
    94. {
    95. "id": 4,
    96. "vertexType": "COLUMN",
    97. "vertexId": "default.tab1.name"
    98. },
    99. {
    100. "id": 5,
    101. "vertexType": "COLUMN",
    102. "vertexId": "default.tabb2.age"
    103. },
    104. {
    105. "id": 6,
    106. "vertexType": "COLUMN",
    107. "vertexId": "default.tabb2.id"
    108. },
    109. {
    110. "id": 7,
    111. "vertexType": "COLUMN",
    112. "vertexId": "default.tabb2.name"
    113. }
    114. ]
    115. }

    二、总结

    在 Spark SQL 的字段血缘实现中,我们通过其自扩展,首先拿到了 insert 语句,在我们自己的检查规则中拿到SQL 语句,通过SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终得到了物理计划。

    我们通过迭代物理计划,根据不同执行计划做对应的转换,然后就得到了字段之间的对应关系。当前的实现是比较简单的,字段之间是直线的对应关系,中间过程被忽略,如果想实现字段的转换的整个过程也是没有问题的。

  • 相关阅读:
    激活函数总结(四十四):激活函数补充(NLSIG、EvoNorms)
    vuex常用属性
    java选做实验4常用集合类使用
    Linux内核设计与实现 第八章 下半部和推后执行的工作
    Linux下查看根目录各文件内存大小
    .git 目录中有什么?
    Python中的List
    【xubuntu-22.04】精简模式,给intel 盒子安装系统,使用稳定,内存cpu占用低,比之前的版本更加稳定,可以做个服务器使用,也可以上网,功耗低
    【前端验证】验证自动化脚本的最后一块拼图补全——gen_tb
    NNDL:作业3:分别使用numpy和pytorch实现FNN例题
  • 原文地址:https://blog.csdn.net/u011487470/article/details/125403998