• 探索ClickHouse——使用MaterializedView存储kafka传递的数据



    《探索ClickHouse——连接Kafka和Clickhouse》中,我们讲解了如何使用kafka engin连接kafka,并读取topic中的数据。但是遇到了一个问题,就是数据只能读取一次,即使后面还有新数据发送到该topic,该表也读不出来。
    为了解决这个问题,我们引入MaterializedView。

    创建表

    该表结构直接借用了《探索ClickHouse——使用Projection加速查询》中的表结构。

    CREATE TABLE materialized_uk_price_paid_from_kafka ( price UInt32, date Date, postcode1 LowCardinality(String), postcode2 LowCardinality(String), type Enum8('terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4, 'other' = 0), is_new UInt8, duration Enum8('freehold' = 1, 'leasehold' = 2, 'unknown' = 0), addr1 String, addr2 String, street LowCardinality(String), locality LowCardinality(String), town LowCardinality(String), district LowCardinality(String), county LowCardinality(String) ) ENGINE = MergeTree ORDER BY (postcode1, postcode2, addr1, addr2);
    
    • 1

    CREATE TABLE materialized_uk_price_paid_from_kafka
    (
    price UInt32,
    date Date,
    postcode1 LowCardinality(String),
    postcode2 LowCardinality(String),
    type Enum8(‘terraced’ = 1, ‘semi-detached’ = 2, ‘detached’ = 3, ‘flat’ = 4, ‘other’ = 0),
    is_new UInt8,
    duration Enum8(‘freehold’ = 1, ‘leasehold’ = 2, ‘unknown’ = 0),
    addr1 String,
    addr2 String,
    street LowCardinality(String),
    locality LowCardinality(String),
    town LowCardinality(String),
    district LowCardinality(String),
    county LowCardinality(String)
    )
    ENGINE = MergeTree
    ORDER BY (postcode1, postcode2, addr1, addr2)
    Query id: 55b16049-a865-4d54-9333-d661c6280a09
    Ok.
    0 rows in set. Elapsed: 0.005 sec.

    创建MaterializedView

    CREATE MATERIALIZED VIEW uk_price_paid_from_kafka_consumer_view TO materialized_uk_price_paid_from_kafka AS SELECT splitByChar(' ', postcode) AS p, toUInt32(price_string) AS price, parseDateTimeBestEffortUS(time) AS date, p[1] AS postcode1, p[2] AS postcode2, transform(a, ['T', 'S', 'D', 'F', 'O'], ['terraced', 'semi-detached', 'detached', 'flat', 'other']) AS type, b = 'Y' AS is_new, transform(c, ['F', 'L', 'U'], ['freehold', 'leasehold', 'unknown']) AS duration, addr1, addr2, street, locality, town, district, county FROM uk_price_paid_from_kafka;
    
    • 1

    这样kafka topic中的数据被清洗到materialized_uk_price_paid_from_kafka表中。

    查询

    select * from materialized_uk_price_paid_from_kafka;
    
    • 1

    在这里插入图片描述
    我们在给topic发送下面的内容

    “{5FA8692E-537B-4278-8C67-5A060540506D}”,“19500”,“1995-01-27 00:00”,“SK10 2QW”,“T”,“N”,“L”,“38”,“”,“GARDEN STREET”,“MACCLESFIELD”,“MACCLESFIELD”,“MACCLESFIELD”,“CHESHIRE”,“A”,“A”

    再查询表

    select * from materialized_uk_price_paid_from_kafka;
    
    • 1

    在这里插入图片描述

  • 相关阅读:
    1.2 课程架构介绍:STM32H5 芯片生命周期管理与安全调试
    SpringBoot学习笔记(六)——Redis数据库
    【OpenMv】颜色模式之Lab
    LeetCode:两数之和
    [Python]Django 模型
    Halo 开源项目学习(四):发布文章与页面
    RabbitMQ(基于AMQP的开源消息代理软件)
    python习题001-----分支结构
    论文剽窃者“自爆家门”?CVPR 最后一天上演“一出好戏 ”!
    CFS内网穿透靶场实战
  • 原文地址:https://blog.csdn.net/breaksoftware/article/details/133388633