• python利用pandas.DataFram批量写入clickhouse


    python 包引入 

    1. import json
    2. import logging
    3. import math
    4. import os
    5. import pandas as pd
    6. import datetime
    7. import requests
    8. from clickhouse_driver import Client
    9. from cmc.config import config
    10. #在类外定义全局变量,这样当前这个py文件都可以共享
    11. process_date = datetime.datetime.now().strftime("%Y%m%d")
    12. class MyPyClassDemo:
    13. def __init__(self, api_key: str):
    14. self.api_key = api_key
    15. def tstFuctions(self):
    16. pass
    17. ....
    1. def getClickHouseClient(self):
    2. try:
    3. host_name = 'xxxx.xxx.com'
    4. client = Client(
    5. host=host_name,
    6. database='your db name',
    7. user='root',
    8. password='123123',
    9. send_receive_timeout=20,
    10. settings={'use_numpy': True}
    11. )
    12. return client
    13. except Exception as e:
    14. print("Error: "+str(e))
    15. return None

    注意这里一定要有 settings={'use_numpy': True} 这个设置,否则会报错:

    TypeError: Unsupported column type: ndarray'>. list or tuple is expected.

    1. def process_json_files(self):
    2. tmp_dir = 'out/cmc_data/'
    3. files = os.listdir(tmp_dir) #获取当前目录下的所有文件名称列表
    4. print(files)
    5. storage_client = self.getClickHouseClient() #加载CH数据库连接
    6. for file in files:
    7. if not file.startswith('cmc_projects'):
    8. continue
    9. with open(tmp_dir + file, 'r') as f: #根据相对路径读取json文件
    10. json_string = f.read() #获取json字符串
    11. data_list = json.loads(json_string) #转成list类型
    12. #df = pd.DataFrame.from_dict(json_normalize(data_list), orient='columns') #按列读取(也包含json格式当中嵌套的列),用全部读取到的列来构造dFrame当中的类
    13. df = pd.json_normalize(data_list) # 新版本的推荐写法
    14. #print(df.T) # 打印读取到的列
    15. insert_df = df[['id', 'name', 'symbol', 'slug', 'rank', 'is_active', 'first_historical_data', 'platform.id', 'platform.name', 'platform.symbol', 'platform.slug', 'platform.token_address']] # 抽取指定的列,重组新的dframe
    16. insert_df.insert(loc=12, column='update_time', value=process_date) # 在新重组的dframe当中插入一列(数据写入的日期)
    17. insert_df["platform.id"] = insert_df["platform.id"].apply(self.modify) # 通过apply() 修改某一列的值
    18. #insert_df = insert_df.loc[0:1] # (调试)取第0行
    19. #insert_df.iloc[:,0:12] # (调试)取0-12列
    20. insert_df.rename(columns={"platform.id": "platform_id", "platform.name": "platform_name", "platform.symbol": "platform_symbol", "platform.slug": "platform_slug", "platform.token_address": "token_address"}, inplace=True) # 要求dframe当中的列字段必须与CH数据库当中的列字段一一对应,否则报keyError错
    21. #print(insert_df)
    22. #print(type(insert_df["platform_id"]))
    23. storage_client.insert_dataframe('INSERT INTO tstdb.ods_infos (*) VALUES', insert_df) # 用CH提供的client批量将的frame当中的数据一次刷入CH当中

    注意: 这里一次批量刷入CH的条数,取决于json文件当中的数据条数,可在源文件或者data_list

    的位置做数据量的控制操作

    1. def modify(self, id):
    2. if math.isnan(id) : # python 判断是nan
    3. return int(0)
    4. else :
    5. return int(id)
    6. return id

    CH建表语句和说明 

    1. CREATE TABLE tst_db.ods_infos (
    2. id UInt32,
    3. name String,
    4. symbol String,
    5. slug String,
    6. rank UInt32,
    7. is_active UInt32,
    8. first_historical_data String,
    9. platform_id UInt32,
    10. platform_name String,
    11. platform_symbol String,
    12. platform_slug String,
    13. token_address String,
    14. update_time String
    15. )
    16. ENGINE = ReplacingMergeTree
    17. PARTITION BY update_time
    18. PRIMARY KEY id -- 这里一定要设置主键,插入相同key值的数据才会覆盖跟新,否则记录会重复
    19. ORDER BY (
    20. id,
    21. name,
    22. symbol,
    23. platform_id,
    24. platform_name
    25. )
    26. SETTINGS index_granularity = 8192,
    27. storage_policy = 'policy_name_eth';


     

  • 相关阅读:
    磁盘的结构
    ChatGPT研究报告:AIGC带来新一轮范式转移
    瞬间抠图!揭秘 ZEGO 绿幕抠图算法背后的技术
    2023-10-13 LeetCode每日一题(避免洪水泛滥)
    Linux的shell脚本在线转换为Windows的bat脚本
    746. 使用最小花费爬楼梯
    华为防火墙双机热备技术:HRP、VGMP、VRRP,三大技术值得一学!
    第十一章 api mgmnt API 参考
    Request 爬虫的 SSL 连接问题深度解析
    Oracle数据库ORA-12520问题处理
  • 原文地址:https://blog.csdn.net/gwd777/article/details/126600066