• python Clickhouse 分布式表介绍和创建、插入和查询数据,以及解决遇到的问题


    目录

    一、分布式表和本地表

    原理解析:

    二、Clickhouse创建分布式表结构

    三、python代码实现(亲测有效)

    四、解决遇到的问题

    解决 DB::Exception: Missing columns: 'wefgrgrfew' while processing query: 'wefgrgrfew', required columns: 'wefgrgrfew' 'wefgrgrfew': While executing ValuesBlockInputFormat. Stack trace:


    一、分布式表和本地表

    clickhouse中的表可以分为分布式表和本地表。

    分布表包括逻辑表和物理表,,逻辑表就是表机构用于查询,物理表是实际存储数据的。

    (1)分布式表:逻辑存在的表,自身不存储数据,可以理解为数据库中的视图, 一般建议使用分布式表做查询操作,分布式表引擎会将我们的查询请求路由本地表进行查询, 然后进行汇总最终返回给用户。

    (2)本地表:真正存储数据的表。

    原理解析:

    分布式(Distributed)表引擎是分布式表的代名词,它⾃身不存储任何数据,⽽是作为数据分⽚的透明代理,能够⾃动的路由数据⾄集群中的各个节点,即分布式表需要和其他数据表⼀起协同⼯作。分布式表会将接收到的读写任务,分发到各个本地表,而实际上数据的存储也是保存在各个节点的本地表中。原理如下图:

    二、Clickhouse创建分布式表结构

    分布式表创建规则:

        使用on cluster语句在集群的某台机器上执行以下代码,即可在每台机器上创建本地表和分布式表,其中⼀张本地表对应着⼀个数据分⽚,分布式表通常以本地表加“_all”命名。它与本地表形成⼀对多的映射关系,之后可以通过分布式表代理操作多张本地表。
        这里有个要注意的点,就是分布式表的表结构尽量和本地表的结构一致。如果不一致,在建表时不会报错,但在查询或者插入时可能会抛出异常。
        在集群中使用,我们要加上on cluster 的ddl,这样我们的建表语句在某一台clickhouse实例上执行一次即可分发到集群中所有实例上执行。

    (1)先在每一个分片上创建本地表:

    1. -- auto-generated definition 物理表
    2. CREATE TABLE IF NOT EXISTS test.test_log ON CLUSTER '{cluster}' (
    3. ts_date Date,
    4. ts_date_time DateTime,
    5. user_id Int64,
    6. event_type String,
    7. site_id Int64,
    8. groupon_id Int64,
    9. category_id Int64,
    10. merchandise_id Int64,
    11. search_text String
    12. -- A lot more columns...
    13. )ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test/events_local','{replica}')
    14. PARTITION BY ts_date
    15. ORDER BY (ts_date,toStartOfHour(ts_date_time),site_id,event_type)
    16. SETTINGS index_granularity = 8192;

    ReplicatedMergeTree引擎族接收两个参数:

    • ZK中该表相关数据的存储路径,ClickHouse官方建议规范化,如上面的格式/clickhouse/tables/{shard}/[database_name]/[table_name]
    • 副本名称,一般用{replica}即可。

    观察一下上述ZK路径下的znode结构与内容。

    1. [zk: localhost:2181(CONNECTED) 0] ls /clickhouse/tables/01/test/events_local
    2. [metadata, temp, mutations, log, leader_election, columns, blocks, nonincrement_block_numbers, replicas, quorum, block_numbers]
    3. [zk: localhost:2181(CONNECTED) 1] get /clickhouse/tables/04/test/events_local/columns
    4. columns format version: 1
    5. 9 columns:
    6. `ts_date` Date
    7. `ts_date_time` DateTime
    8. `user_id` Int64
    9. `event_type` String
    10. `site_id` Int64
    11. `groupon_id` Int64
    12. `category_id` Int64
    13. `merchandise_id` Int64
    14. `search_text` String
    15. # ...................
    16. [zk: localhost:2181(CONNECTED) 2] get /clickhouse/tables/07/test/events_local/metadata
    17. metadata format version: 1
    18. date column:
    19. sampling expression:
    20. index granularity: 8192
    21. mode: 0
    22. sign column:
    23. primary key: ts_date, toStartOfHour(ts_date_time), site_id, event_type
    24. data format version: 1
    25. partition key: ts_date
    26. granularity bytes: 10485760
    27. # ...................

     ReplicatedMergeTree引擎族在ZK中存储大量数据,包括且不限于表结构信息、元数据、操作日志、副本状态、数据块校验值、数据part merge过程中的选主信息等等。可见,ZK在复制表机制下扮演了元数据存储、日志框架、分布式协调服务三重角色,任务很重,所以需要额外保证ZK集群的可用性以及资源(尤其是硬盘资源)。

    (2)分布式表引擎的创建模板:

    支持分布式表的引擎是Distributed,建表DDL语句示例如下,_all只是分布式表名比较通用的后缀而已。

    1. -- auto-generated definition 逻辑表
    2. CREATE TABLE IF NOT EXISTS test.test_log_all ON CLUSTER sht_ck_cluster_1
    3. AS test.events_local
    4. ENGINE = Distributed(sht_ck_cluster_1,test,events_local,rand());

    至此,ck_cluster集群的本地表test_log和分布式表test_log_all就创建完成了。

    Distributed引擎需要以下几个参数,参数描述:

    • cluster:集群名称,在对分布式表执⾏读写的过程中,它会使⽤集群的配置信息来找到相应的host节点。 
    • database,table:本地表所在的数据库和本地表名称,用于将分布式表映射到本地表上。 
    • 本地表名称,
    • sharding_key(可选的): 分⽚键,分布式表会按照这个规则,将数据分发到各个本地表中。该键与config.xml中配置的分片权重(weight)一同决定写入分布式表时的路由,即数据最终落到哪个物理表上。它可以是表中一列的原始数据(如site_id),也可以是函数调用的结果,如上面的SQL语句采用了随机值rand()。注意该键要尽量保证数据均匀分布,另外一个常用的操作是采用区分度较高的列的哈希值,如intHash64(user_id)
    • --创建分布式表test_log_all,数据在读写时会根据rand()随机函数的取值,决定数据写⼊哪个分⽚,也可以用hash取值。 create table test_log_all on cluster ck_cluster ( totalDate Date, unikey    String ) engine = Distributed('ck_cluster', 'test', 'test_log', rand());

    三、python代码实现(亲测有效)

    1. from clickhouse_driver import Client
    2. # 连接clickhouse数据库:host为ip地址,port为端口号,database为数据库名
    3. client = Client(host='host', port='9000', database='database', user="user", password="xxxxxx")
    4. # 创建本地表
    5. sql = """
    6. CREATE TABLE if not exists test_local on cluster 【集群名称】
    7. (
    8. id Int32,
    9. user_name String,
    10. age Int32
    11. ) ENGINE = MergeTree()
    12. PARTITION BY id
    13. ORDER BY id
    14. SETTINGS index_granularity = 8192
    15. """
    16. res = client.execute(sql)
    17. print('#######res:',res)
    18. # 创建分布式表
    19. sql = """
    20. CREATE TABLE if not exists test_all on cluster 【集群名称】
    21. AS database.test_local
    22. ENGINE = Distributed(【集群名称】, database, test_local, rand());
    23. """ # cluster集群名,数据库名,本地表名,在读写时会根据rand()随机函数的取值来决定数据写⼊哪个分⽚
    24. res = client.execute(sql)
    25. print('#######res:',res)
    26. # 打印表结构
    27. sql1 = 'desc test_all'
    28. sql2 = 'desc test_local'
    29. ans = client.execute(sql1)
    30. print('#######ans:',ans)
    31. ans = client.execute(sql2)
    32. print('#######ans:',ans)
    33. # 写入数据
    34. insert_sql = "insert into database.test_all(id,user_name,age) values ('%s','%s','%s')" % (6,"wefgrgw",23)
    35. ans = client.execute(insert_sql)
    36. # 查询数据
    37. sql = """SELECT * FROM test_all where id=6"""
    38. print('#######result:', client.execute(sql))

    可以看到,插入成功了。

    注意:如果恰好存到连接的这台服务器的话就可以在本地表中查询到,否则在本地表中查询不到该条信息。

    四、解决遇到的问题

    解决 DB::Exception: Missing columns: 'wefgrgrfew' while processing query: 'wefgrgrfew', required columns: 'wefgrgrfew' 'wefgrgrfew': While executing ValuesBlockInputFormat. Stack trace:

    问题原因:insert语句写的不对,所以导致插入失败。

    解决

    1. from clickhouse_driver import Client
    2. # 连接clickhouse数据库:host为ip地址,port为端口号,database为数据库名
    3. client = Client(host='host', port='9000', database='database', user="user", password="xxxxxx")
    4. # 写入数据
    5. insert_sql = "insert into database.test_all(id,user_name,age) values ('%s','%s','%s')" % (6,"wefgrgw",23)
    6. ans = client.execute(insert_sql)
    7. print('#######ans:',ans)

    可以看到,成功插入数据了。

  • 相关阅读:
    【教3妹学java】10.Java对象在内存中是怎样存储的?
    【目标跟踪-卡尔曼滤波】基于扩展卡尔曼滤波实现目标跟踪定位附Matlab源码
    SpringCloud Alibaba系列 Gateway(四)
    点击化学 PEG 试剂:1802907-92-1,Mtz-PEG4-NHS,甲基四嗪-四聚乙醇-活性酯
    代码随想录算法训练营第一天
    L1-023 输出GPLT C++解法【全网最细讲解】
    嵌入式web 服务器boa的编译和移植
    公会发展计划(GAP):经过实战考验的 Web3 任务模式
    Python开发的Web在线学习教育培训网课系统
    C++ 算法基础课 01 —— 基础算法_快速排序/归并排序/二分查找/高精度
  • 原文地址:https://blog.csdn.net/qq_45956730/article/details/127794874