如果你的工作是从事数据挖掘、数据仓库建设或者信息系统开发/维护,有没有曾经遇到过如下的烦恼?
面对着几百上千张数据表,不知该如何关联,不知哪些表更有价值
执行着长的令人绝望,慢的无法忍受的SQL脚本,却不敢挥刀整改
准备着新功能上线,但总担心一行代码的修改会造成严重的生产事故
有没有科学的办法,来管理表之间、代码之间的复杂关系?从而帮助开发人员更好地认识和理解业务系统业务与底层表关系、底层表的表间关系,理清当前数据(字段、关键指标或者数据标签)从哪里来、到哪里去,搞清楚哪些下游系统在使用这些数据。
血缘分析是解决这类问题的一种技术手段。数据血缘(Data Lineage),指的是数据从产生、ETL处理、加工、融合、流转到最终消亡,数据之间自然形成一种关系。这些关系就是描述数据的数据(元数据)。掌握了这个元数据,就能最大程度的做好数据的应用和管理。
那么如何推导数据之间的血缘关系呢?其实在开发或者分析师团队提供的成百上千的数据脚本中,每一行代码背后都蕴含着业务逻辑和数据关系。有没有可能通过批量解析这些数据脚本,自动提炼出背后的数据逻辑,以及脚本之间的依赖关系呢?
本文介绍一种针对python代码的推导方法。
既然解析的对象是python代码,我们首先要理解python的编译过程。以 CPython 为例,编译过程如下:
将源代码解析为解析树(Parser Tree)
将解析树转换为抽象语法树(Abstract Syntax Tree)
将抽象语法树转换到控制流图(Control Flow Graph)
根据流图将字节码(bytecode)发送给虚拟机(eval)
我们平常在python开发环境中编写代码时,IDE会提示各种编写过程中的语法错误,本质上是代码静态检查,对代码的内容和结构进行解析和分析,类似编译过程中的前三个步骤,让机器读懂代码并且判断其是否符合规范。
因此我们就可以利用这个原理,通过代码解析自动提取代码中的关键信息,例如代码
引用了哪些外部函数,进一步调用了哪些数据脚本
SQL语句使用了哪个数据源,查询了哪些表,更新了哪些字段
处理逻辑中对字段做了哪些衍生操作,用了什么算法
这种方法虽不能一键生成完整的开发文档,但却能提供大量丰富的线索,有助于快速开展梳理工作,事半功倍。
我们参考Pyflakes来实现以上功能,Pyflakes是Python的一个代码分析包,用来分析代码,发现潜在的代码问题,例如:引入但没有用到的模块、变量创建但是没有使用。查看Pyflakes的源码,可以发现其进一步使用ast 模块,其用于生成和编译 Python 代码的抽象语法树,关于ast的介绍可以进一步查看https://blog.csdn.net/ThinkTimes/article/details/110831176
一般来说pyFlakes是用cmd命令来执行,但是我们这里为了了解其运行机制,使用python的方式来调用其API函数,方便使用debug的方式跟踪程序。具体代码如下:
- from pyflakes import reporter as modReporter
- from pyflakes import api
-
-
- if __name__ == "__main__":
- reporter = modReporter._makeDefaultReporter()
- args = ['C:\\Users\\yzeng\\PycharmProjects\\pythonProject\\flakes']
- warnings = api.checkRecursive(args, reporter)
Args传入的是文件夹信息,然后调用程序checkRecursive检查该文件夹下的所有代码,进一步调用函数checkPath,使用f.read读取代码内容,调用核心函数check(codestr, filename, reporter)。
在这个函数中,首先使用tree = ast.parse(codeString, filename=filename),生成抽象语法树。在pycharm的debug窗口查看tree的结构,如下
这个过程类似语言处理技术,对文字的词法和句法解析以便让机器了解文字含义。Python运行时需要对python脚本内容进行解析,也就是把python脚本的每一个语句进行分类,并且建立语句之间的语法关系,也就是抽象语法树。在这个截图中,可以看到有5个节点,对应源代码的5段代码片段,例如 ImportFrom代码 和 函数定义function define的代码。这5个节点还有其各自的子节点,例如assign节点的子节点是call类型的节点(如果是调用一个函数)。更多语法树的节点类型,可参考https://docs.python.org/3/library/ast.html 不同类型的节点其属性不一样,通用的属性有位置信息,例如col_offset和end_col_offset指的是该代码片段在列的起始和结束位置,type_comment指的是该代码是否有type 类型的注释(可以为函数参数、返回值、变量等添加类型提示,主要目的在于帮助开发工具通过静态检查发现代码中的 Bug)。
而后调用file_tokens = checker.make_tokens(codeString)将代码的所有内容进行分词,记录每一个词在代码中的起始位置,结果如下:
最后,w = checker.Checker(tree, file_tokens=file_tokens, filename=filename) 是主要实现代码检查的函数。在checker类中scopeStack约定语法树的范围,scope本质是一个字典,默认是代码文件级别(即module)。针对所有python内置的标识符(函数),将其赋值到字典self.scope里
- for builtin in self.builtIns:
- self.addBinding(None, Builtin(builtin))
调用self.handleChildren(tree),遍历树里面的每一个节点,计算整棵树的深度nodeDepth,记录每个节点所在的深度,进一步调用handler = self.getNodeHandler(node.__class__),为每一种类型的节点动态加载针对该节点类型的处理函数,并且执行它,例如 函数IMPORTFROM就是针对import from节点执行的函数。在该函数中,解析到具体引入的包名,然后使用addBinding函数,检查这个节点的属性是否在self.scope里,如果没有就新建这个值,如果有则会根据python语法规则进行判断提示警告,例如是否是“重复导入相同的包”。当代码片段是赋值操作的时候,就会调用handleNodeLoad,判断之前引入的变量是否存在且完成赋值。后续在函数checkDeadScopes就会检查该变量是否被使用,没有就会报“imported but unused”的警告。
在了解Pyflakes源码基础上,我们采用下面的代码来遍历语法树,会有一个更直观的感受。
- #借鉴flakes的类Checker
- class linkage_Checker:
- nodeDepth = 0
-
-
- def __init__(self, tree, file_tokens=(), filename='(none)',codestr='none'):
- self._nodeHandlers = {}
- self.codelines = codestr.decode().split('\r\n')
- self.handleChildren(tree)
-
-
-
-
- #遍历语法树
- def handleChildren(self, tree, omit=None):
- for node in checker.iter_child_nodes(tree, omit=omit):
- self.handleNode(node, tree)
-
-
- #针对节点处理
- def handleNode(self, node, parent):
- if node is None:
- return
- self.nodeDepth += 1
- print('-----------------')
- print('节点类型:%s' % node.__class__)
- print('节点层次:%s' % self.nodeDepth)
- try:
- fields = '/'.join([field for field in node.__class__._fields])
- print('节点属性:%s' % fields)
- except:
- print(123)
-
-
- lineno = getattr(node, 'lineno', 0)
- end_lineno = getattr(node, 'end_lineno', 0)
- col_offset = getattr(node, 'col_offset', 0)
- end_col_offset = getattr(node, 'end_col_offset', 0)
-
-
- print('起始行:%s' %lineno )
- print('结束行:%s' %end_lineno )
- print('起始列:%s' %col_offset )
- print('结束列:%s' %end_col_offset )
- if lineno > 0:
- getCodebyposition(self.codelines, lineno, end_lineno, col_offset, end_col_offset)
-
-
- try:
- handler = self.getNodeHandler(node.__class__)
- handler(node)
- finally:
- self.nodeDepth -= 1
-
-
- #针对节点类型获得对应的处理函数
- def getNodeHandler(self, node_class):
- try:
- return self._nodeHandlers[node_class]
- except KeyError:
- nodeType = checker.getNodeType(node_class)
- self._nodeHandlers[node_class] = handler = getattr(
- self, nodeType, self._unknown_handler,
- )
- return handler
-
-
- #默认就使用 遍历的函数
- def _unknown_handler(self, node):
- self.handleChildren(node)
-
-
-
-
- #解析语法树
- def check(codestr, filename, reporter=None):
- try:
- tree = ast.parse(codestr, filename=filename)
- except SyntaxError:
- value = sys.exc_info()[1]
- msg = value.args[0]
- (lineno, offset, text) = value.lineno, value.offset, value.text
- print(lineno, offset, text)
- # 分词
- file_tokens = checker.make_tokens(codestr)
- w = linkage_Checker(tree, file_tokens=file_tokens,filename=filename,codestr=codestr)
- return 1
-
-
- def getCodebyposition(codelines,lineno,end_lineno,col_offset,end_col_offset):
- for i in range(lineno,end_lineno+1):
- if i == lineno and lineno == end_lineno:
- print(codelines[lineno-1][col_offset:end_col_offset])
- elif i == lineno:
- print(codelines[lineno - 1][col_offset:])
- elif i == end_lineno:
- print(codelines[end_lineno - 1][:end_col_offset])
- else:
- print(codelines[i - 1])
- return 1
通过代码的运行结果可以看到树状的层次结构,
例如代码:
records = pd.read_sql_query('''select cast(date as date) as NaturalDay, symbol from table where date between begin and end’’’)
是一个ast.assign类型,它的一个子节点是ast.call,对应的代码是
pd.read_sql_query('''select cast(date as date) as NaturalDay, symbol from table where date between begin and end’’’)
其有一个类型为ast.Attribute子节点,对应代码是pd.read_sql_query。这个节点又有一个ast.Name的子节点,对应代码是pd。可见,语法树是把一段代码按照语法结构解析的树状结果,以便编译器进一步将抽象语法树转换为更接近机器代码的 control flow Graph。
进一步,在充分理解ast解析过程和语法树结构的基础上,我们可以针对关心的代码片段进行提取,例如关心“类相关”或者“sql”相关的代码片段。这里我们自定义一个解析sql代码的函数,能够自动提取其用到的表名和字段名。
- #解析其中一段sql语句中的字段和表名
- def getTableField(statement):
- result = {}
- matchObj = re.search( r'select(.*)from(.*)', statement, re.M|re.I)
- if pd.notnull(matchObj):
- fields = re.split(',', matchObj.group(1))
- fields = [field.strip() for field in fields]
- table = matchObj.group(2)
- # table = table.split()
- table = table.strip()
- result[table] = fields
- return result
-
-
- #解析sql语句中的字段和表名, 参考 https://www.robin.eu.org/programming/extracting-table-and-column-names-from-sql-query/
- def sqlparse(sql_str):
- sql_str = sql_str.replace('SELECT', 'select')
- sql_str = sql_str.replace('WHERE', 'where')
- sql_str = sql_str.replace('FROM', 'from')
- re_skip_detail = re.compile("([a-zA-Z0-9]+)") # 匹配英文和数字
- tmp = re_skip_detail.split(sql_str)
- # tmp =[x for x in tmp if len(x.strip())>0]
- select_index = from_index = 0
- parse_result = []
- for index,item in enumerate(tmp):
- # print(item)
- if item in ('select','where'):
- if from_index > 0 : # 如果前面已经有比较完整的sql语句了,也就是已经出现from了
- statement = ''.join(tmp[select_index : index]) # 截取到当前的位置
- # print('语句:%s' %statement)
- if len(statement) > 0:
- table_fields = getTableField(statement)
- parse_result.append(table_fields)
- # print(parse_result)
- # 新的开始
- from_index = 0
- if item == 'select':
- select_index = index
- elif item == 'from':
- from_index = index
-
-
- return parse_result
好啦,大功告成,最后展示的结果如下: