在数据中台中,通常我们面对的是海量的基于数仓的ETL、取数、建模、业务调用等等的数据操作任务,面对错综复杂的调度依赖关系,当出现问题需要快速追溯数据链路、以及对热点资源的盘点治理,人工进行整理效率太低。所以目前一般的数据平台都会实现一个数据地图(任务/SQL维度的血缘)的产品帮助开发/运维更好的进行数据治理。
今天主要分享下SQL维度的血缘实现,对于一个数据开发任务一般对应一个脚本开发任务,离线的可能就是HiveSQL脚本,实时的可能就是Imapla;presto;SparkSQL;FlinkSQL等等脚本任务,我们除了可以在任务的维度进行血缘关系捕获(一般有调度平台可以维护任务的依赖关系),还可以基于离线或者实时hook捕获对应引擎(比如hive;presto;flink等)执行的SQL脚本来进行sql血缘的分析(具体的场景看公司的需求)。
目前来讲,Hive;Presto;Flink等SQL模块大部分的解析引擎都使用了ANTLR进行AST树的生成,然后进行一系列的优化,再生成对应引擎可执行的操作任务,那么我们是不是可以直接复用他们解析的这块功能?答案肯定是可以的,比如Apache Altas产品就是大量使用Hook插件,复用这些计算引擎提供的SQL解析与血缘分析的接口服务,但是有没有缺点呢?肯定是有的,大量的Hook插件对于计算引擎的侵入以及性能的影响肯定是有的。
得益于现在很多的开源框架,对于SQL AST的生成这块都有好几个框架进行了实现,使得我们不必再局限于各个计算引擎本身所自带的SQL解析之中。比如现在的Druid(官方号称是比antlr;javacc等快10-100倍数)、gudusoft、Antlr、javacc等等都是比较优秀的解析框架,接下来,我随便分享几个。。。
Hive源码中parser模块,主要就是基于antrl文件(.g)词法(比如Lexer.g)语法(比如HiveParser)等文件,生成各种解析以及AST访问遍历的实现类。antlr具体的实现后续单独讨论。
对于Hive血缘解析的这块可以参考以下这个血缘的Hook (Altas就是类似这种方式实现的血缘采集)
看以下已有的方法:生成边;创建目标节点/来源节点;获取边以及节点:
边(关系);来源表;目标表; sql表达式;表类型。。
上述方法对于字段级别表级别的血缘数据采集已经比较完善了,而且代码的话可以直接复用或者借鉴自己实现一个采集的血缘HOOK。开源社区也有单独将Hive解析部分抠出来的项目,大家自行找下,hook使用非本篇重点这里不再多讲…
gudusoft有一个商业化的应用产品叫SqlFlow
注意的几个问题:
其实不要被官方说明唬住了,相应的jar包我们还是能用的且不收费,比如 gudusoft.gsqlparser-2.5.1.9.jar(稍后我传上去),这个需要我们将解析的结果进行提取,或者用它的工具方法进行解析等等(完全免费)。
先看下sqlflow的解析功能吧
能支持的粒度: 表级别与字段级别(包括函数)
-- case 1 : mysql
SELECT `user`.name,
`order`.price
FROM `user`,
`order`
WHERE `user`.id = `order`.uid
解析字段血缘结果:
name(`user`) -> name(RS-1)
price(`order`) -> price(RS-1)
支持show function:
-- case 2 :mysql
SELECT `user`.name,
pow(floor(`order`.price),2) as calculated_price
FROM `user`,
`order`
WHERE `user`.id = `order`.uid
解析字段血缘结果(包含中间被函数转换的过程):
name(`user`) -> name(RS-1)
price(`order`) -> floor -> pow -> calculated_price(RS-1)
SELECT temp.name, max(temp.price) as price
FROM (
SELECT `user`.name,
`order`.price
FROM `user`,
`order`
WHERE user.id = order.uid
) as temp
GROUP BY temp.name
解析字段血缘结果:
name(`user`) -> name(result of temp) -> name(RS-1)
price(`order`) -> price(result of temp) -> price(RS-1)
支持多种
源码里支持的种类如下: 已覆盖 Hive、Mysql、SparkSQL、Presto、impala这些常用的SQL引擎
dbvaccess,
dbvansi,
dbvathena,
dbvazuresql,
dbvbigquery,
dbvcouchbase,
dbvdax,
dbvdb2,
dbvexasol,
dbvfirebird,
dbvgeneric,
dbvgreenplum,
dbvhana,
dbvhive,
dbvimpala,
dbvinformix,
dbvmdx,
dbvmysql,
dbvmssql,
dbvnetezza,
dbvodbc,
dbvopenedge,
dbvoracle,
dbvpostgresql,
dbvpresto,
dbvredshift,
dbvsnowflake,
dbvsoql,
dbvsparksql,
dbvsybase,
dbvteradata,
dbvtrino,
dbvvertica;
略
java版, python版demo都有, 本篇先不展开,有兴趣的可以自己下载研究下
自己解析需要自己去遍历AST树,在每一层的信息也需要自己去保存,上下层之间的关系也需要自己维护,直接写比较麻烦点(后面优化下代码设计其实也还可以):
import gudusoft.gsqlparser.EDbVendor;
import gudusoft.gsqlparser.TGSqlParser;
import gudusoft.gsqlparser.TStatementList;
import gudusoft.gsqlparser.nodes.TExpression;
import gudusoft.gsqlparser.nodes.TResultColumn;
import gudusoft.gsqlparser.nodes.TResultColumnList;
import gudusoft.gsqlparser.stmt.TCreateTableSqlStatement;
import gudusoft.gsqlparser.stmt.TSelectSqlStatement;
import java.util.LinkedList;
import java.util.stream.StreamSupport;
/**
* @author pushkin
* @version v1.0.0
* @date 2022/6/1 16:20
*
* Modification History:
* Date Author Version Description
* ------------------------------------------------------------
*/
public class DemoD {
public static void main(String[] args) {
// 注SQL已脱敏
String sql1 = "CREATE TABLE tmp.jinpushi_066\n" +
"AS\n" +
"SELECT \n" +
"T1.a1, " +
"T1.a2, " +
"T1.a3, " +
"T1.a4 " +
",T2.b1" +
",T2.b2 \n" +
",T3.b3\n" +
",T2.b4\n" +
",T1.a5\n" +
"FROM dwd.jinpushi_01 T1\n" +
"LEFT JOIN dim.jinpushi_02 T2\n" +
"ON T1.id = T2.id\n" +
"LEFT JOIN ods.jinpushi_03 T3\n" +
"ON T1.code = T3.value\n" +
"AND T3.tyoe = 12306\n" +
";\n" +
"INSERT OVERWRITE TABLE dwb.jinpushi_05 \n" +
"SELECT * FROM tmp.jinpushi_066\n" +
";";
TGSqlParser sqlparser = new TGSqlParser(EDbVendor.dbvhive);
sqlparser.sqltext = sql1;
int ret = sqlparser.parse();
if (ret == 0) {
// 解析出所有语句
TStatementList stmts = sqlparser.getSqlstatements();
// 拿到create table语句的实例
TCreateTableSqlStatement stmt = (TCreateTableSqlStatement) stmts.get(0);
// 从create table语句的子查询中,拿到select语句的实例,再获取column
TSelectSqlStatement subquery = stmt.getSubQuery();
TResultColumnList columns = subquery.getResultColumnList();
LinkedList[] lineages = new LinkedList[columns.size()];
for (int i = 0; i < columns.size(); i++) {
TResultColumn column = columns.getResultColumn(i);
LinkedList lineage = lineages[i] = new LinkedList<>();
lineage.addFirst(String.format("%s(%s)", column.getDisplayName(), stmt.getTableName()));
lineage.addFirst(String.format("%s(RS-1)", column.getDisplayName()));
String columnName = "";
if (column.getExpr() != null) {
TExpression expr = column.getExpr();
while (expr.getFunctionCall() != null) {
expr = expr.getFunctionCall().getArgs().getExpression(0);
}
columnName = expr.toString();
}
String[] pair = columnName.split("\\.");
if (pair.length == 2) {
// 有alias,在alias对应的select语句中搜索
String prefix = pair[0];
String columnDisplayName = pair[1];
lineage.addFirst(String.format("%s(%s)", columnDisplayName, prefix));
StreamSupport
.stream(subquery.tables.spliterator(), false)
.filter(t -> t.getAliasClause().toString().equalsIgnoreCase(prefix))
.findFirst().ifPresent(table -> {
TSelectSqlStatement subquery1 = table.subquery;
if (subquery1 != null) {
TResultColumnList resultColumnList = subquery1.getResultColumnList();
resultColumnList.forEach(tableColumn -> {
if (columnDisplayName.equalsIgnoreCase(tableColumn.getDisplayName())) {
if (tableColumn.getExpr().getFunctionCall() == null) {
lineage.addFirst(String.format("%s(%s)", columnDisplayName, table.subquery.tables.getTable(0).getTableName()));
} else {
lineage.addFirst(String.format("%s(%s)",
tableColumn.getExpr().getFunctionCall().getArgs().getElement(0),
table.subquery.tables.getTable(0).getTableName()));
}
}
});
}
});
} else if (pair.length == 1) {
// 没有alias,在所有的select语句中搜索
String columnDisplayName = pair[0];
StreamSupport
.stream(subquery.tables.spliterator(), false)
.filter(t -> {
if (t.subquery != null) {
for (int j = 0; j < t.subquery.getResultColumnList().size(); j++) {
if (t.subquery.getResultColumnList().getResultColumn(j).getDisplayName().equalsIgnoreCase(columnDisplayName)) {
return true;
}
}
}
return false;
})
.findFirst().ifPresent(table -> {
lineage.addFirst(String.format("%s(%s)", columnDisplayName, table.getAliasClause()));
lineage.addFirst(String.format("%s(%s)", columnDisplayName, table.subquery.tables.getTable(0)));
});
}
}
for (LinkedList lineage : lineages) {
System.out.println(String.join(" -> ", lineage));
}
} else {
System.out.println(sqlparser.getErrormessage());
}
}
}
运行结果:
a1(T1) -> a1(RS-1) -> a1(tmp.jinpushi_066)
a2(T1) -> a2(RS-1) -> a2(tmp.jinpushi_066)
a3(T1) -> a3(RS-1) -> a3(tmp.jinpushi_066)
a4(T1) -> a4(RS-1) -> a4(tmp.jinpushi_066)
b1(T2) -> b1(RS-1) -> b1(tmp.jinpushi_066)
b2(T2) -> b2(RS-1) -> b2(tmp.jinpushi_066)
b3(T3) -> b3(RS-1) -> b3(tmp.jinpushi_066)
b4(T2) -> b4(RS-1) -> b4(tmp.jinpushi_066)
a5(T1) -> a5(RS-1) -> a5(tmp.jinpushi_066)
import gudusoft.gsqlparser.EDbVendor;
import gudusoft.gsqlparser.dlineage.DataFlowAnalyzer;
import gudusoft.gsqlparser.dlineage.dataflow.model.json.Dataflow;
import gudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow;
import gudusoft.gsqlparser.dlineage.util.RemoveDataflowFunction;
public class TestGSql {
public static void main(String[] args) {
String sql2 = "CREATE TABLE tmp.tmp_table_a AS \n" +
"SELECT T1.id, T1.age, T2.name \n" +
"FROM dwd.table1 T1 \n" +
"LEFT JOIN \n" +
"dwd.table2 T2 ON T1.id = T2.id";
System.out.println(getSqlLineage(sql2));
}
private static Dataflow getSqlLineage(String sql) {
DataFlowAnalyzer dlineage = new DataFlowAnalyzer(sql, EDbVendor.dbvhive, false);
dlineage.setSqlEnv(null);
dlineage.setShowJoin(true);
dlineage.setIgnoreRecordSet(true);
dlineage.setLinkOrphanColumnToFirstTable(false);
dlineage.setTextFormat(false);
dlineage.generateDataFlow();
dataflow dataFlow = dlineage.getDataFlow();
dataFlow.getDatabases();
dataflow dataflow = new RemoveDataflowFunction().removeFunction(dlineage.getDataFlow());
Dataflow flow = DataFlowAnalyzer.getSqlflowJSONModel(dataflow);
return flow;
}
}
调试结果:
表(包括对应的字段):
关系
根据以上的信息,其实我们完全可以从中解析出我们想要的结果(表血缘;字段级别血缘)
唯一不足的是对于select * 我们不太好解析,这块后续我也会提供具体实现方案
pass
等待我有时间写吧
https://github.com/Shkin1/hathor
等待我有时间写吧
等待我有时间写吧
pass