• 数据血缘分析-Python代码的智能解析


    如果你的工作是从事数据挖掘、数据仓库建设或者信息系统开发/维护,有没有曾经遇到过如下的烦恼?

    •  面对着几百上千张数据表,不知该如何关联,不知哪些表更有价值

    • 执行着长的令人绝望,慢的无法忍受的SQL脚本,却不敢挥刀整改

    • 准备着新功能上线,但总担心一行代码的修改会造成严重的生产事故

    有没有科学的办法,来管理表之间、代码之间的复杂关系?从而帮助开发人员更好地认识和理解业务系统业务与底层表关系、底层表的表间关系,理清当前数据(字段、关键指标或者数据标签)从哪里来、到哪里去,搞清楚哪些下游系统在使用这些数据。

    血缘分析是解决这类问题的一种技术手段。数据血缘(Data Lineage),指的是数据从产生、ETL处理、加工、融合、流转到最终消亡,数据之间自然形成一种关系。这些关系就是描述数据的数据(元数据)。掌握了这个元数据,就能最大程度的做好数据的应用和管理。

    那么如何推导数据之间的血缘关系呢?其实在开发或者分析师团队提供的成百上千的数据脚本中,每一行代码背后都蕴含着业务逻辑和数据关系。有没有可能通过批量解析这些数据脚本,自动提炼出背后的数据逻辑,以及脚本之间的依赖关系呢?

    本文介绍一种针对python代码的推导方法。

    既然解析的对象是python代码,我们首先要理解python的编译过程。以 CPython 为例,编译过程如下:

    1. 将源代码解析为解析树(Parser Tree)

    2. 将解析树转换为抽象语法树(Abstract Syntax Tree)

    3. 将抽象语法树转换到控制流图(Control Flow Graph)

    4. 根据流图将字节码(bytecode)发送给虚拟机(eval)

    我们平常在python开发环境中编写代码时,IDE会提示各种编写过程中的语法错误,本质上是代码静态检查,对代码的内容和结构进行解析和分析,类似编译过程中的前三个步骤,让机器读懂代码并且判断其是否符合规范。

    因此我们就可以利用这个原理,通过代码解析自动提取代码中的关键信息,例如代码

    • 引用了哪些外部函数,进一步调用了哪些数据脚本

    • SQL语句使用了哪个数据源,查询了哪些表,更新了哪些字段

    • 处理逻辑中对字段做了哪些衍生操作,用了什么算法

    这种方法虽不能一键生成完整的开发文档,但却能提供大量丰富的线索,有助于快速开展梳理工作,事半功倍。

    我们参考Pyflakes来实现以上功能,Pyflakes是Python的一个代码分析包,用来分析代码,发现潜在的代码问题,例如:引入但没有用到的模块、变量创建但是没有使用。查看Pyflakes的源码,可以发现其进一步使用ast 模块,其用于生成和编译 Python 代码的抽象语法树,关于ast的介绍可以进一步查看https://blog.csdn.net/ThinkTimes/article/details/110831176

    一般来说pyFlakes是用cmd命令来执行,但是我们这里为了了解其运行机制,使用python的方式来调用其API函数,方便使用debug的方式跟踪程序。具体代码如下:

    1. from pyflakes import reporter as modReporter
    2. from pyflakes import api
    3. if __name__ == "__main__":
    4. reporter = modReporter._makeDefaultReporter()
    5. args = ['C:\\Users\\yzeng\\PycharmProjects\\pythonProject\\flakes']
    6. warnings = api.checkRecursive(args, reporter)

    Args传入的是文件夹信息,然后调用程序checkRecursive检查该文件夹下的所有代码,进一步调用函数checkPath,使用f.read读取代码内容,调用核心函数check(codestr, filename, reporter)。

    在这个函数中,首先使用tree = ast.parse(codeString, filename=filename),生成抽象语法树。在pycharm的debug窗口查看tree的结构,如下

    9e1a95808516387c84d594a86baa3f95.png

    这个过程类似语言处理技术,对文字的词法和句法解析以便让机器了解文字含义。Python运行时需要对python脚本内容进行解析,也就是把python脚本的每一个语句进行分类,并且建立语句之间的语法关系,也就是抽象语法树。在这个截图中,可以看到有5个节点,对应源代码的5段代码片段,例如 ImportFrom代码 和 函数定义function define的代码。这5个节点还有其各自的子节点,例如assign节点的子节点是call类型的节点(如果是调用一个函数)。更多语法树的节点类型,可参考https://docs.python.org/3/library/ast.html 不同类型的节点其属性不一样,通用的属性有位置信息,例如col_offset和end_col_offset指的是该代码片段在列的起始和结束位置,type_comment指的是该代码是否有type 类型的注释(可以为函数参数、返回值、变量等添加类型提示,主要目的在于帮助开发工具通过静态检查发现代码中的 Bug)。

    而后调用file_tokens = checker.make_tokens(codeString)将代码的所有内容进行分词,记录每一个词在代码中的起始位置,结果如下:

    89a8190852d34fa8bc6da2225536fcca.png

    最后,w = checker.Checker(tree, file_tokens=file_tokens, filename=filename) 是主要实现代码检查的函数。在checker类中scopeStack约定语法树的范围,scope本质是一个字典,默认是代码文件级别(即module)。针对所有python内置的标识符(函数),将其赋值到字典self.scope里

    1. for builtin in self.builtIns:
    2. self.addBinding(None, Builtin(builtin))

    调用self.handleChildren(tree),遍历树里面的每一个节点,计算整棵树的深度nodeDepth,记录每个节点所在的深度,进一步调用handler = self.getNodeHandler(node.__class__),为每一种类型的节点动态加载针对该节点类型的处理函数,并且执行它,例如 函数IMPORTFROM就是针对import from节点执行的函数。在该函数中,解析到具体引入的包名,然后使用addBinding函数,检查这个节点的属性是否在self.scope里,如果没有就新建这个值,如果有则会根据python语法规则进行判断提示警告,例如是否是“重复导入相同的包”。当代码片段是赋值操作的时候,就会调用handleNodeLoad,判断之前引入的变量是否存在且完成赋值。后续在函数checkDeadScopes就会检查该变量是否被使用,没有就会报“imported but unused”的警告。

    在了解Pyflakes源码基础上,我们采用下面的代码来遍历语法树,会有一个更直观的感受。

    1. #借鉴flakes的类Checker
    2. class linkage_Checker:
    3. nodeDepth = 0
    4. def __init__(self, tree, file_tokens=(), filename='(none)',codestr='none'):
    5. self._nodeHandlers = {}
    6. self.codelines = codestr.decode().split('\r\n')
    7. self.handleChildren(tree)
    8. #遍历语法树
    9. def handleChildren(self, tree, omit=None):
    10. for node in checker.iter_child_nodes(tree, omit=omit):
    11. self.handleNode(node, tree)
    12. #针对节点处理
    13. def handleNode(self, node, parent):
    14. if node is None:
    15. return
    16. self.nodeDepth += 1
    17. print('-----------------')
    18. print('节点类型:%s' % node.__class__)
    19. print('节点层次:%s' % self.nodeDepth)
    20. try:
    21. fields = '/'.join([field for field in node.__class__._fields])
    22. print('节点属性:%s' % fields)
    23. except:
    24. print(123)
    25. lineno = getattr(node, 'lineno', 0)
    26. end_lineno = getattr(node, 'end_lineno', 0)
    27. col_offset = getattr(node, 'col_offset', 0)
    28. end_col_offset = getattr(node, 'end_col_offset', 0)
    29. print('起始行:%s' %lineno )
    30. print('结束行:%s' %end_lineno )
    31. print('起始列:%s' %col_offset )
    32. print('结束列:%s' %end_col_offset )
    33. if lineno > 0:
    34. getCodebyposition(self.codelines, lineno, end_lineno, col_offset, end_col_offset)
    35. try:
    36. handler = self.getNodeHandler(node.__class__)
    37. handler(node)
    38. finally:
    39. self.nodeDepth -= 1
    40. #针对节点类型获得对应的处理函数
    41. def getNodeHandler(self, node_class):
    42. try:
    43. return self._nodeHandlers[node_class]
    44. except KeyError:
    45. nodeType = checker.getNodeType(node_class)
    46. self._nodeHandlers[node_class] = handler = getattr(
    47. self, nodeType, self._unknown_handler,
    48. )
    49. return handler
    50. #默认就使用 遍历的函数
    51. def _unknown_handler(self, node):
    52. self.handleChildren(node)
    53. #解析语法树
    54. def check(codestr, filename, reporter=None):
    55. try:
    56. tree = ast.parse(codestr, filename=filename)
    57. except SyntaxError:
    58. value = sys.exc_info()[1]
    59. msg = value.args[0]
    60. (lineno, offset, text) = value.lineno, value.offset, value.text
    61. print(lineno, offset, text)
    62. # 分词
    63. file_tokens = checker.make_tokens(codestr)
    64. w = linkage_Checker(tree, file_tokens=file_tokens,filename=filename,codestr=codestr)
    65. return 1
    66. def getCodebyposition(codelines,lineno,end_lineno,col_offset,end_col_offset):
    67. for i in range(lineno,end_lineno+1):
    68. if i == lineno and lineno == end_lineno:
    69. print(codelines[lineno-1][col_offset:end_col_offset])
    70. elif i == lineno:
    71. print(codelines[lineno - 1][col_offset:])
    72. elif i == end_lineno:
    73. print(codelines[end_lineno - 1][:end_col_offset])
    74. else:
    75. print(codelines[i - 1])
    76. return 1

    通过代码的运行结果可以看到树状的层次结构,

    00ab51680854ee8b48a027c32eb84b6f.png

    例如代码:

    records = pd.read_sql_query('''select cast(date as date) as NaturalDay, symbol from table where date between begin and end’’’)

    是一个ast.assign类型,它的一个子节点是ast.call,对应的代码是

    pd.read_sql_query('''select cast(date as date) as NaturalDay, symbol from table where date between begin and end’’’)

    其有一个类型为ast.Attribute子节点,对应代码是pd.read_sql_query。这个节点又有一个ast.Name的子节点,对应代码是pd。可见,语法树是把一段代码按照语法结构解析的树状结果,以便编译器进一步将抽象语法树转换为更接近机器代码的 control flow Graph。

    进一步,在充分理解ast解析过程和语法树结构的基础上,我们可以针对关心的代码片段进行提取,例如关心“类相关”或者“sql”相关的代码片段。这里我们自定义一个解析sql代码的函数,能够自动提取其用到的表名和字段名。

    1. #解析其中一段sql语句中的字段和表名
    2. def getTableField(statement):
    3. result = {}
    4. matchObj = re.search( r'select(.*)from(.*)', statement, re.M|re.I)
    5. if pd.notnull(matchObj):
    6. fields = re.split(',', matchObj.group(1))
    7. fields = [field.strip() for field in fields]
    8. table = matchObj.group(2)
    9. # table = table.split()
    10. table = table.strip()
    11. result[table] = fields
    12. return result
    13. #解析sql语句中的字段和表名, 参考 https://www.robin.eu.org/programming/extracting-table-and-column-names-from-sql-query/
    14. def sqlparse(sql_str):
    15. sql_str = sql_str.replace('SELECT', 'select')
    16. sql_str = sql_str.replace('WHERE', 'where')
    17. sql_str = sql_str.replace('FROM', 'from')
    18. re_skip_detail = re.compile("([a-zA-Z0-9]+)") # 匹配英文和数字
    19. tmp = re_skip_detail.split(sql_str)
    20. # tmp =[x for x in tmp if len(x.strip())>0]
    21. select_index = from_index = 0
    22. parse_result = []
    23. for index,item in enumerate(tmp):
    24. # print(item)
    25. if item in ('select','where'):
    26. if from_index > 0 : # 如果前面已经有比较完整的sql语句了,也就是已经出现from了
    27. statement = ''.join(tmp[select_index : index]) # 截取到当前的位置
    28. # print('语句:%s' %statement)
    29. if len(statement) > 0:
    30. table_fields = getTableField(statement)
    31. parse_result.append(table_fields)
    32. # print(parse_result)
    33. # 新的开始
    34. from_index = 0
    35. if item == 'select':
    36. select_index = index
    37. elif item == 'from':
    38. from_index = index
    39. return parse_result

    好啦,大功告成,最后展示的结果如下:

    55d43a3ac86de7d335b6902ea884c58e.png

  • 相关阅读:
    产品经理进阶:外包原因及类型(一)
    利用随机森林对特征重要性进行评估(公式原理)
    北京一互联网公司被端,所有开发被全部带走!
    SpringBoot解压zip包,读取每个文件内容
    Linux at任务调度机制
    Ab3d.DXEngine 6.0 Crack 2023
    计算机视觉CV:在自动驾驶方面的应用与C++代码实现
    初级程序员如何进阶
    汽车零部件企业信邦控股之项目管理实践案例
    计数排序(超详细)
  • 原文地址:https://blog.csdn.net/u011598442/article/details/125532174