• 【基础篇】ClickHouse 表引擎之集成Kafka



    在这里插入图片描述

    0.前言

    ClickHouse为了方便与Kafka集成,提供了一个名为Kafka引擎的专用表引擎。Kafka引擎允许你在ClickHouse中创建一个表,这个表的数据源来自于一个或多个Kafka队列。结合使用Kafka引擎和Materialized Views,可以实现将数据从Kafka队列消费,然后将数据存储到其他引擎的表中,从而实现实时数据处理和查询。

    1.集成示例

    要创建一个Kafka引擎的表,你需要提供以下几个关键参数:

    1. kafka_broker_list:Kafka代理地址列表,用逗号分隔的字符串。
    2. kafka_topic:要订阅的Kafka主题。
    3. kafka_group_name:消费者组名称,用于标识ClickHouse实例所属的消费者组。
    4. kafka_format:消息格式,用于指定如何将Kafka中的消息解析成表的行,例如JSONEachRow等。

    创建一个Kafka引擎的表的示例:

    CREATE TABLE kafka_table
    (
        column1 String,
        column2 UInt64,
        column3 Float64
    ) ENGINE = Kafka
    SETTINGS
        kafka_broker_list = 'kafka1:9092,kafka2:9092',
        kafka_topic = 'kafka_topic_name',
        kafka_group_name = 'clickhouse_group',
        kafka_format = 'JSONEachRow';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    为了将数据从Kafka表消费并存储到其他表引擎(例如MergeTree)的表中,你可以创建一个Materialized View,例如:

    CREATE MATERIALIZED VIEW mv_kafka_to_storage
    ENGINE = MergeTree
    PARTITION BY toYYYYMMDD(column2)
    ORDER BY (column1, column2)
    AS SELECT
        column1,
        column2,
        column3
    FROM kafka_table;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    使用Kafka引擎和Materialized View,你可以在ClickHouse中实现实时数据消费、处理和查询,从而大大提高数据处理的效率。


    官方教程

    此引擎与 Apache Kafka 结合使用。

    Kafka 特性:

    • 发布或者订阅数据流。
    • 容错存储机制。
    • 处理流数据。

    老版格式:

        Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
              [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
    
    • 1
    • 2

    新版格式:

        Kafka SETTINGS
          kafka_broker_list = 'localhost:9092',
          kafka_topic_list = 'topic1,topic2',
          kafka_group_name = 'group1',
          kafka_format = 'JSONEachRow',
          kafka_row_delimiter = '\n',
          kafka_schema = '',
          kafka_num_consumers = 2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    必要参数:

    • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
    • kafka_topic_list – topic 列表 (my_topic)。
    • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
    • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。了解详细信息,请参考 Formats 部分。

    可选参数:

    • kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
    • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。例如,普罗托船长 需要 schema 文件路径以及根对象 schema.capnp:Message 的名字。
    • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

    示例1:

      CREATE TABLE queue (
        timestamp UInt64,
        level String,
        message String
      ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
    
      SELECT * FROM queue LIMIT 5;
    
      CREATE TABLE queue2 (
        timestamp UInt64,
        level String,
        message String
      ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                                kafka_topic_list = 'topic',
                                kafka_group_name = 'group1',
                                kafka_format = 'JSONEachRow',
                                kafka_num_consumers = 4;
    
      CREATE TABLE queue2 (
        timestamp UInt64,
        level String,
        message String
      ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
                  SETTINGS kafka_format = 'JSONEachRow',
                           kafka_num_consumers = 4;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。

    消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。了解更多信息请访问 http://kafka.apache.org/intro

    SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:

    1. 使用引擎创建一个 Kafka 消费者并作为一条数据流。
    2. 创建一个结构表。
    3. 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。

    MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。

    示例2:

      CREATE TABLE queue (
        timestamp UInt64,
        level String,
        message String
      ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
    
      CREATE TABLE daily (
        day Date,
        level String,
        total UInt64
      ) ENGINE = SummingMergeTree(day, (day, level), 8192);
    
      CREATE MATERIALIZED VIEW consumer TO daily
        AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
        FROM queue GROUP BY day, level;
    
      SELECT level, sum(total) FROM daily GROUP BY level;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    为了提高性能,接受的消息被分组为 max_insert_block_size 大小的块。如果未在 stream_flush_interval_ms 毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。

    停止接收主题数据或更改转换逻辑,请 detach 物化视图:

      DETACH TABLE consumer;
      ATTACH TABLE consumer;
    
    • 1
    • 2

    如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。

    配置

    GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。

      
      <kafka>
        <debug>cgrpdebug>
        <auto_offset_reset>smallestauto_offset_reset>
      kafka>
    
      
      <kafka_logs>
        <retry_backoff_ms>250retry_backoff_ms>
        <fetch_min_bytes>100000fetch_min_bytes>
      kafka_logs>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    有关详细配置选项列表,请参阅 librdkafka配置参考。在 ClickHouse 配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 true

    Kerberos 支持

    对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
    clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。

    示例:

      
      <kafka>
        <security_protocol>SASL_PLAINTEXTsecurity_protocol>
        <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytabsasl_kerberos_keytab>
        <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COMsasl_kerberos_principal>
      kafka>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    虚拟列

    • _topic – Kafka 主题。
    • _key – 信息的键。
    • _offset – 消息的偏移量。
    • _timestamp – 消息的时间戳。
    • _timestamp_ms – 消息的时间戳(毫秒)。
    • _partition – Kafka 主题的分区。

    参考文档

    • ClickHouse Kafka Engine: https://clickhouse.tech/docs/en/engines/table-engines/integrations/kafka/
    • ClickHouse + Kafka — How to Build Real-Time Data Pipelines: https://medium.com/@coderunner/debugging-with-git-7afbcd3b9f1e
  • 相关阅读:
    exp 4
    Centos7.9部署snort-2.9.20
    手把手教你配置vscode的c++开发环境(wsl + 远程)
    (只需三步)虚拟机上vm的ubuntu不能联上网怎么办
    feign 全局 与 局部拦截器的区分与使用
    论微软过程
    硬件开发笔记(十七):RK3568底板电路串口、485、usb原理图详解
    JS - 鼠标键盘配置 及 浏览器禁止操作
    P2719 搞笑世界杯 (期望dp
    Linux输入设备应用编程(触摸屏获取坐标信息)
  • 原文地址:https://blog.csdn.net/wangshuai6707/article/details/132921975