• clickhouse读取kafka数据


    #1、需要下载的文件,下载后上传到服务器;如果服务器可以连接互联网,建议参考官方文档,使用在线安装的方式,点击-->:安装 | ClickHouse Docs

    clickhouse-client-20.3.9.70-2.noarch.rpm
    clickhouse-common-static-20.3.9.70-2.x86_64.rpm
    clickhouse-common-static-dbg-20.3.9.70-2.x86_64.rpm
    clickhouse-server-20.3.9.70-2.noarch.rpm

    #2、安装
    rpm -ivh clickhouse-common-static-20.3.9.70-2.x86_64.rpm
    rpm -ivh clickhouse-common-static-dbg-20.3.9.70-2.x86_64.rpm
    rpm -ivh clickhouse-client-20.3.9.70-2.noarch.rpm
    rpm -ivh clickhouse-server-20.3.9.70-2.noarch.rpm

    #3、修改配置:密码为:default
    /etc/clickhouse-server/config.xml
    /etc/clickhouse-server/users.xml

    #4、启动服务
    /etc/init.d/clickhouse-server start

    #5、登录客户端
    clickhouse-client --password 密码明文


    #6、clickhouse的目录结构
    /etc/clickhouse-server:服务端的配置文件目录,包括全局配置 config.xml 和用户配置 users.xml
    /var/lib/clickhouse:默认的数据存储目录,如果是生产环境可以将其修改到空间较大的磁盘挂载路径。可以通过修改
    /etc/clickhouse-server/config.xml 配置文件中 、 标签值来设置。
    /var/log/clickhouse-server:默认的日志保存目录。同样可以通过修改/etc/clickhouse-server/config.xml 配置文件中 和 标签值来设置。
    /etc/cron.d/clickhouse-server:clickhouse server 的一个定时配置,用于恢复因异常中断的ClickHouse 服务进程。
    ~/.clickhouse-client-history (隐藏文件) 所有通过交互式命令行执行的sql历史记录。可使用ll -a命令查看

    测试环境clickhouse地址:
    clickhouse 数据存储位置:/var/lib/clickhouse/

    #1、创建数据库
    CREATE DATABASE IF NOT EXISTS xzire;

    #2、kafka引擎表,作为一个管道接收kafka的流数据
    CREATE TABLE IF NOT EXISTS xzire.ire_web_behaviorinfo_stream (message_web_json String) ENGINE = Kafka() SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'MESSAGE_WEB', kafka_group_name = 'message_web', kafka_format = 'JSONAsString', kafka_max_block_size = 1048576, kafka_row_delimiter = '\n', kafka_num_consumers = 1

    #3、数据存储表
    CREATE TABLE IF NOT EXISTS xzire.ire_web_behaviorinfo (`uuid` String, `key` String, content_id String, content_name String, device_id String, data_time String, server_ip String, server_time String) ENGINE = MergeTree() PARTITION BY `key` PRIMARY KEY `uuid` ORDER BY (uuid,key) SETTINGS index_granularity = 8192;

    #4、物化视图,负责将kafka引擎表的数据实时同步到数据存储表中
    CREATE MATERIALIZED VIEW IF NOT EXISTS xzire.ire_web_behaviorinfo_view TO
        xzire.ire_web_behaviorinfo AS
        SELECT generateUUIDv4() as uuid,
        visitParamExtractString(message_web_json, 'key') as key,
        visitParamExtractString(message_web_json, 'content_id') as content_id,
        visitParamExtractString(message_web_json, 'content_name') as content_name,
        visitParamExtractString(message_web_json, 'device_id') as device_id,
        visitParamExtractString(message_web_json, 'data_time') as data_time,
        visitParamExtractString(message_web_json, 'server_IP') as server_ip,
        visitParamExtractString(message_web_json, 'server_time') as server_time
        FROM xzire.ire_web_behaviorinfo_stream;


    停止接收topic数据或更改转换逻辑需要停用物化视图,更改完之后再启用物化视图
    # 停用
     DETACH TABLE consumer;
    # 启用  
     ATTACH TABLE consumer;

  • 相关阅读:
    PerfView专题 (第十一篇):使用 Diff 功能洞察 C# 内存泄漏增量
    向量数据库库Milvus Cloud2.3 技术选型中性能、成本、扩展性是重点
    黑寡妇(BWO)优化算法(Matlab代码实现)
    【嵌入式linux】Ubuntu 修改用户名
    flex布局入门讲解
    HTML1:html基础
    手写签名到背景上合为1张图
    闭包详解,柯里化的含义及操作方法
    云IDE的简单使用、体验与学习
    查询快递 批量查询物流信息并筛选出无物流信单号
  • 原文地址:https://blog.csdn.net/L564458192/article/details/125504057