Install kylinpy:

```bash
pip install kylinpy
```
Modify the site-packages/kylinpy/sqla_dialect.py file. First, comment out the `_compose_select_body` override:
```python
# def _compose_select_body(self, text, select, inner_columns, froms, byfrom, kwargs):
#     text = super(KylinSQLCompiler, self)._compose_select_body(
#         text, select, inner_columns, froms, byfrom, kwargs)
#     return text
```
Second, change `return False` to `return table_name in self.get_table_names(connection, schema)` so the dialect actually reports whether a table exists.
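For reference, a minimal sketch of the patched check; the method name and signature below are assumptions (apply the change wherever your kylinpy version hard-codes `return False`):

```python
# Sketch only -- the method name has_table and its signature are assumptions
# about where the hard-coded "return False" lives in your kylinpy version.
def has_table(self, connection, table_name, schema=None):
    # Look the table up in the Kylin project instead of always returning False.
    return table_name in self.get_table_names(connection, schema)
```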
Kylin 4 runs on the Spark query engine, whose SQL does not accept a GROUP BY on the full expression that an alias was defined for in the SELECT list, so the query has to be rewritten to group by the alias instead:
```python
import re


def between(_sql, start_s, end_s):
    """Return the part of _sql between start_s and end_s (or the end of the string)."""
    group_start = _sql.find(start_s)
    if group_start == -1:
        print("group by not found")
        return None
    group_end = _sql.find(end_s)
    if group_end == -1:
        group_end = len(_sql)
    groups = _sql[group_start + len(start_s):group_end]
    print(groups)
    return groups


def select_map(_sql):
    """Map each select expression to its quoted alias."""
    fields = between(_sql, "select", "from")
    field_arr = fields.split(",")
    f_map = {}
    for field in field_arr:
        findall = re.findall(r'(.*)\s+as\s+("[^"]+")', field)
        if findall:
            expression, alias = findall[0]
            print(expression, alias)
            f_map[expression.strip()] = alias
    return f_map


def convert(r_sql):
    """Rewrite GROUP BY to use the select aliases instead of the raw expressions."""
    lower_sql = r_sql.lower()
    if lower_sql.count("from") == 1 and "group by" in lower_sql:
        _sql = re.sub(r"\s{2,}", " ", lower_sql)
        print(_sql)
        # map of select expressions to their aliases
        _select_map = select_map(_sql)
        groups = between(_sql, "group by", "order by")
        raw_group = "group by" + groups
        group_arr = groups.split(",")
        new_group = []
        for g in group_arr:
            g_v = _select_map.get(g.strip())
            # fall back to the original expression when no alias was found
            new_group.append(g_v if g_v else g.strip())
        new_group = "group by " + ",".join(new_group) + " "
        _sql = _sql.replace(raw_group, new_group).upper()
        print(_sql)
        return _sql
    else:
        return r_sql
```
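A quick way to exercise `convert` is to feed it a made-up Superset-style query (the table and column names below are hypothetical):

```python
# Hypothetical Superset-style query, purely for exercising convert().
raw = (
    'SELECT CAST(DT AS DATE) AS "dt_day", COUNT(*) AS "cnt" '
    "FROM KYLIN_SALES GROUP BY CAST(DT AS DATE)"
)
print(convert(raw))
# GROUP BY now references the alias, e.g. ... GROUP BY "DT_DAY"
# (note that convert() uppercases the whole rewritten statement).
```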
The default time-grain expressions also need to be replaced with expressions the Kylin 4 Spark engine accepts:

```python
# Default time grain expressions (not accepted by the Kylin 4 Spark engine):
# _time_grain_expressions = {
#     None: "{col}",
#     TimeGrain.SECOND: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO SECOND) AS TIMESTAMP)",
#     TimeGrain.MINUTE: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO MINUTE) AS TIMESTAMP)",
#     TimeGrain.HOUR: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO HOUR) AS TIMESTAMP)",
#     TimeGrain.DAY: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO DAY) AS DATE)",
#     TimeGrain.WEEK: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO WEEK) AS DATE)",
#     TimeGrain.MONTH: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO MONTH) AS DATE)",
#     TimeGrain.QUARTER: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO QUARTER) AS DATE)",
#     TimeGrain.YEAR: "CAST(FLOOR(CAST({col} AS TIMESTAMP) TO YEAR) AS DATE)",
# }

# Replacement using only syntax the Kylin 4 Spark engine supports:
_time_grain_expressions = {
    None: "{col}",
    "PT1S": "TIMESTAMPADD(SECOND, -(SECOND(CAST({col} AS TIMESTAMP))), CAST({col} AS TIMESTAMP))",
    "PT1M": "TIMESTAMPADD(SECOND, -(MINUTE(CAST({col} AS TIMESTAMP))*60+SECOND(CAST({col} AS TIMESTAMP))), CAST({col} AS TIMESTAMP))",
    "PT1H": "TIMESTAMPADD(SECOND, -(HOUR(CAST({col} AS TIMESTAMP))*60*60+MINUTE(CAST({col} AS TIMESTAMP))*60+SECOND(CAST({col} AS TIMESTAMP))), CAST({col} AS TIMESTAMP))",
    "P1D": "CAST(CAST({col} AS TIMESTAMP) AS DATE)",
    "P1W": "CAST(TIMESTAMPADD(DAY, -(DAYOFWEEK(CAST({col} AS TIMESTAMP))-1), CAST({col} AS TIMESTAMP)) AS DATE)",
    "P1M": "CAST(TIMESTAMPADD(DAY, -(DAYOFMONTH(CAST({col} AS TIMESTAMP))-1), CAST({col} AS TIMESTAMP)) AS DATE)",
    "P0.25Y": "CAST(TIMESTAMPADD(MONTH, (QUARTER(CAST({col} AS TIMESTAMP))-1)*3, CAST((CAST(YEAR(CAST({col} AS TIMESTAMP)) AS VARCHAR) + '-01-01 00:00:00') AS TIMESTAMP)) AS DATE)",
    "P1Y": "CAST(TIMESTAMPADD(DAY, -(DAYOFYEAR(CAST({col} AS TIMESTAMP))-1), CAST({col} AS TIMESTAMP)) AS DATE)",
}
```
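A quick sanity check, assuming the dict above is in scope (the column name DT is made up), just substitutes a column into the `{col}` placeholder:

```python
# DT is a made-up column name; this only shows how a template expands.
expr = _time_grain_expressions["P1W"].format(col="DT")
print(expr)
# CAST(TIMESTAMPADD(DAY, -(DAYOFWEEK(CAST(DT AS TIMESTAMP))-1), CAST(DT AS TIMESTAMP)) AS DATE)
```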
Restart to apply the changes (the convert function above has not been tested thoroughly).
A more robust approach is to use the sqlparse library and walk the parsed token tree; it is more stable and accurate than the regex version above.
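For orientation, a minimal sketch of how sqlparse exposes a statement's tokens (the query text is made up):

```python
import sqlparse

# Made-up query, purely to show sqlparse's token structure.
stmt = sqlparse.parse('SELECT a AS "x" FROM t GROUP BY a')[0]
for token in stmt.tokens:
    if not token.is_whitespace:
        print(type(token).__name__, repr(token.value))
# Keywords such as SELECT / FROM / GROUP BY come out as plain tokens, while
# column and table parts come out grouped (Identifier / IdentifierList),
# which is what the code below walks over.
```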
```python
import re
import sqlparse


def parse_select(sql: str, child=False):
    """Split a statement (or a clause, when child=True) into a flat list of token strings."""
    part_list = []
    sql = process_sql(sql)
    parse = sqlparse.parse(sql)
    where = "WHERE"
    for statement in parse:
        for token in statement.tokens:
            if not token.is_whitespace:
                token_value = token.value
                if not token.is_keyword and child:
                    # grouped tokens expose sub-tokens; plain tokens do not
                    token_tokens = getattr(token, "tokens", [])
                    if len(token_tokens) > 1 and token_tokens[1].value == ',':
                        # comma-separated list: keep every sub-token
                        # (identifiers, commas and spaces)
                        for tt in token_tokens:
                            part_list.append(tt.value)
                    else:
                        # single expression: keep it whole
                        part_list.append(token_value)
                elif token_value.startswith(where):
                    # sqlparse groups the whole WHERE clause into one token;
                    # split it back into the keyword and its condition
                    part_list.append(where)
                    part_list.append(token_value[len(where):])
                else:
                    part_list.append(token_value)
    return part_list


def list2map(tokens: list):
    """Pair up [keyword, content, keyword, content, ...] into a dict."""
    if tokens[-1] == ';':
        del tokens[-1]
    i = len(tokens)
    m = {}
    if i % 2 == 0:
        for ii in range(0, i, 2):
            m[tokens[ii]] = tokens[ii + 1]
    else:
        print("list len error")
    return m


def process_sql(sql: str):
    # replace all line breaks with spaces
    sql = re.sub(r'\r\n|\r|\n', ' ', sql)
    # collapse two or more spaces into one
    sql = re.sub(r'[ ]{2,}', ' ', sql)
    return sql


def parse_alias(sql):
    """Return (expression, quoted alias) for one select item, or None."""
    pattern = r'(.*)AS\s+("[^"]+")'
    result = re.search(pattern, sql)
    if result and len(result.groups()) > 0:
        return result.groups()
    else:
        return None


def detail(list_map: dict):
    select_ = list_map['SELECT']
    select = parse_select(select_, True)
    list_map['SELECT'] = select

    # parse each select item into its expression and alias
    mid_select = [parse_alias(x) if x != ',' else None for x in select]
    select_detail = {}
    for x in mid_select:
        if x is not None:
            select_detail[x[0].strip()] = x[1]
    list_map['detail_select'] = select_detail

    # replace the GROUP BY expressions with the select aliases
    group_by_ = list_map['GROUP BY']
    group_l = parse_select(group_by_, True)

    new_group_by = []
    for g in group_l:
        if g in select_detail:
            new_group_by.append(select_detail[g])
        else:
            new_group_by.append(g)
    list_map['GROUP BY'] = new_group_by

    del list_map['detail_select']

    return list_map


def convert(sql):
    try:
        select_l = parse_select(sql)
        list_map = list2map(select_l)
        list_map = detail(list_map)

        # wrap the SELECT ... FROM ... part in a subquery so the outer
        # GROUP BY can reference the aliases as real columns
        new_sql = "SELECT * FROM("
        for k in list_map.keys():
            new_sql += (k + " ")
            v = list_map[k]
            if isinstance(v, list):
                new_sql += ("".join(v) + " ")
            else:
                new_sql += (v + " ")
            if k == 'FROM':
                new_sql += ')tmp_convert_table '
        new_sql = process_sql(new_sql)
        print(f"Converted Kylin SQL:\n{new_sql}")
        return new_sql
    except Exception as ex:
        print(ex)
        print(f"Kylin SQL left unconverted:\n{sql}")
        return sql


# Final version: also supports aggregates such as count(*)
def convert1(sql):
    if "group by" not in process_sql(sql).lower():
        return sql
    # Rename the aliases of the grouped fields so they no longer clash with the
    # original field names, then wrap the query in an outer SELECT that
    # restores the original aliases.
    select_l = parse_select(sql)
    list_map = list2map(select_l)
    # select items
    select_ = list_map['SELECT']
    select = parse_select(select_, True)
    # parse each select item into its expression and alias
    mid_select = [parse_alias(x) if x != ',' else None for x in select]
    select_detail = {}
    for x in mid_select:
        if x is not None:
            select_detail[x[0].strip()] = x[1]

    # group-by items
    group_by_ = list_map['GROUP BY']
    group_l = parse_select(group_by_, True)

    # give every grouped expression a new alias and remember the original one
    alias_map = {}
    for g in group_l:
        if g in select_detail:
            raw_alias = select_detail[g]
            new_alias = raw_alias.replace('"', '') + "_DIY"
            new_alias = f'"{new_alias}"'
            alias_map[new_alias] = raw_alias
            select_detail[g] = new_alias

    # rebuild the SELECT list with the new aliases, then reassemble the inner statement
    list_map['SELECT'] = ",".join([f'{k} AS {select_detail[k]}' for k in select_detail.keys()])
    raw_sql = ' '.join([f'{k} {list_map[k]}' for k in list_map.keys()])
    # the outer SELECT changes the temporary aliases back to the original ones
    last_alias = [f'{v} AS {alias_map[v]}' if v in alias_map else v for v in select_detail.values()]

    new_sql = f'select {",".join(last_alias)} from ({raw_sql})'
    new_sql = process_sql(new_sql)
    print(new_sql)
    return new_sql
```
This version also supports Superset virtual tables; the core of the fix is still keeping the expressions after GROUP BY from clashing with the select aliases.
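A minimal way to exercise `convert1` is to feed it a Superset-style aggregate query (the table and column names below are made up) and inspect the rewritten statement:

```python
# Made-up Superset-style query, purely for exercising convert1().
raw = (
    'SELECT CAST(DT AS DATE) AS "dt_day", COUNT(*) AS "COUNT(*)" '
    "FROM KYLIN_SALES GROUP BY CAST(DT AS DATE)"
)
print(convert1(raw))
# Expected shape: an outer SELECT that restores the original aliases around an
# inner query whose group-by aliases carry the temporary _DIY suffix.
```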