• Python trino执行hive insert overwrite不生效的问题


    使用python的trino包执行insert overwrite,但是overwrite却没有生效的问题

    根据trino的官网介绍的insert overwrite的开启方式,开启hive的insert overwrite会话,使当前会话的insert into语句支持insert overwrite,也即支持插入数据根据分区覆盖更新的功能

    但是在使用Python代码执行时总是没有生效,在花了不少时间debug之后,终于找到了原因竟是因为开启insert overwrite session的语句没有执行fetchall()而导致没有生效

    下面是执行的的代码示例

    import trino
    
    def insert_overwrite_query(trino, ...):
        conn = trino.dbapi.connect()
        cursor: Cursor = conn.cursor()
        overwrite_sql = "SET SESSION hive.insert_existing_partitions_behavior = 'OVERWRITE'"
        cursor.execute(overwrite_sql)
        query = """insert into hive.schema.table ..."""
        cursor.execute(query)
       	res = cursor.fetchall()
       	...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    overwrite_sql在执行之后需要执行cursor.fetchall()才会生效,在此之前实在不明白这条sql为什么需要fetchall()?fetchall()的作用不是在需要返回数据时才执行吗?

    在一番搜寻之后才了解到fetchall()的真正运作机制

    • 通常查询的结果集会存储在内存之中,好的数据库连接处理一般会将查询结果集存放在服务端的内存,在接收请求时才会返回给客户端,也就是说只有在执行fetchall()之后前面的insert overwrite session也会确切地被执行

    另外就是python trino中的fetchall()的作用了,一番源码观察之后,确定了trino的查询机制

    • 查询是分批次查询的,数据量大的数据会首先使用post请求查询一批数据,然后接下来如果客户端还需要剩余的结果,会使用get请求查询接下来的所有数据之后结束或者连接中断

    根据cursor.fetchall() 找到了其数据实际上来自于一个生成器对象TrinoResult,在fetchall()的时候实际上执行了list(TrinoResult(...))取得了查询结果(实际上你执行list(cursor.execute(query))也会得到fetchall()相同的结果)

    class TrinoResult(object):
        def __init__(self, query, rows=None, experimental_python_types: bool = False):
            self._query = query
            self._rows = rows or []
            self._rownumber = 0
            self._experimental_python_types = experimental_python_types
    	
    	...
    	
        def __iter__(self):
            # Initial fetch from the first POST request
            for row in self._rows:
                self._rownumber += 1
                yield row
            self._rows = None
    
            # Subsequent fetches from GET requests until next_uri is empty.
            while not self._query.finished:
                rows = self._query.fetch()
                for row in rows:
                    self._rownumber += 1
                    logger.debug("row %s", row)
                    if not self._experimental_python_types:
                        yield row
                    else:
                        yield self._map_to_python_types(row, self._query.columns)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    总之,在session sql后面需要在执行一次fetchall()才会使开启session的sql生效,但是总觉得有点不符合直觉

  • 相关阅读:
    案例分享|生产环境MQ集群一个非常诡异的消费延迟排查
    有哪些不起眼却赚钱的行业?
    外骨骼机器人关键技术与核心问题
    Python 写网络监控
    【gitlab】本地项目上传gitlab
    Go语言躲坑经验总结
    k8s 读书笔记 - 初始化容器 Init Conatiner
    shell_57.Linux创建自己的重定向
    【力扣】304. 二维区域和检索 - 矩阵不可变 <二维前缀和>
    vue项目推荐组件/工具库清单
  • 原文地址:https://blog.csdn.net/Moelimoe/article/details/125456931