• Kafka-Connect集成clickhouse


    一、基本简述

    clickhouse支持和kafka的表双向同步,依靠kafka引擎。

    其大致情况为如下情况:Kafka主题中存在对应的数据格式,Clickhouse创建一个Kafka引擎表(即相当于一个消费者),当主题有消息进入时,获取该消息,将其进行消费,然后物化视图同步插入到MergeTree表中。

    在这里插入图片描述

     该引擎还支持反向写入到Kafka中,即往Kafka引擎表中插入数据,可以同步到Kafka中(同样可以使用物化视图将不同引擎需要的表数据同步插入到Kafka引擎表中)。

    二、修改json串规则

    kafka的配置 vim connect-distributed.properties

    key.converter.schemas.enable=false

    value.converter.schemas.enable=false

    重启kafka-connect  

    sh kafka-connect-restart.sh

    启动命令:sh kafka-connect-start.sh

    /usr/local/kafka/bin/connect-distributed.sh -daemon /usr/local/kafka/config/connect-distributed.properties

    重启命令:sh kafka-connect-restart.sh

    1. ps -ef | grep connect-distributed | grep -v grep | awk '{print $2}' | xargs kill -9
    2. sh kafka-connect-start.sh

    三、准备数据

    1、Mysql准备数据

    1. CREATE TABLE `ckhouse_input` (
    2. `uid` int(11) NOT NULL AUTO_INCREMENT,
    3. `name` varchar(20) NOT NULL,
    4. `age` int(11) NOT NULL,
    5. `phone` varchar(50) default null,
    6. `email` varchar(50) default null,
    7. PRIMARY KEY (`uid`)
    8. ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4;
    9. insert into ckhouse_input( name, age,phone,email)
    10. values('张一',10,'111111111','22222222@ccc'),
    11. ('王二',20,'111111111','22222222@ccc'),
    12. ('赵四',30,'111111111','22222222@ccc'),
    13. ('钱三',40,'111111111','22222222@ccc');

    2、Clickhouse准备数据

    1. #生成引擎表
    2. CREATE TABLE `cktest`.`ckhouse_output` (
    3. `uid` Int16 ,
    4. `name` varchar(20) ,
    5. `age` Int64 ,
    6. `phone` varchar(50) ,
    7. `email` varchar(50)
    8. ) ENGINE=Kafka
    9. SETTINGS kafka_broker_list = '192.168.10.105:9092',
    10. kafka_topic_list = 'topic-sit-ckhouse_input',
    11. kafka_group_name = 'connect-sit-ckhouse_input-download',
    12. kafka_format = 'JSONEachRow',
    13. kafka_num_consumers = 4;
    14. #生成实体表
    15. CREATE TABLE `cktest`.`ckhouse_output_mapper` (
    16. `uid` Int16 ,
    17. `name` varchar(20) ,
    18. `age` Int64 ,
    19. `phone` varchar(50) ,
    20. `email` varchar(50)
    21. ) ENGINE=MergeTree
    22. order by (name,age)
    23. SETTINGS index_granularity = 8192;
    24. #kafka引擎表 到实体表 映射
    25. CREATE MATERIALIZED VIEW `ckhouse_output_mapper_consumer` TO `ckhouse_output_mapper`
    26. AS SELECT *
    27. FROM `ckhouse_output`;
    28. #查看数据
    29. SELECT *from ckhouse_output_mapper;

    3、mysql创建connectors 连接器

    通过Postman POST请求http://192.168.10.105:8083/connectors

    参数:

    {

        "name":"topic-sit-upload-mysql2",

        "config":{

            "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",

            "connection.url":"jdbc:mysql://192.168.10.105:3306/sourcetest?user=root&password=root",

            "table.whitelist":"ckhouse_input",

            "incrementing.column.name":"uid",

            "mode":"incrementing",

            "topic.prefix":"topic-sit-"

        }

    }

    创建完成,之后可以通过连接器页面http://192.168.10.105:8083/connectors查看本连接器topic-sit-upload-mysql2

    4、创建kafka 对应topic

    由于不像Mysql,可以通过创建Sink自动生成topic,需要通过①kafka后台或者②前端 均可以创建topic,比如kafdrop页面新建topic,topic名称为 第三步的 topic.prefix 与 表名 联合体

    topic-sit-ckhouse_input

    ①后台创建topic方式:

    kafka-topics.sh --create --bootstrap-server 192.168.10.105:9092 --topic topic-sit-ckhouse_input --partitions 1 --replication-factor 1

    ②安装kafdrop通过博客: 

    建议通过方法二,不需要重复登陆kafka

    kafka可视化工具整理(七)_无敌小田田的博客-CSDN博客_kafka可视化工具

    5、执行完第三小步与第四小步之后

    对Mysql插入数据,既可以在kafka看到消息队列。又可以在Clickhouse对应表中查看到同步数据

    四、kafkaconnect+clickhouse 速度调研

     连接的最大传输速度

    第一次实验:生成100W条数据。Mysql生成数据耗时 260秒

    kafka获取100W数据,大约80秒

    到ck获取100W数据,大约20秒

    第二次实验:生成100W条数据。Mysql生成数据耗时 246秒

    kafka获取100W数据,68秒

    到ck获取100W数据,大约11秒

    总共: 325秒

  • 相关阅读:
    vue之$emit返回值
    DOM—DOM 事件基础知识点总结
    设计师必收藏的5个配色网站
    etcd的mvcc源码剖析
    java计算机毕业设计基于安卓Android/微信小程序的英语单词学习APP系统
    输入框自动保留2位小数
    Scrum敏捷认证CSM(ScrumMaster)官方课程
    Java的File文件操作案例汇总
    作业错题一
    数据结构与算法-图
  • 原文地址:https://blog.csdn.net/qq_36602951/article/details/127710024