• python pandas.DataFrame 直接写入Clickhouse


    1. import pandas as pd
    2. import sqlalchemy
    3. from clickhouse_sqlalchemy import Table, engines
    4. from sqlalchemy import create_engine, MetaData, Column
    5. import urllib.parse
    6. host = '1.1.1.1'
    7. user = 'default'
    8. password = 'default'
    9. db = 'test'
    10. port = 8123 # http连接端口
    11. engine = create_engine('clickhouse://{user}:{password}@{host}:{port}/{db}'
    12. .format(user = user,
    13. host = host,
    14. password = urllib.parse.quote_plus(password),
    15. db = db,
    16. port = port),
    17. pool_size = 30,max_overflow = 0,
    18. pool_pre_ping=True , pool_recycle= 3600)
    19. port = 9000 # Tcp/Ip连接端口
    20. engine1 = create_engine('clickhouse+native://{user}:{password}@{host}:{port}/{db}'
    21. .format(user = user,
    22. host = host,
    23. password = urllib.parse.quote_plus(password),
    24. db = db,
    25. port = port),
    26. pool_size = 30,max_overflow = 0,
    27. pool_pre_ping=True , pool_recycle=3600)
    28. # https://github.com/xzkostyan/clickhouse-sqlalchemy/issues/129
    29. # 参考文档https://github.com/xzkostyan/clickhouse-sqlalchemy
    30. # pip install sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
    31. # pip install clickhouse-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
    32. class ClickhouseDf(object):
    33. def __init__(self, **kwargs):
    34. self.engines_dict = {
    35. "MergeTree": engines.MergeTree,
    36. "AggregatingMergeTree": engines.AggregatingMergeTree,
    37. "GraphiteMergeTree": engines.GraphiteMergeTree,
    38. "CollapsingMergeTree": engines.CollapsingMergeTree,
    39. "VersionedCollapsingMergeTree": engines.VersionedCollapsingMergeTree,
    40. "SummingMergeTree": engines.SummingMergeTree,
    41. "ReplacingMergeTree": engines.ReplacingMergeTree,
    42. "Distributed": engines.Distributed,
    43. "ReplicatedMergeTree": engines.ReplicatedMergeTree,
    44. "ReplicatedAggregatingMergeTree": engines.ReplicatedAggregatingMergeTree,
    45. "ReplicatedCollapsingMergeTree": engines.ReplicatedCollapsingMergeTree,
    46. "ReplicatedVersionedCollapsingMergeTree": engines.ReplicatedVersionedCollapsingMergeTree,
    47. "ReplicatedReplacingMergeTree": engines.ReplicatedReplacingMergeTree,
    48. "ReplicatedSummingMergeTree": engines.ReplicatedSummingMergeTree,
    49. "View": engines.View,
    50. "MaterializedView": engines.MaterializedView,
    51. "Buffer": engines.Buffer,
    52. "TinyLog": engines.TinyLog,
    53. "Log": engines.Log,
    54. "Memory": engines.Memory,
    55. "Null": engines.Null,
    56. "File": engines.File
    57. }
    58. self.table_engine = kwargs.get("table_engine", "MergeTree") # 默认引擎选择
    59. if self.table_engine not in self.engines_dict.keys():
    60. raise ValueError("No engine for this table")
    61. def _createORMTable(self, df, name, con, schema, **kwargs):
    62. col_dtype_dict = {
    63. "object": sqlalchemy.Text,
    64. "int64": sqlalchemy.Integer,
    65. "int32": sqlalchemy.Integer,
    66. "int16": sqlalchemy.Integer,
    67. "int8": sqlalchemy.Integer,
    68. "int": sqlalchemy.Integer,
    69. "float64": sqlalchemy.Float,
    70. "float32": sqlalchemy.Float,
    71. "float16": sqlalchemy.Float,
    72. "float8": sqlalchemy.Float,
    73. "float": sqlalchemy.Float,
    74. }
    75. primary_key = kwargs.get("primary_key", [])
    76. df_col = df.columns.tolist()
    77. metadata = MetaData(bind=con, schema=schema)
    78. _table_check_col = []
    79. for col in df_col:
    80. col_dtype = str(df.dtypes[col])
    81. if col_dtype not in col_dtype_dict.keys():
    82. if col in primary_key:
    83. _table_check_col.append(Column(col, col_dtype_dict["object"], primary_key=True))
    84. else:
    85. _table_check_col.append(Column(col, col_dtype_dict["object"]))
    86. else:
    87. if col in primary_key:
    88. _table_check_col.append(Column(col, col_dtype_dict[col_dtype], primary_key=True))
    89. else:
    90. _table_check_col.append(Column(col, col_dtype_dict[col_dtype]))
    91. _table_check = Table(name, metadata,
    92. *_table_check_col,
    93. self.engines_dict[self.table_engine](primary_key=primary_key)
    94. )
    95. return _table_check
    96. def _checkTable(self, name, con, schema):
    97. sql_str = f"EXISTS {schema}.{name}"
    98. if con.execute(sql_str).fetchall() == [(0,)]:
    99. return 0
    100. else:
    101. return 1
    102. def to_sql(self, df, name: str, con, schema=None, if_exists="fail",**kwargs):
    103. '''
    104. 将DataFrame格式数据插入Clickhouse中
    105. {'fail', 'replace', 'append'}, default 'fail'
    106. '''
    107. if self.table_engine in ["MergeTree"]: # 表格必须有主键的引擎列表-暂时只用这种,其他未测试
    108. self.primary_key = kwargs.get("primary_key", [df.columns.tolist()[0]])
    109. else:
    110. self.primary_key = kwargs.get("primary_key", [])
    111. orm_table = self._createORMTable(df, name, con, schema, primary_key=self.primary_key)
    112. tanle_exeit = self._checkTable(name, con, schema)
    113. # 创建表
    114. if if_exists == "fail":
    115. if tanle_exeit:
    116. raise ValueError(f"table already exists :{name} ")
    117. else:
    118. orm_table.create()
    119. if if_exists == "replace":
    120. if tanle_exeit:
    121. orm_table.drop()
    122. orm_table.create()
    123. else:
    124. orm_table.create()
    125. if if_exists == "append":
    126. if not tanle_exeit:
    127. orm_table.create()
    128. # http连接下会自动将None填充为空字符串以入库,tcp/ip模式下则不会,会导致引擎报错,需要手动填充。
    129. df_dict = df.to_dict(orient="records")
    130. con.execute(orm_table.insert(), df_dict)
    131. # df.to_sql(name, con, schema, index=False, if_exists="append")
    132. if __name__ == '__main__':
    133. # 使用方法
    134. cdf = ClickhouseDf()
    135. df = pd.DataFrame({'column1': [1, 2, 3],
    136. 'column2': ['A', 'B', 'C']})
    137. db = 'default'
    138. password = ''
    139. user = 'default'
    140. port = 9090
    141. host = '192.168.76.136'
    142. engine = create_engine('clickhouse+native://{user}:{password}@{host}:{port}/{db}'
    143. .format(user=user,
    144. host=host,
    145. password=urllib.parse.quote_plus(password),
    146. db=db,
    147. port=port),
    148. pool_size=30, max_overflow=0,
    149. pool_pre_ping=True, pool_recycle=3600)
    150. with engine.connect() as conn:
    151. cdf.to_sql(df, "table_name", conn, schema='default', if_exists="replace")
    152. list = engine.connect().execute("SELECT * FROM table_name").fetchall()
    153. print(list)

    1) 运行需要安装包

    # pip install sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
    # pip install clickhouse-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
     

    2)cdf.to_sql(df, "table_name", conn, schema='default', if_exists="replace")

    这里的 schema 一定要写,判断表是否存在 是用 

    if con.execute('EXISTS default.table_name').fetchall() == [(0,)]: 来判断表是否存在的
    

    参考链接: SQLAlchemy_clickhouse_sqlalchemy-CSDN博客

    https://github.com/xzkostyan/clickhouse-sqlalchemy

  • 相关阅读:
    嵌入式系统,ARM微处理器特点,ARM体系结构,特征、状态、操作模式等,中断分类,JTAG调试接口
    Kafka系列之:深入理解Transformations
    monorepo
    Rasa NLU中的组件
    f-string 格式化字符串的用法
    RocketMQ 消息重复消费
    Nacos源码分析专题(二)-服务注册
    程序员地理系列知识点总结
    企业发展必不可缺——BPM系统
    volatile 类型变量提供什么保证?能使得一个非原子操作变成原子操作吗?
  • 原文地址:https://blog.csdn.net/gwd777/article/details/134008439