• superset支持Kylin4.0.0(兼容处理日期分组功能)


    问题总结

    1. superset默认不支持kylin数据源

    安装kylinpy

    pip install kylinpy

    2. 安装kylin驱动后,无法连接kylin地址

    修改site-packages/kylinpy/sqla_dialect.py文件

    2.1 注释以下内容

    1. #def _compose_select_body(self, text, select, inner_columns, froms, byfrom, kwargs):
    2. # text = super(KylinSQLCompiler, self)._compose_select_body(
    3. # text, select, inner_columns, froms, byfrom, kwargs)
    4. # return text

    2.2 修改has_table的返回值

    return False ==> return table_name in self.get_table_names(connection, schema)

    3. 连接成功后,无法使用日期字段及粒度制作相关图,比如日期的折线图

    kylin4使用spark引擎,其group by语法不支持直接使用select中的表达式,需修改为使用字段别名代替

    3.1 修改文件site-packages/kylinpy/kylinpy.py添加方法

    1. import re
    2. def between(_sql, start_s, end_s):
    3. group_start = _sql.find(start_s)
    4. if group_start == -1:
    5. print("未找到group by")
    6. return
    7. group_end = _sql.find(end_s)
    8. if group_end == -1:
    9. group_end = len(_sql)
    10. groups = _sql[group_start + len(start_s):group_end]
    11. print(groups)
    12. return groups
    13. def select_map(_sql):
    14. fields = between(_sql, "select", "from")
    15. field_arr = fields.split(",")
    16. f_map = {}
    17. for field in field_arr:
    18. findall = re.findall(r"(.*)\s+as?\s+(\"[^\"]+\")", field)
    19. if findall:
    20. findall_ = findall[0]
    21. expression = findall_[0]
    22. alias = findall_[1]
    23. print(expression, alias)
    24. f_map[expression] = alias
    25. return f_map
    26. def convert(r_sql):
    27. lower_sql = r_sql.lower()
    28. if lower_sql.count("from") == 1 and lower_sql.__contains__("group by"):
    29. _sql = re.sub(r"\s{2,}", " ", lower_sql)
    30. print(_sql)
    31. # 获取select的字段map
    32. _select_map = select_map(_sql)
    33. groups = between(_sql, "group by", "order by")
    34. raw_group = "group by" + groups
    35. group_arr = groups.split(",")
    36. new_group = []
    37. for g in group_arr:
    38. g_v = _select_map[g.rstrip()]
    39. if g_v:
    40. new_group.append(groups.replace(g, g_v))
    41. new_group = "group by " + ",".join(new_group) + " "
    42. _sql = _sql.replace(raw_group, new_group).upper()
    43. print(_sql)
    44. return _sql
    45. else:
    46. return r_sql

    3.2 修改def query(self, sql, **parameters):方法,把sql改为convert(sql)

    3.3 修改superset/db_engine_specs/kylin.py

# # Default time-grain expressions (FLOOR ... TO syntax) -- not accepted by
# # the Kylin 4 Spark engine; kept for reference only.
# _time_grain_expressions = {
#     None: "{col}",
#     TimeGrain.SECOND: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO SECOND) AS TIMESTAMP)",
#     TimeGrain.MINUTE: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO MINUTE) AS TIMESTAMP)",
#     TimeGrain.HOUR: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO HOUR) AS TIMESTAMP)",
#     TimeGrain.DAY: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO DAY) AS DATE)",
#     TimeGrain.WEEK: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO WEEK) AS DATE)",
#     TimeGrain.MONTH: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO MONTH) AS DATE)",
#     TimeGrain.QUARTER: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO QUARTER) AS DATE)",
#     TimeGrain.YEAR: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO YEAR) AS DATE)",
# }
# Syntax supported by the Kylin 4 Spark engine: each grain truncates {col} by
# subtracting its sub-grain components via TIMESTAMPADD / date-part functions.
# Keys are the ISO-8601 duration strings Superset uses for time grains.
# NOTE(review): PT1S/PT1M/PT1H appear to truncate one level coarser than the
# key suggests (e.g. PT1S subtracts the SECOND component, yielding the start
# of the minute) -- confirm the intended bucketing against real chart output.
_time_grain_expressions = {
    None: "{col}",
    "PT1S": "TIMESTAMPADD(SECOND, -(SECOND(CAST({col} AS TIMESTAMP))), CAST({col} AS TIMESTAMP))",
    "PT1M": "TIMESTAMPADD(SECOND, -(MINUTE(CAST({col} AS TIMESTAMP))*60+SECOND(CAST({col} AS TIMESTAMP))), CAST({col} AS TIMESTAMP))",
    "PT1H": "TIMESTAMPADD(SECOND, -(HOUR(CAST({col} AS TIMESTAMP))*60*60+MINUTE(CAST({col} AS TIMESTAMP))*60+SECOND(CAST({col} AS TIMESTAMP))), CAST({col} AS TIMESTAMP))",
    "P1D": "CAST(CAST({col} AS TIMESTAMP) AS DATE)",
    # week start: subtract (DAYOFWEEK - 1) days
    "P1W": "CAST(TIMESTAMPADD(DAY, -(DAYOFWEEK(CAST({col} AS TIMESTAMP))-1), CAST({col} AS TIMESTAMP)) AS DATE)",
    # month start: subtract (DAYOFMONTH - 1) days
    "P1M": "CAST(TIMESTAMPADD(DAY, -(DAYOFMONTH(CAST({col} AS TIMESTAMP))-1), CAST({col} AS TIMESTAMP)) AS DATE)",
    # quarter start: year start plus (quarter - 1) * 3 months
    "P0.25Y": "CAST(TIMESTAMPADD(MONTH, (QUARTER(CAST({col} AS TIMESTAMP))-1)*3, CAST((CAST(YEAR(CAST({col} AS TIMESTAMP)) AS VARCHAR) + '-01-01 00:00:00') AS TIMESTAMP)) AS DATE)",
    # year start: subtract (DAYOFYEAR - 1) days
    "P1Y": "CAST(TIMESTAMPADD(DAY, -(DAYOFYEAR(CAST({col} AS TIMESTAMP))-1), CAST({col} AS TIMESTAMP)) AS DATE)",
}

    重启后生效(转换用的convert函数未经详细测试)

     Superset KylinSql转换2.0(推荐)

    使用sqlparse库解析语法树token,稳定性和准确率更高

    1. import re
    2. import sqlparse
def parse_select(sql: str, child=False):
    """Tokenise *sql* into clause-level parts using sqlparse.

    With ``child=False`` the result alternates keyword / clause body, e.g.
    ``['SELECT', '<select list>', 'FROM', '<table>', ...]`` -- the shape
    list2map() expects.  With ``child=True`` the input is treated as a single
    clause body and comma-separated lists are expanded into their individual
    sub-tokens (identifiers and ',' separators).
    """
    part_list = []
    sql = process_sql(sql)
    parse = sqlparse.parse(sql)
    where = "WHERE"
    for statement in parse:
        for token in statement.tokens:
            if not token.is_whitespace:
                token_value = token.value
                if not token.is_keyword and child:
                    try:
                        # Grouped tokens expose .tokens; presumably an
                        # IdentifierList has ',' as its second sub-token --
                        # anything else is kept whole.  TODO confirm against
                        # sqlparse's grouping rules.
                        token_tokens = token.tokens
                        if token_tokens[1].value != ',':
                            part_list.append(token_value)
                            continue
                    except AttributeError as ex:
                        # Leaf token (no .tokens attribute): nothing to expand.
                        token_tokens = []
                    for tt in token_tokens:
                        # print("value:", tt.value)
                        part_list.append(tt.value)
                elif token_value.startswith(where):
                    # sqlparse folds "WHERE <condition>" into one token; split
                    # it back so the keyword/value pairing stays intact for
                    # list2map().
                    part_list.append(where)
                    part_list.append(token_value[len(where):])
                else:
                    # print("value:", token.value)
                    part_list.append(token_value)
            # print("")
    return part_list
    31. def list2map(token: list):
    32. if token[-1] == ';':
    33. del token[-1]
    34. i = len(token)
    35. m = {}
    36. if i % 2 == 0:
    37. for ii in range(0, i, 2):
    38. # print(l[ii])
    39. m[token[ii]] = token[ii + 1]
    40. else:
    41. print("list len error")
    42. # print(m)
    43. return m
    44. def process_sql(sql: str):
    45. # 将所有换行替换为空格
    46. sql = re.sub('\r|\n|\r\n', ' ', sql)
    47. # 2个及以上的空格替换为一个空格
    48. sql = re.sub('[ ]{2,}', ' ', sql)
    49. # print(sql)
    50. return sql
    51. def parse_alias(sql):
    52. pattern = r'(.*)AS\s+("[^"]+")'
    53. result = re.search(pattern, sql)
    54. if result and len(result.groups()) > 0:
    55. return result.groups()
    56. else:
    57. return None
    58. def detail(list_map: map):
    59. select_ = list_map['SELECT']
    60. select = parse_select(select_, True)
    61. # print(select)
    62. list_map['SELECT'] = select
    63. # 解析select的字段的field sql和alias
    64. mid_select = [parse_alias(x) if x != ',' else None for x in select]
    65. select_detail = {}
    66. for x in mid_select:
    67. if x is not None:
    68. select_detail[x[0].strip()] = x[1]
    69. list_map['detail_select'] = select_detail
    70. # 使用select字段替换group by字段
    71. group_by_ = list_map['GROUP BY']
    72. group_l = parse_select(group_by_, True)
    73. new_group_by = []
    74. for g in group_l:
    75. if select_detail.keys().__contains__(g):
    76. new_group_by.append(select_detail[g])
    77. else:
    78. new_group_by.append(g)
    79. list_map['GROUP BY'] = new_group_by
    80. # print(list_map)
    81. del list_map['detail_select']
    82. return list_map
    83. def convert(sql):
    84. try:
    85. select_l = parse_select(sql)
    86. list_map: map = list2map(select_l)
    87. list_map: map = detail(list_map)
    88. new_sql = "SELECT * FROM("
    89. for k in list_map.keys():
    90. new_sql += (k + " ")
    91. v = list_map[k]
    92. if type(v) == list:
    93. new_sql += ("".join(v) + " ")
    94. else:
    95. new_sql += (v + " ")
    96. if k == 'FROM':
    97. new_sql += ')tmp_convert_table '
    98. new_sql = process_sql(new_sql)
    99. print(f"转换后的Kylin Sql:\n{new_sql}")
    100. return new_sql
    101. except Exception as ex:
    102. print(ex)
    103. print(f"未转换的Kylin Sql:\n{sql}")
    104. return sql
# Final version -- also supports plain aggregations such as count(*).
def convert1(sql):
    """Rewrite a GROUP BY query so aliases survive Kylin 4's Spark engine.

    Strategy: give every group-by column a synthetic alias (original alias
    with a ``_DIY`` suffix) so it cannot clash with a raw column name, run
    the grouped query as a subquery, then SELECT from it mapping the
    synthetic aliases back to the original ones.  Statements without a
    GROUP BY are returned unchanged.
    """
    if not process_sql(sql).lower().__contains__("group by"):
        return sql
    # Rename group-by aliases so they don't collide with the raw fields,
    # then an outer select maps the aliases back.
    select_l = parse_select(sql)
    list_map: dict = list2map(select_l)
    # the SELECT clause, expanded into its item tokens
    select_ = list_map['SELECT']
    select = parse_select(select_, True)
    # parse each select item into (field sql, alias); ',' separators and
    # un-aliased items yield None
    mid_select = [parse_alias(x) if x != ',' else None for x in select]
    select_detail = {}
    for x in mid_select:
        if x is not None:
            select_detail[x[0].strip()] = x[1]
    # the GROUP BY clause, expanded into its item tokens
    group_by_ = list_map['GROUP BY']
    group_l = parse_select(group_by_, True)
    # for every group-by item with a known alias, swap in a "_DIY" alias
    # and remember the mapping back to the original alias
    alias_map = {}
    for g in group_l:
        if select_detail.keys().__contains__(g):
            raw_alias = select_detail[g]
            new_alias = raw_alias.replace('"', '') + "_DIY"
            new_alias = f'"{new_alias}"'
            alias_map[new_alias] = raw_alias
            select_detail[g] = new_alias
    # rebuild the inner statement: re-aliased select list plus the other
    # clauses, in their original order
    list_map['SELECT'] = ",".join([f'{k} AS {select_detail[k]}' for k in select_detail.keys()])
    raw_sql = ' '.join([f'{k} {list_map[k]}' for k in list_map.keys()])
    # outer select: map each "_DIY" alias back to its original alias
    last_alias = [f'{v} AS {alias_map[v]}' if alias_map.keys().__contains__(v) else v for v in select_detail.values()]
    new_sql = f'select {",".join(last_alias)} from ({raw_sql})'
    new_sql = process_sql(new_sql)
    print(new_sql)
    return new_sql

    支持superset的虚拟表,主要功能替换group by后面的sql为别名字段

  • 相关阅读:
    凉鞋的 Unity 笔记 105. 第一个通识:编辑-测试 循环
    Reactive Java 技术栈
    聚焦云原生安全|如何为5G边缘云和工业互联网应用筑牢安全防线
    13-JavaSE基础巩固练习:ArrayList集合的练习
    在linux下使用ffmpeg方法
    Selenium获取百度百科旅游景点的InfoBox消息盒
    在Kubernetes(k8s)上部署整个SpringCloud微服务应用
    针对 DNS 监控的 Grafana Dashboard面板DeepFlow
    原生js做打地鼠游戏
    指针进阶(续)
  • 原文地址:https://blog.csdn.net/qq_18453581/article/details/133796526