• ClickHouse distributed表引擎


    Distributed Table Engine | ClickHouse DocsTables with Distributed engine do not store any data of their own, but allow distributed query processing on multiple servers.https://clickhouse.com/docs/en/engines/table-engines/special/distributed具有分布式引擎的表不存储自己的任何数据,但允许在多个服务器上进行分布式查询处理。 读取是自动并行的。读取数据会自动使用表索引。

    建表语句

    1. CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    2. (
    3. name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    4. name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    5. ...
    6. ) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
    7. [SETTINGS name=value, ...]

    根据已存在的表建立

    如果distributed表指向服务器中已经存在的表:

    1. CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    2. AS [db2.]name2
    3. ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
    4. [SETTINGS name=value, ...]

    分布式表引擎的参数

    • cluster - 集群名

    • database - 数据库名,除了使用字符串,还能够使用currentDatabase()

    • table - 表名

    • sharding_key - (可选)分片键

    • policy_name - (可选)策略名称,它将用于存储异步发送的临时文件

     Distributed相关的参数

    • fsync_after_insert - 异步插入到分布式表之后,再对文件数据进行 fsync(fsync指同步内存中所有已修改的文件数据到储存设备)。保证操作系统将能将所有数据刷到磁盘文件中。

    • fsync_directories - do the fsync for directories.

    • bytes_to_throw_insert - 等待异步 INSERT的数据字节数超过这个值,就会抛出异常。默认为0,取消限制。

    • bytes_to_delay_insert - 等待异步 INSERT的数据字节数超过这个值,插入将被延迟。默认为0,取消限制。

    • max_delay_to_insert - 上面参数延迟的时间,默认为60(s)

    • monitor_batch_inserts - same as distributed_directory_monitor_batch_inserts

      启用/禁用批量发送插入的数据。

      启用批量发送后,分布式表引擎会尝试在一次操作中发送多个插入数据的文件,而不是单独发送。 批量发送通过更好地利用服务器和网络资源来提高集群性能。

    • monitor_split_batch_on_failure - same as distributed_directory_monitor_split_batch_on_failure

      启用/禁用在失败时拆分批次。

      有时由于超出内存限制或类似错误导致某些复杂的管道(即带有 GROUP BY 的 MATERIALIZED VIEW)之后,将特定批次发送到远程分片可能会失败。 在这种情况下,重试将无济于事(这将卡住表的分布式发送),但从该批次中一一发送文件可能会成功插入。

    • monitor_sleep_time_ms - same as distributed_directory_monitor_sleep_time_ms,分布式表引擎发送数据的基本间隔。 如果出现错误,实际间隔会呈指数增长。

    • monitor_max_sleep_time_ms - same as distributed_directory_monitor_max_sleep_time_ms,分布式表引擎发送数据的最大间隔。

    distributed不仅会从远程服务器上读取数据,而且会在允许的范围内在远程服务器上对数据进行部分处理。 

    比如,带group by的查询语句,将会在远程服务器上进行初步聚合,然后将聚合的中间结果发送过来进行下一步聚合。

    Clusters

    Clusters会定义在数据库的config.xml中。(在实际生产中,clusters的内容一般会拿出来单独在metrika.xml中设定,然后使用<include_from>/etc/clickhouse/metrika.xml</include_from>将配置包含在config.xml中)

    1. <remote_servers>
    2. <logs>
    3. <!-- Inter-server per-cluster secret for Distributed queries
    4. default: no secret (no authentication will be performed)
    5. If set, then Distributed queries will be validated on shards, so at least:
    6. - such cluster should exist on the shard,
    7. - such cluster should have the same secret.
    8. And also (and which is more important), the initial_user will
    9. be used as current user for the query.
    10. -->
    11. <!-- <secret></secret> -->
    12. <shard>
    13. <!-- Optional. Shard weight when writing data. Default: 1. -->
    14. <weight>1</weight>
    15. <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
    16. <internal_replication>false</internal_replication>
    17. <replica>
    18. <!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
    19. <priority>1</priority>
    20. <host>example01-01-1</host>
    21. <port>9000</port>
    22. </replica>
    23. <replica>
    24. <host>example01-01-2</host>
    25. <port>9000</port>
    26. </replica>
    27. </shard>
    28. <shard>
    29. <weight>2</weight>
    30. <internal_replication>false</internal_replication>
    31. <replica>
    32. <host>example01-02-1</host>
    33. <port>9000</port>
    34. </replica>
    35. <replica>
    36. <host>example01-02-2</host>
    37. <secure>1</secure>
    38. <port>9440</port>
    39. </replica>
    40. </shard>
    41. </logs>
    42. </remote_servers>

    这里定义了一个名为 logs 的集群,该集群由两个分片组成,每个分片包含两个副本。

    分片:是指包含不同部分数据的服务器(为了读取所有数据,您必须访问所有分片)。

    副本:是复制服务器(为了读取所有数据,您可以访问任何一个副本上的数据)。

    Cluster的名称不能包含'.' 。

    每个服务器都需要指定参数:

    • 主机(host):必选。
      • 远程服务器的地址。您可以使用域或 IPv4 或 IPv6 地址。如果指定域,则服务器在启动时会发出 DNS 请求,只要服务器正在运行,就会存储结果。如果 DNS 请求失败,则服务器不会启动。如果更改 DNS 记录,请重新启动服务器。
    • 端口(port):必选。
      • 发送信息的TCP端口(配置中的 tcp_port,通常设置为 9000)。 不要与 http_port 混淆。
    • 用户(user):可选。
      • 用于连接到远程服务器的用户名。 默认值是配置中的default用户。此用户必须有权连接到指定的服务器。 访问权限在 users.xml 文件中配置。 有关详细信息,请参阅访问权限部分。
    • 密码(password):可选。
      • ​​​​​​​连接到远程服务器的密码(未屏蔽)。 默认值:空字符串。
    • 安全(secure):可选。
      • ​​​​​​​是否使用安全的 SSL/TLS 连接。 通常还需要指定端口(默认安全端口为 9440)。 服务器应该监听 <tcp_port_secure>9440</tcp_port_secure> 并配置正确的证书。
    • 压缩方式(compression):可选。
      • ​​​​​​​使用数据压缩。,默认值true

    如果设置了副本,那么读取数据的时候,会为每个分片选择一个可用副本。(参考 load_balancing 设置可以设置数据负载均衡算法。)

    如果选择的副本未建立与当前服务器的连接,在一小段时间内将尝试连接。如果连接不上,那么就会选择下一个副本,以此类推。如果所有副本的连接尝试都失败,则尝试以相同的方式重复几次。

    这有利于弹性,但不提供完整的容错能力: 远程服务器可能接受连接后无法正常工作,或者工作不佳。

    ck支持仅指定一个分片(在这种情况下,查询处理应称为远程处理,而不是分布式处理)或指定任意数量的分片。在每个分片中,可以指定任意数量的副本。 也可以为每个分片指定不同数量的副本。

    配置中可以定义任意数量的集群(clusters)。

    可以通过sql查询 system.clusters表来查看当前的集群信息。

    Distributed引擎可以像操作本地服务器一样操作分布式集群,但是需要在配置中配置集群信息。

    通常情况下,集群中的所有服务器都应该有相同的集群配置。

    配置文件中的集群会即时更新,无需重新启动服务器。

    如果每次发送的查询选择的分片是随机的,那么就不需要创建分布式表了,可以选择远程表函数的功能,参考 Table functions.

    向Distributed表中写入数据

    直接写入分片

    可以定义将哪些数据写入哪些服务器并直接在每个分片上执行写入。 换句话说,对分布式表指向的集群中的远程表执行直接 INSERT 语句。

    这是最灵活的解决方案,因为您可以使用任何分片方案,即使是由于主题领域的要求而并非微不足道的分片方案。

    这也是最优化的解决方案,因为数据可以完全独立地写入不同的分片。

    写入distributed表

    对分布式表执行 INSERT 语句。

    在这种情况下,distributed表将在服务器本身之间分配插入的数据。

    为了写入分布式表,它必须配置 sharding_key 参数(除非只有一个分片)。

    权重的配置与数据分发

    每个分片都可以在配置文件中定义一个 <weight>。 默认情况下,权重为 1。

    数据以与分片权重成比例的数量分布在分片中。

    比例就是,将所有分片权重相加,然后除以所有分片的总和,以确定每个分片的比例。 例如,如果有两个分片,第一个的权重为 1,而第二个的权重为 2,第一个将被发送三分之一 (1 / 3) 的插入行,第二个将被发送三分之二 (2 / 3)。

    internal_replication 参数

    每个分片都可以在配置文件中定义 internal_replication 参数。

    如果此参数设置为 true,则写入操作会选择第一个健康的副本并向其写入数据。如果distributed表下的表引擎是replicated系列的,那么就开启这个值,一个表副本将接收写入,并将自动复制到其他副本。

    如果 internal_replication 设置为 false(默认值),则数据将写入所有副本。 在这种情况下,分布式表自己复制数据,由于不会检查一致性,一段时间后必然会出现数据不一致的情况。

    如何选择数据插入的分片

    要选择一行数据插入哪个分片,需要计算该行对应的分片表达式。

    分片表达式可以是返回整数的常量或者表的列中的任何表达式。

    例如,您可以使用表达式 rand() 来进行数据的随机分布,或者使用 UserID 来通过除用户 ID 得到的余数进行分布(那么单个用户的数据将驻留在单个分片上,这样可以简化运行 IN 和 JOIN由用户)。如果其中一列分布不够均匀,您可以将其包装在哈希函数中,例如 intHash64(UserID)。

    在计算完分片表达式后,将其除以分片的总权重得到余数。根据余数的范围就可以选择插入到哪个分片中。

    比如:

    计算的分片表达式值为n。如果有两个分片,第一个的权重为 9,而第二个的权重为 10,则余数就是n%19。

    该行发送到第一个分片的范围余数范围为 [0, 9) ,发送到第二个分片的范围为 [9, 19) 。

    做除法取余的这种方式解决的场景有限,在其他场景的情况下是不合适的。这种方式适合中大型的场景(数十台服务器),但是对于特大型场景(数百台或更多服务器)就不试用了。这种情况应该对区域分区,不应该使用distributed表。

    在以下情况下应该关注一下选择分片的方案:

    1. join或者in的查询,这时候如果join或者in的两张表能够有相同的分发策略,那么join或者in查询就能够在本地实现,不用再使用global join/in,这样会更有效率。
    2. 使用大量服务器(数百台),并且包含大量的小查询。例如查询单个客户(例如网站、广告商或合作伙伴)的数据。
      1. 这种情况下将这个用户的所有数据放置在一个分片上会更有效率。
      2. 或者将一些分片分在一起,称之为一个“layer”,数据在每个layer随机分布,然后对这些“layer”做分布式表,每个客户的数据放置在一个layer中。为全局查询创建一个共享分布式表。

    数据的写入方式

    数据是异步写入的。

    当插入数据的时候,数据块只会写入本地文件系统。然后在后台,数据会尽快发到各个shard。

    发送数据的周期由下面两个配置项定义:

    distributed_directory_monitor_sleep_time_ms 

    distributed_directory_monitor_max_sleep_time_ms 

    两个参数的含义上面写了。

    Distributed 引擎会分别发送插入的文件数据,设置 distributed_directory_monitor_batch_inserts 可以批量发送文件。此设置通过更好地利用本地服务器和网络资源来提高集群性能。

    可以通过查看表目录/var/lib/clickhouse/data/database/table/ 来检查数据是否发送成功。

    执行后台任务的线程数可以通过  background_distributed_schedule_pool_size 来设置。

    如果在 INSERT 到分布式表之后,服务器因为某些原因停止服务或者重启了,那么插入的数据可能会丢失。如果在表目录中检测到损坏的数据部分,则将其转移到损坏的子目录中,不再使用。

    从Distributed表中读取数据

    查询分布式表时,SELECT 查询会被发送到所有分片。

    添加新分片时,不必传输旧数据。可以通过使用更大的权重向其写入新数据——数据将稍微不均匀地分布,但查询将正确有效地工作。

    启用 max_parallel_replicas 后,查询处理将在单个分片内的所有副本之间进行并行处理。

    Virtual Columns

    _shard_num:包含表 system.clusters 中的 shard_num 值。 类型:UInt32。

    由于远程和集群表函数在内部创建临时分布式表,所以 _shard_num 在那里也可用。

  • 相关阅读:
    HDR图像处理软件 Photomatix Pro mac中文版新增功能
    【Redis】SSM整合Redis&注解式缓存的使用
    有孚网络混合云,加速企业数字化转型升级
    spring framework spring-boot spring-cloud 官方文档入口
    stress、mpstat、pidstat
    宏offsetof的使用及其模拟实现
    14、Python -- 列表推导式(for表达式)与控制循环
    PUK码怎么解锁
    JUC系列(七) ForkJion任务拆分与异步回调
    XGBoost算法讲解和公式推导
  • 原文地址:https://blog.csdn.net/qq_35423190/article/details/125337861