目录
clickhouse中的表可以分为分布式表和本地表。
分布表包括逻辑表和物理表,,逻辑表就是表机构用于查询,物理表是实际存储数据的。
(1)分布式表:逻辑存在的表,自身不存储数据,可以理解为数据库中的视图, 一般建议使用分布式表做查询操作,分布式表引擎会将我们的查询请求路由本地表进行查询, 然后进行汇总最终返回给用户。
(2)本地表:真正存储数据的表。
分布式(Distributed)表引擎是分布式表的代名词,它⾃身不存储任何数据,⽽是作为数据分⽚的透明代理,能够⾃动的路由数据⾄集群中的各个节点,即分布式表需要和其他数据表⼀起协同⼯作。分布式表会将接收到的读写任务,分发到各个本地表,而实际上数据的存储也是保存在各个节点的本地表中。原理如下图:
分布式表创建规则:
使用on cluster语句在集群的某台机器上执行以下代码,即可在每台机器上创建本地表和分布式表,其中⼀张本地表对应着⼀个数据分⽚,分布式表通常以本地表加“_all”命名。它与本地表形成⼀对多的映射关系,之后可以通过分布式表代理操作多张本地表。
这里有个要注意的点,就是分布式表的表结构尽量和本地表的结构一致。如果不一致,在建表时不会报错,但在查询或者插入时可能会抛出异常。
在集群中使用,我们要加上on cluster
(1)先在每一个分片上创建本地表:
- -- auto-generated definition 物理表
- CREATE TABLE IF NOT EXISTS test.test_log ON CLUSTER '{cluster}' (
- ts_date Date,
- ts_date_time DateTime,
- user_id Int64,
- event_type String,
- site_id Int64,
- groupon_id Int64,
- category_id Int64,
- merchandise_id Int64,
- search_text String
- -- A lot more columns...
- )ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test/events_local','{replica}')
- PARTITION BY ts_date
- ORDER BY (ts_date,toStartOfHour(ts_date_time),site_id,event_type)
- SETTINGS index_granularity = 8192;
ReplicatedMergeTree引擎族接收两个参数:
/clickhouse/tables/{shard}/[database_name]/[table_name]
。{replica}
即可。观察一下上述ZK路径下的znode结构与内容。
- [zk: localhost:2181(CONNECTED) 0] ls /clickhouse/tables/01/test/events_local
- [metadata, temp, mutations, log, leader_election, columns, blocks, nonincrement_block_numbers, replicas, quorum, block_numbers]
-
- [zk: localhost:2181(CONNECTED) 1] get /clickhouse/tables/04/test/events_local/columns
- columns format version: 1
- 9 columns:
- `ts_date` Date
- `ts_date_time` DateTime
- `user_id` Int64
- `event_type` String
- `site_id` Int64
- `groupon_id` Int64
- `category_id` Int64
- `merchandise_id` Int64
- `search_text` String
- # ...................
-
- [zk: localhost:2181(CONNECTED) 2] get /clickhouse/tables/07/test/events_local/metadata
- metadata format version: 1
- date column:
- sampling expression:
- index granularity: 8192
- mode: 0
- sign column:
- primary key: ts_date, toStartOfHour(ts_date_time), site_id, event_type
- data format version: 1
- partition key: ts_date
- granularity bytes: 10485760
- # ...................
ReplicatedMergeTree引擎族在ZK中存储大量数据,包括且不限于表结构信息、元数据、操作日志、副本状态、数据块校验值、数据part merge过程中的选主信息等等。可见,ZK在复制表机制下扮演了元数据存储、日志框架、分布式协调服务三重角色,任务很重,所以需要额外保证ZK集群的可用性以及资源(尤其是硬盘资源)。
(2)分布式表引擎的创建模板:
支持分布式表的引擎是Distributed,建表DDL语句示例如下,_all
只是分布式表名比较通用的后缀而已。
- -- auto-generated definition 逻辑表
- CREATE TABLE IF NOT EXISTS test.test_log_all ON CLUSTER sht_ck_cluster_1
- AS test.events_local
- ENGINE = Distributed(sht_ck_cluster_1,test,events_local,rand());
至此,ck_cluster集群的本地表test_log和分布式表test_log_all就创建完成了。
Distributed引擎需要以下几个参数,参数描述:
site_id
),也可以是函数调用的结果,如上面的SQL语句采用了随机值rand()
。注意该键要尽量保证数据均匀分布,另外一个常用的操作是采用区分度较高的列的哈希值,如intHash64(user_id)
。- from clickhouse_driver import Client
-
- # 连接clickhouse数据库:host为ip地址,port为端口号,database为数据库名
- client = Client(host='host', port='9000', database='database', user="user", password="xxxxxx")
-
-
- # 创建本地表
- sql = """
- CREATE TABLE if not exists test_local on cluster 【集群名称】
- (
- id Int32,
- user_name String,
- age Int32
- ) ENGINE = MergeTree()
- PARTITION BY id
- ORDER BY id
- SETTINGS index_granularity = 8192
- """
- res = client.execute(sql)
- print('#######res:',res)
-
-
- # 创建分布式表
- sql = """
- CREATE TABLE if not exists test_all on cluster 【集群名称】
- AS database.test_local
- ENGINE = Distributed(【集群名称】, database, test_local, rand());
- """ # cluster集群名,数据库名,本地表名,在读写时会根据rand()随机函数的取值来决定数据写⼊哪个分⽚
-
- res = client.execute(sql)
- print('#######res:',res)
-
-
- # 打印表结构
- sql1 = 'desc test_all'
- sql2 = 'desc test_local'
- ans = client.execute(sql1)
- print('#######ans:',ans)
- ans = client.execute(sql2)
- print('#######ans:',ans)
-
-
- # 写入数据
- insert_sql = "insert into database.test_all(id,user_name,age) values ('%s','%s','%s')" % (6,"wefgrgw",23)
- ans = client.execute(insert_sql)
-
-
-
- # 查询数据
- sql = """SELECT * FROM test_all where id=6"""
- print('#######result:', client.execute(sql))
可以看到,插入成功了。
注意:如果恰好存到连接的这台服务器的话就可以在本地表中查询到,否则在本地表中查询不到该条信息。
问题原因:insert语句写的不对,所以导致插入失败。
解决:
- from clickhouse_driver import Client
-
- # 连接clickhouse数据库:host为ip地址,port为端口号,database为数据库名
- client = Client(host='host', port='9000', database='database', user="user", password="xxxxxx")
-
-
- # 写入数据
- insert_sql = "insert into database.test_all(id,user_name,age) values ('%s','%s','%s')" % (6,"wefgrgw",23)
- ans = client.execute(insert_sql)
- print('#######ans:',ans)
可以看到,成功插入数据了。