Distributed Table Engine | ClickHouse DocsTables with Distributed engine do not store any data of their own, but allow distributed query processing on multiple servers.https://clickhouse.com/docs/en/engines/table-engines/special/distributed具有分布式引擎的表不存储自己的任何数据,但允许在多个服务器上进行分布式查询处理。 读取是自动并行的。读取数据会自动使用表索引。
- CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
- (
- name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
- name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
- ...
- ) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
- [SETTINGS name=value, ...]
如果distributed表指向服务器中已经存在的表:
- CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
- AS [db2.]name2
- ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
- [SETTINGS name=value, ...]
cluster - 集群名
database - 数据库名,除了使用字符串,还能够使用currentDatabase()
table - 表名
sharding_key - (可选)分片键
policy_name - (可选)策略名称,它将用于存储异步发送的临时文件
fsync_after_insert - 异步插入到分布式表之后,再对文件数据进行 fsync(fsync指同步内存中所有已修改的文件数据到储存设备)。保证操作系统将能将所有数据刷到磁盘文件中。
fsync_directories - do the fsync for directories.
bytes_to_throw_insert - 等待异步 INSERT的数据字节数超过这个值,就会抛出异常。默认为0,取消限制。
bytes_to_delay_insert - 等待异步 INSERT的数据字节数超过这个值,插入将被延迟。默认为0,取消限制。
max_delay_to_insert - 上面参数延迟的时间,默认为60(s)
monitor_batch_inserts - same as distributed_directory_monitor_batch_inserts,
启用/禁用批量发送插入的数据。
启用批量发送后,分布式表引擎会尝试在一次操作中发送多个插入数据的文件,而不是单独发送。 批量发送通过更好地利用服务器和网络资源来提高集群性能。
monitor_split_batch_on_failure - same as distributed_directory_monitor_split_batch_on_failure,
启用/禁用在失败时拆分批次。
有时由于超出内存限制或类似错误导致某些复杂的管道(即带有 GROUP BY 的 MATERIALIZED VIEW)之后,将特定批次发送到远程分片可能会失败。 在这种情况下,重试将无济于事(这将卡住表的分布式发送),但从该批次中一一发送文件可能会成功插入。
monitor_sleep_time_ms - same as distributed_directory_monitor_sleep_time_ms,分布式表引擎发送数据的基本间隔。 如果出现错误,实际间隔会呈指数增长。
monitor_max_sleep_time_ms - same as distributed_directory_monitor_max_sleep_time_ms,分布式表引擎发送数据的最大间隔。
distributed不仅会从远程服务器上读取数据,而且会在允许的范围内在远程服务器上对数据进行部分处理。
比如,带group by的查询语句,将会在远程服务器上进行初步聚合,然后将聚合的中间结果发送过来进行下一步聚合。
Clusters会定义在数据库的config.xml中。(在实际生产中,clusters的内容一般会拿出来单独在metrika.xml中设定,然后使用<include_from>/etc/clickhouse/metrika.xml</include_from>将配置包含在config.xml中)
- <remote_servers>
- <logs>
- <!-- Inter-server per-cluster secret for Distributed queries
- default: no secret (no authentication will be performed)
- If set, then Distributed queries will be validated on shards, so at least:
- - such cluster should exist on the shard,
- - such cluster should have the same secret.
- And also (and which is more important), the initial_user will
- be used as current user for the query.
- -->
- <!-- <secret></secret> -->
- <shard>
- <!-- Optional. Shard weight when writing data. Default: 1. -->
- <weight>1</weight>
- <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
- <internal_replication>false</internal_replication>
- <replica>
- <!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
- <priority>1</priority>
- <host>example01-01-1</host>
- <port>9000</port>
- </replica>
- <replica>
- <host>example01-01-2</host>
- <port>9000</port>
- </replica>
- </shard>
- <shard>
- <weight>2</weight>
- <internal_replication>false</internal_replication>
- <replica>
- <host>example01-02-1</host>
- <port>9000</port>
- </replica>
- <replica>
- <host>example01-02-2</host>
- <secure>1</secure>
- <port>9440</port>
- </replica>
- </shard>
- </logs>
- </remote_servers>
这里定义了一个名为 logs 的集群,该集群由两个分片组成,每个分片包含两个副本。
分片:是指包含不同部分数据的服务器(为了读取所有数据,您必须访问所有分片)。
副本:是复制服务器(为了读取所有数据,您可以访问任何一个副本上的数据)。
Cluster的名称不能包含'.' 。
每个服务器都需要指定参数:
如果设置了副本,那么读取数据的时候,会为每个分片选择一个可用副本。(参考 load_balancing 设置可以设置数据负载均衡算法。)
如果选择的副本未建立与当前服务器的连接,在一小段时间内将尝试连接。如果连接不上,那么就会选择下一个副本,以此类推。如果所有副本的连接尝试都失败,则尝试以相同的方式重复几次。
这有利于弹性,但不提供完整的容错能力: 远程服务器可能接受连接后无法正常工作,或者工作不佳。
ck支持仅指定一个分片(在这种情况下,查询处理应称为远程处理,而不是分布式处理)或指定任意数量的分片。在每个分片中,可以指定任意数量的副本。 也可以为每个分片指定不同数量的副本。
配置中可以定义任意数量的集群(clusters)。
可以通过sql查询 system.clusters表来查看当前的集群信息。
Distributed引擎可以像操作本地服务器一样操作分布式集群,但是需要在配置中配置集群信息。
通常情况下,集群中的所有服务器都应该有相同的集群配置。
配置文件中的集群会即时更新,无需重新启动服务器。
如果每次发送的查询选择的分片是随机的,那么就不需要创建分布式表了,可以选择远程表函数的功能,参考 Table functions.
直接写入分片
可以定义将哪些数据写入哪些服务器并直接在每个分片上执行写入。 换句话说,对分布式表指向的集群中的远程表执行直接 INSERT 语句。
这是最灵活的解决方案,因为您可以使用任何分片方案,即使是由于主题领域的要求而并非微不足道的分片方案。
这也是最优化的解决方案,因为数据可以完全独立地写入不同的分片。
对分布式表执行 INSERT 语句。
在这种情况下,distributed表将在服务器本身之间分配插入的数据。
为了写入分布式表,它必须配置 sharding_key 参数(除非只有一个分片)。
每个分片都可以在配置文件中定义一个 <weight>。 默认情况下,权重为 1。
数据以与分片权重成比例的数量分布在分片中。
比例就是,将所有分片权重相加,然后除以所有分片的总和,以确定每个分片的比例。 例如,如果有两个分片,第一个的权重为 1,而第二个的权重为 2,第一个将被发送三分之一 (1 / 3) 的插入行,第二个将被发送三分之二 (2 / 3)。
每个分片都可以在配置文件中定义 internal_replication 参数。
如果此参数设置为 true,则写入操作会选择第一个健康的副本并向其写入数据。如果distributed表下的表引擎是replicated系列的,那么就开启这个值,一个表副本将接收写入,并将自动复制到其他副本。
如果 internal_replication 设置为 false(默认值),则数据将写入所有副本。 在这种情况下,分布式表自己复制数据,由于不会检查一致性,一段时间后必然会出现数据不一致的情况。
要选择一行数据插入哪个分片,需要计算该行对应的分片表达式。
分片表达式可以是返回整数的常量或者表的列中的任何表达式。
例如,您可以使用表达式 rand() 来进行数据的随机分布,或者使用 UserID 来通过除用户 ID 得到的余数进行分布(那么单个用户的数据将驻留在单个分片上,这样可以简化运行 IN 和 JOIN由用户)。如果其中一列分布不够均匀,您可以将其包装在哈希函数中,例如 intHash64(UserID)。
在计算完分片表达式后,将其除以分片的总权重得到余数。根据余数的范围就可以选择插入到哪个分片中。
比如:
计算的分片表达式值为n。如果有两个分片,第一个的权重为 9,而第二个的权重为 10,则余数就是n%19。
该行发送到第一个分片的范围余数范围为 [0, 9) ,发送到第二个分片的范围为 [9, 19) 。
做除法取余的这种方式解决的场景有限,在其他场景的情况下是不合适的。这种方式适合中大型的场景(数十台服务器),但是对于特大型场景(数百台或更多服务器)就不试用了。这种情况应该对区域分区,不应该使用distributed表。
在以下情况下应该关注一下选择分片的方案:
数据是异步写入的。
当插入数据的时候,数据块只会写入本地文件系统。然后在后台,数据会尽快发到各个shard。
发送数据的周期由下面两个配置项定义:
distributed_directory_monitor_sleep_time_ms
distributed_directory_monitor_max_sleep_time_ms
两个参数的含义上面写了。
Distributed 引擎会分别发送插入的文件数据,设置 distributed_directory_monitor_batch_inserts 可以批量发送文件。此设置通过更好地利用本地服务器和网络资源来提高集群性能。
可以通过查看表目录/var/lib/clickhouse/data/database/table/ 来检查数据是否发送成功。
执行后台任务的线程数可以通过 background_distributed_schedule_pool_size 来设置。
如果在 INSERT 到分布式表之后,服务器因为某些原因停止服务或者重启了,那么插入的数据可能会丢失。如果在表目录中检测到损坏的数据部分,则将其转移到损坏的子目录中,不再使用。
查询分布式表时,SELECT 查询会被发送到所有分片。
添加新分片时,不必传输旧数据。可以通过使用更大的权重向其写入新数据——数据将稍微不均匀地分布,但查询将正确有效地工作。
启用 max_parallel_replicas 后,查询处理将在单个分片内的所有副本之间进行并行处理。
_shard_num:包含表 system.clusters 中的 shard_num 值。 类型:UInt32。
由于远程和集群表函数在内部创建临时分布式表,所以 _shard_num 在那里也可用。