import urllib.parse

import pandas as pd
import sqlalchemy
from clickhouse_sqlalchemy import Table, engines
from sqlalchemy import create_engine, MetaData, Column

# Demo connection settings. NOTE(review): credentials are hard-coded in
# source — replace with config/environment values for real deployments.
host = '1.1.1.1'
user = 'default'
password = 'default'
db = 'test'


def _build_clickhouse_engine(scheme, user, password, host, port, db):
    """Create a pooled SQLAlchemy engine for ClickHouse.

    The password is URL-quoted so special characters survive the DSN.
    Pool settings: 30 pooled connections, no overflow, pre-ping to drop
    dead connections, recycle hourly.
    """
    url = '{scheme}://{user}:{password}@{host}:{port}/{db}'.format(
        scheme=scheme,
        user=user,
        password=urllib.parse.quote_plus(password),
        host=host,
        port=port,
        db=db,
    )
    return create_engine(url,
                         pool_size=30, max_overflow=0,
                         pool_pre_ping=True, pool_recycle=3600)


port = 8123  # HTTP protocol port
engine = _build_clickhouse_engine('clickhouse', user, password, host, port, db)

port = 9000  # native TCP/IP protocol port
engine1 = _build_clickhouse_engine('clickhouse+native', user, password, host, port, db)

# References:
# https://github.com/xzkostyan/clickhouse-sqlalchemy/issues/129
# https://github.com/xzkostyan/clickhouse-sqlalchemy
# pip install sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
# pip install clickhouse-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
class ClickhouseDf(object):
    """Write pandas DataFrames into ClickHouse through clickhouse-sqlalchemy.

    ClickHouse tables must declare a table engine, which plain
    ``DataFrame.to_sql`` cannot do; this helper creates the table from the
    DataFrame's dtypes with the engine chosen at construction time, then
    bulk-inserts the rows using pandas-style ``if_exists`` semantics.
    """

    def __init__(self, **kwargs):
        """Select the ClickHouse table engine used for table creation.

        Keyword Args:
            table_engine (str): name of a ``clickhouse_sqlalchemy.engines``
                class. Defaults to ``"MergeTree"``.

        Raises:
            ValueError: if ``table_engine`` is not a supported engine name.
        """
        self.engines_dict = {
            "MergeTree": engines.MergeTree,
            "AggregatingMergeTree": engines.AggregatingMergeTree,
            "GraphiteMergeTree": engines.GraphiteMergeTree,
            "CollapsingMergeTree": engines.CollapsingMergeTree,
            "VersionedCollapsingMergeTree": engines.VersionedCollapsingMergeTree,
            "SummingMergeTree": engines.SummingMergeTree,
            "ReplacingMergeTree": engines.ReplacingMergeTree,
            "Distributed": engines.Distributed,
            "ReplicatedMergeTree": engines.ReplicatedMergeTree,
            "ReplicatedAggregatingMergeTree": engines.ReplicatedAggregatingMergeTree,
            "ReplicatedCollapsingMergeTree": engines.ReplicatedCollapsingMergeTree,
            "ReplicatedVersionedCollapsingMergeTree": engines.ReplicatedVersionedCollapsingMergeTree,
            "ReplicatedReplacingMergeTree": engines.ReplicatedReplacingMergeTree,
            "ReplicatedSummingMergeTree": engines.ReplicatedSummingMergeTree,
            "View": engines.View,
            "MaterializedView": engines.MaterializedView,
            "Buffer": engines.Buffer,
            "TinyLog": engines.TinyLog,
            "Log": engines.Log,
            "Memory": engines.Memory,
            "Null": engines.Null,
            "File": engines.File
        }
        # Default engine; MergeTree is the usual general-purpose choice.
        self.table_engine = kwargs.get("table_engine", "MergeTree")
        if self.table_engine not in self.engines_dict:
            raise ValueError("No engine for this table")

    def _createORMTable(self, df, name, con, schema, **kwargs):
        """Build a SQLAlchemy ``Table`` whose columns mirror *df*'s dtypes.

        Dtypes not in the mapping (datetimes, categoricals, ...) fall back
        to ``Text``. Columns listed in ``primary_key`` are flagged as
        primary-key columns and also passed to the table engine.
        """
        col_dtype_dict = {
            "object": sqlalchemy.Text,
            "int64": sqlalchemy.Integer,
            "int32": sqlalchemy.Integer,
            "int16": sqlalchemy.Integer,
            "int8": sqlalchemy.Integer,
            "int": sqlalchemy.Integer,
            "float64": sqlalchemy.Float,
            "float32": sqlalchemy.Float,
            "float16": sqlalchemy.Float,
            "float8": sqlalchemy.Float,
            "float": sqlalchemy.Float,
        }
        primary_key = kwargs.get("primary_key", [])
        metadata = MetaData(bind=con, schema=schema)

        columns = []
        for col in df.columns.tolist():
            # Unknown dtypes degrade to Text rather than failing.
            col_type = col_dtype_dict.get(str(df.dtypes[col]),
                                          col_dtype_dict["object"])
            columns.append(Column(col, col_type, primary_key=col in primary_key))
        return Table(name, metadata,
                     *columns,
                     self.engines_dict[self.table_engine](primary_key=primary_key))

    def _checkTable(self, name, con, schema):
        """Return 1 if ``schema.name`` exists in ClickHouse, else 0.

        ClickHouse's ``EXISTS`` statement yields ``[(0,)]`` or ``[(1,)]``.
        NOTE(review): *schema* and *name* are interpolated directly into the
        statement — do not pass untrusted identifiers.
        """
        sql_str = f"EXISTS {schema}.{name}"
        return 0 if con.execute(sql_str).fetchall() == [(0,)] else 1

    def to_sql(self, df, name: str, con, schema=None, if_exists="fail", **kwargs):
        """Insert a DataFrame into ClickHouse, creating the table if needed.

        Args:
            df: the DataFrame to insert.
            name: target table name.
            con: an active SQLAlchemy connection.
            schema: target database; must be supplied because the existence
                check executes ``EXISTS schema.name``.
            if_exists: one of {'fail', 'replace', 'append'}, default 'fail'
                (same semantics as ``pandas.DataFrame.to_sql``).

        Keyword Args:
            primary_key (list[str]): primary-key columns. MergeTree requires
                a key, so the first column is used by default for it.

        Raises:
            ValueError: if *if_exists* is not a valid option, or is 'fail'
                and the table already exists.
        """
        # Reject invalid modes up front instead of silently attempting an
        # insert into a table that was never created (original behavior).
        if if_exists not in ("fail", "replace", "append"):
            raise ValueError(f"'{if_exists}' is not valid for if_exists")

        # Engines that require a primary key — only MergeTree tested so far.
        if self.table_engine in ["MergeTree"]:
            self.primary_key = kwargs.get("primary_key", [df.columns.tolist()[0]])
        else:
            self.primary_key = kwargs.get("primary_key", [])

        orm_table = self._createORMTable(df, name, con, schema,
                                         primary_key=self.primary_key)
        table_exists = self._checkTable(name, con, schema)

        if if_exists == "fail":
            if table_exists:
                raise ValueError(f"table already exists :{name} ")
            orm_table.create()
        elif if_exists == "replace":
            if table_exists:
                orm_table.drop()
            orm_table.create()
        elif if_exists == "append":
            if not table_exists:
                orm_table.create()

        # HTTP connections auto-fill None as "" on insert, but the native
        # TCP driver does not and the engine errors out — fill text columns
        # manually so both transports behave alike. Numeric NaN handling is
        # left unchanged; TODO confirm against the native driver.
        df = df.copy()
        text_cols = df.columns[df.dtypes == object]
        if len(text_cols):
            df[text_cols] = df[text_cols].fillna("")
        con.execute(orm_table.insert(), df.to_dict(orient="records"))
-
-
if __name__ == '__main__':

    # Usage example.
    cdf = ClickhouseDf()

    df = pd.DataFrame({'column1': [1, 2, 3],
                       'column2': ['A', 'B', 'C']})

    db = 'default'
    password = ''
    user = 'default'
    port = 9090
    host = '192.168.76.136'
    engine = create_engine('clickhouse+native://{user}:{password}@{host}:{port}/{db}'
                           .format(user=user,
                                   host=host,
                                   password=urllib.parse.quote_plus(password),
                                   db=db,
                                   port=port),
                           pool_size=30, max_overflow=0,
                           pool_pre_ping=True, pool_recycle=3600)

    with engine.connect() as conn:
        cdf.to_sql(df, "table_name", conn, schema='default', if_exists="replace")

        # Read back on the same managed connection. The original opened a
        # second connection here and never closed it (connection leak), and
        # bound the result to the name `list`, shadowing the builtin.
        rows = conn.execute("SELECT * FROM table_name").fetchall()
        print(rows)
1) 运行需要安装包
# pip install sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
# pip install clickhouse-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
2) cdf.to_sql(df, "table_name", conn, schema='default', if_exists="replace")
这里的 schema 参数一定要填写,因为判断表是否存在时执行的是
con.execute('EXISTS default.table_name').fetchall() == [(0,)],其中 default 即 schema。
参考链接: SQLAlchemy_clickhouse_sqlalchemy-CSDN博客
https://github.com/xzkostyan/clickhouse-sqlalchemy