• python利用pandas.DataFram批量写入clickhouse


    python 包引入 

    1. import json
    2. import logging
    3. import math
    4. import os
    5. import pandas as pd
    6. import datetime
    7. import requests
    8. from clickhouse_driver import Client
    9. from cmc.config import config
    10. #在类外定义全局变量,这样当前这个py文件都可以共享
    11. process_date = datetime.datetime.now().strftime("%Y%m%d")
    12. class MyPyClassDemo:
    13. def __init__(self, api_key: str):
    14. self.api_key = api_key
    15. def tstFuctions(self):
    16. pass
    17. ....
    1. def getClickHouseClient(self):
    2. try:
    3. host_name = 'xxxx.xxx.com'
    4. client = Client(
    5. host=host_name,
    6. database='your db name',
    7. user='root',
    8. password='123123',
    9. send_receive_timeout=20,
    10. settings={'use_numpy': True}
    11. )
    12. return client
    13. except Exception as e:
    14. print("Error: "+str(e))
    15. return None

    注意这里一定要有 settings={'use_numpy': True} 这个设置,否则会报错:

    TypeError: Unsupported column type: ndarray'>. list or tuple is expected.

    1. def process_json_files(self):
    2. tmp_dir = 'out/cmc_data/'
    3. files = os.listdir(tmp_dir) #获取当前目录下的所有文件名称列表
    4. print(files)
    5. storage_client = self.getClickHouseClient() #加载CH数据库连接
    6. for file in files:
    7. if not file.startswith('cmc_projects'):
    8. continue
    9. with open(tmp_dir + file, 'r') as f: #根据相对路径读取json文件
    10. json_string = f.read() #获取json字符串
    11. data_list = json.loads(json_string) #转成list类型
    12. #df = pd.DataFrame.from_dict(json_normalize(data_list), orient='columns') #按列读取(也包含json格式当中嵌套的列),用全部读取到的列来构造dFrame当中的类
    13. df = pd.json_normalize(data_list) # 新版本的推荐写法
    14. #print(df.T) # 打印读取到的列
    15. insert_df = df[['id', 'name', 'symbol', 'slug', 'rank', 'is_active', 'first_historical_data', 'platform.id', 'platform.name', 'platform.symbol', 'platform.slug', 'platform.token_address']] # 抽取指定的列,重组新的dframe
    16. insert_df.insert(loc=12, column='update_time', value=process_date) # 在新重组的dframe当中插入一列(数据写入的日期)
    17. insert_df["platform.id"] = insert_df["platform.id"].apply(self.modify) # 通过apply() 修改某一列的值
    18. #insert_df = insert_df.loc[0:1] # (调试)取第0行
    19. #insert_df.iloc[:,0:12] # (调试)取0-12列
    20. insert_df.rename(columns={"platform.id": "platform_id", "platform.name": "platform_name", "platform.symbol": "platform_symbol", "platform.slug": "platform_slug", "platform.token_address": "token_address"}, inplace=True) # 要求dframe当中的列字段必须与CH数据库当中的列字段一一对应,否则报keyError错
    21. #print(insert_df)
    22. #print(type(insert_df["platform_id"]))
    23. storage_client.insert_dataframe('INSERT INTO tstdb.ods_infos (*) VALUES', insert_df) # 用CH提供的client批量将的frame当中的数据一次刷入CH当中

    注意: 这里一次批量刷入CH的条数,取决于json文件当中的数据条数,可在源文件或者data_list

    的位置做数据量的控制操作

    1. def modify(self, id):
    2. if math.isnan(id) : # python 判断是nan
    3. return int(0)
    4. else :
    5. return int(id)
    6. return id

    CH建表语句和说明 

    1. CREATE TABLE tst_db.ods_infos (
    2. id UInt32,
    3. name String,
    4. symbol String,
    5. slug String,
    6. rank UInt32,
    7. is_active UInt32,
    8. first_historical_data String,
    9. platform_id UInt32,
    10. platform_name String,
    11. platform_symbol String,
    12. platform_slug String,
    13. token_address String,
    14. update_time String
    15. )
    16. ENGINE = ReplacingMergeTree
    17. PARTITION BY update_time
    18. PRIMARY KEY id -- 这里一定要设置主键,插入相同key值的数据才会覆盖跟新,否则记录会重复
    19. ORDER BY (
    20. id,
    21. name,
    22. symbol,
    23. platform_id,
    24. platform_name
    25. )
    26. SETTINGS index_granularity = 8192,
    27. storage_policy = 'policy_name_eth';


     

  • 相关阅读:
    SpringBoot整合redis
    R语言可视化散点图(scatter plot)图、为图中的部分数据点添加标签、始终显示所有标签,即使它们有太多重叠、ggrepel包来帮忙
    PostMan传时间参数&一次性发送多次请求
    MONAI Label 安装流程及使用攻略
    Docker容器之间的通信
    Springboot 之 JPA 多数据源实现
    【统计和图形分析】上海道宁为您带来测试、分析、改进和控制自身服务、交易和制造流程的强大工具——SigmaXL
    每一天的努力,都会让远方变得更近——中国人民大学与加拿大女王大学金融硕士
    LeetCode 86 双周赛
    adb server version (19045) doesn‘t match this client (41); killing.的解决办法
  • 原文地址:https://blog.csdn.net/gwd777/article/details/126600066