目录
七、安装配置 Debezium-Connector-MySQL 插件
(3)以 distributed 方式启动 Kafka connect
(4)确认 connector 插件和自动生成的 topic
本文介绍从 MySQL 作为源到 ClickHouse 作为目标的整个过程。MySQL 数据库更改通过 Debezium 捕获,并作为事件发布在到 Kafka 上。ClickHouse 通过 Kafka 表引擎按部分顺序应用这些更改,实时并保持最终一致性。相关软件版本如下:
这种方案的优点之一是可以做到 ClickHouse 与 MySQL 的数据最终严格一致。
总体结构如下图所示。
ClickHouse 是由四个实例构成的两分片、每分片两副本集群,票选和协调器使用 ClickHouse 自带的 keeper 组件。分片、副本、keeper 节点、Zookeeper集群、Kafaka集群、Debezium-Connector-MySQL 插件的部署如下表所示。
IP | 主机名 | 实例角色 | ClickHouse Keeper | Zookeeper | Kafka | Debezium Connector MySQL |
172.18.4.126 | node1 | 分片1副本1 | * | |||
172.18.4.188 | node2 | 分片1副本2 | * | * | * | * |
172.18.4.71 | node3 | 分片2副本1 | * | * | * | * |
172.18.4.86 | node4 | 分片2副本2 | * | * | * |
配置好主从复制后,在主库创建测试库表及数据:
- -- 建库
- create database test;
-
- -- 建表
- create table test.t1 (
- id bigint(20) not null auto_increment,
- remark varchar(32) default null comment '备注',
- createtime timestamp not null default current_timestamp comment '创建时间',
- primary key (id));
-
- -- 插入三条测试数据
- insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');
- commit;
在 node2 上执行以下步骤。
mkdir $KAFKA_HOME/plugins
- cd ~
- # debezium-connector-mysql
- unzip debezium-debezium-connector-mysql-2.4.2.zip -d $KAFKA_HOME/plugins/
- # 先备份
- cp $KAFKA_HOME/config/connect-distributed.properties $KAFKA_HOME/config/connect-distributed.properties.bak
- # 编辑 connect-distributed.properties 文件
- vim $KAFKA_HOME/config/connect-distributed.properties
内容如下:
- bootstrap.servers=node2:9092,node3:9092,node4:9092
- group.id=connect-cluster
- key.converter=org.apache.kafka.connect.json.JsonConverter
- value.converter=org.apache.kafka.connect.json.JsonConverter
- key.converter.schemas.enable=false
- value.converter.schemas.enable=false
- offset.storage.topic=connect-offsets
- offset.storage.replication.factor=3
- offset.storage.partitions=3
- config.storage.topic=connect-configs
- config.storage.replication.factor=3
- status.storage.topic=connect-status
- status.storage.replication.factor=3
- status.storage.partitions=3
- offset.flush.interval.ms=10000
- plugin.path=/root/kafka_2.13-3.7.0/plugins
- scp $KAFKA_HOME/config/connect-distributed.properties node3:$KAFKA_HOME/config/
- scp $KAFKA_HOME/config/connect-distributed.properties node4:$KAFKA_HOME/config/
- scp -r $KAFKA_HOME/plugins node3:$KAFKA_HOME/
- scp -r $KAFKA_HOME/plugins node4:$KAFKA_HOME/
- connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
- # 确认日志是否有 ERROR
- grep ERROR ~/kafka_2.13-3.7.0/logs/connectDistributed.out
查看连接器插件:
curl -X GET http://node2:8083/connector-plugins | jq
从输出中可以看到,Kafka connect 已经识别到了 MySqlConnector source 插件:
- [root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connector-plugins | jq
- % Total % Received % Xferd Average Speed Time Time Time Current
- Dload Upload Total Spent Left Speed
- 100 403 100 403 0 0 3820 0 --:--:-- --:--:-- --:--:-- 3838
- [
- {
- "class": "io.debezium.connector.mysql.MySqlConnector",
- "type": "source",
- "version": "2.4.2.Final"
- },
- {
- "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
- "type": "source",
- "version": "3.7.0"
- },
- {
- "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
- "type": "source",
- "version": "3.7.0"
- },
- {
- "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
- "type": "source",
- "version": "3.7.0"
- }
- ]
- [root@vvml-yz-hbase-test~]#
查看 topic:
kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
从输出中可以看到,Kafka connect 启动时自动创建了 connect-configs、connect-offsets、connect-status 三个 topic:
- [root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
- __consumer_offsets
- connect-configs
- connect-offsets
- connect-status
- [root@vvml-yz-hbase-test~]#
Debezium 是一个众所周知的用于读取和解析 MySQL Binlog 的工具。它将 KafkaConnect 作为一个连接器进行集成,并对 Kafka 主题进行每一次更改。
默认情况下,Debezium 会向 Kafka 发出每个操作的前状态和后状态的每条记录,这很难被 ClickHouse Kafka 表解析。此外,在执行删除操作的情况下(Clickhouse 同样无法解析),它会创建 tombstone 记录,即具有 Null 值的记录。下表展示了这个行为。
操作 | 操作前 | 操作后 | 附加记录 |
Create | Null | 新纪录 | - |
Update | 更新前的记录 | 更新后的记录 | - |
Delete | 删除前的记录 | Null | 墓碑记录 |
在 Debezium 配置中使用 ExtractNewRecod 转换器来处理此问题。由于有了这个选项,Debezium 只为创建/更新操作保留 after 状态,而忽略 before 状态。但缺点是,它删除了包含先前状态的 Delete 记录和墓碑记录,换句话说就是不再捕获删除操作。紧接着说明如何解决这个问题。
- "transforms": "unwrap",
- "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
要捕获删除操作,必须添加如下所示的重写配置:
"transforms.unwrap.delete.handling.mode":"rewrite"
Debezium 使用此配置添加字段 __deleted,对于 delete 操作为 true,对于其他操作为 false。因此,删除将包含以前的状态以及 __deleted:true 字段。
在提供上述配置的情况下,更新记录(主键除外的每一列)会发出一个具有新状态的简单记录。通常在关系数据库系统中,更新后的记录会替换前一个记录,但在 ClickHouse 不行。出于性能考虑,ClickHouse 将行级更新变为多版本插入。在本示例中,MySQL 中的 test.t1 表以 id 列为主键,如果更新了 remark 列,在 ClikHouse 中,最终会得到重复的记录,这意味着 id 相同,但 remark 不同!
幸运的是有办法应付这种情况。默认情况下,Debezium 会创建一个删除记录和一个创建记录,用于更新主键。因此,如果源更新 id,它会发出一个带有前一个 id 的删除记录和一个带有新 id 的创建记录。带有 __deleted=ture 字段的前一个记录将替换 CH 中的 stall 记录。然后,可以在视图中过滤暗示删除的记录。可以使用以下选项将此行为扩展到其他列:
"message.key.columns": "test.t1:id;test.t1:remark;test.t1:createtime"
注意:
通过更改连接器的键列,Debezium 将这些列用作主键,而不是源表的默认主键。因此,与数据库的一条记录相关的不同操作可能最终会出现在 Kafka 中的其他分区。由于记录在不同分区中失去顺序,除非确保 ClickHouse 顺序键和 Debezium 消息键相同,否则可能会导致 Clikchouse 中的数据不一致。
经验法则如下:
现在,通过将上述所有选项和常用选项放在一起,将拥有一个功能齐全的 Debezium 配置,能够处理 ClickHouse 所需的任何更改。
- # 编辑文件
- vim $KAFKA_HOME/plugins/source-mysql.json
内容如下:
- {
- "name": "mysql-source-connector",
- "config": {
- "connector.class": "io.debezium.connector.mysql.MySqlConnector",
- "database.hostname": "172.18.16.156",
- "database.port": "3307",
- "database.user": "dba",
- "database.password": "123456",
- "database.server.id": "1563307",
- "database.server.name": "dbserver1",
- "database.include.list": "test",
- "table.include.list": "test.t1",
- "topic.prefix": "mysql-clickhouse-test",
- "schema.history.internal.kafka.bootstrap.servers": "node2:9092,node3:9092,node4:9092",
- "schema.history.internal.kafka.topic": "schemahistory.mysql-clickhouse-test",
- "message.key.columns": "test.t1:id;test.t1:remark;test.t1:createtime",
- "transforms":"unwrap",
- "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
- "transforms.unwrap.delete.handling.mode": "rewrite"
- }
- }
- # 创建 connector
- curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"; echo
- # 查看 connector 状态
- curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
- # 查看 topic
- kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
从输出中可以看到,mysql-source-connector 状态为 RUNNING,并自动创建了三个 topic:
- [root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"; echo
- HTTP/1.1 201 Created
- Date: Thu, 25 Apr 2024 03:47:26 GMT
- Location: http://node2:8083/connectors/mysql-source-connector
- Content-Type: application/json
- Content-Length: 818
- Server: Jetty(9.4.53.v20231009)
-
- {"name":"mysql-source-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"172.18.16.156","database.port":"3307","database.user":"dba","database.password":"123456","database.server.id":"1563307","database.server.name":"dbserver1","database.include.list":"test","table.include.list":"test.t1","topic.prefix":"mysql-clickhouse-test","schema.history.internal.kafka.bootstrap.servers":"node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic":"schemahistory.mysql-clickhouse-test","message.key.columns":"test.t1:id;test.t1:remark;test.t1:createtime","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.delete.handling.mode":"rewrite","name":"mysql-source-connector"},"tasks":[],"type":"source"}
- [root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
- % Total % Received % Xferd Average Speed Time Time Time Current
- Dload Upload Total Spent Left Speed
- 100 182 100 182 0 0 24045 0 --:--:-- --:--:-- --:--:-- 26000
- {
- "name": "mysql-source-connector",
- "connector": {
- "state": "RUNNING",
- "worker_id": "172.18.4.188:8083"
- },
- "tasks": [
- {
- "id": 0,
- "state": "RUNNING",
- "worker_id": "172.18.4.188:8083"
- }
- ],
- "type": "source"
- }
- [root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
- __consumer_offsets
- connect-configs
- connect-offsets
- connect-status
- mysql-clickhouse-test
- mysql-clickhouse-test.test.t1
- schemahistory.mysql-clickhouse-test
- [root@vvml-yz-hbase-test~]#
ClickHouse 可以利用 Kafka 表引擎将 Kafka 记录放入一个表中。需要定义三个对象:Kafka 表、主表和消费者物化视图。
create database db2 on cluster cluster_2S_2R;
- CREATE TABLE db2.kafka_t1 on cluster cluster_2S_2R
- (
- `id` Int64,
- `remark` Nullable(String),
- `createtime` String,
- `__deleted` String
- )
- ENGINE = Kafka('node2:9092,node3:9092,node4:9092', 'mysql-clickhouse-test.test.t1', 'clickhouse', 'JSONEachRow');
主表具有源结构和 __deleted 字段。这里使用的是 ReplicatedReplacingMergeTree,因为需要用已删除或更新的记录替换 stall 记录。
- -- 创建本地表
- CREATE TABLE db2.stream_t1 on cluster cluster_2S_2R
- (
- `id` Int64,
- `remark` Nullable(String),
- `createtime` timestamp,
- `__deleted` String
- )
- ENGINE = ReplicatedReplacingMergeTree(
- '/clickhouse/tables/{shard}/db2/t1',
- '{replica}'
- )
- ORDER BY (id, createtime)
- SETTINGS index_granularity = 8192;
-
- -- 创建分布式表,以源表的主键 id 作为分片键,保证同一 id 的数据落在同一分片上
- create table db2.t1_replica_all on cluster cluster_2S_2R
- as db2.stream_t1
- engine = Distributed(cluster_2S_2R, db2, stream_t1, id);
在创建物化视图前,先停止MySQL从库的复制。从库停止复制,不影响主库的正常使用,也就不会影响业务。此时从库的数据处于静止状态,不会产生变化,这使得获取存量数据变得轻而易举。然后创建物化视图时会自动将数据写入 db2.t1_replica_all 对应的本地表中。之后在 ClickHouse 集群中的任一实例上,都能从物化视图中查询到一致的 MySQL 存量数据。
- -- MySQL 从库停止复制
- stop slave;
Kafka 表的每一条记录只读取一次,因为它的消费者组会改变偏移量,不能读取两次。因此,需要定义一个主表,并通过物化视图将每个 Kafka 表记录具化到它:
- -- 注意时间戳的处理
- CREATE MATERIALIZED VIEW db2.consumer_t1 on cluster cluster_2S_2R
- TO db2.t1_replica_all
- (
- `id` Int64,
- `remark` Nullable(String),
- `createtime` timestamp,
- `__deleted` String
- ) AS
- SELECT id, remark, addHours(toDateTime(substring(createtime,1,length(createtime)-1)),8) createtime, __deleted FROM db2.kafka_t1;
最后需要过滤每个被删除的记录,并拥有最新的记录,以防不同的记录具有相同的排序键。可以定义一个简单的视图来隐式完成这项工作:
- CREATE VIEW db2.t1 on cluster cluster_2S_2R
- (
- `id` Int64,
- `remark` Nullable(String),
- `createtime` String,
- `__deleted` String
- ) AS
- SELECT *
- FROM db2.consumer_t1
- FINAL
- WHERE __deleted = 'false';
从 clickhouse 视图查询存量数据:
- vvml-yz-hbase-test.172.18.4.126 :) select * from db2.t1;
-
- SELECT *
- FROM db2.t1
-
- Query id: 2a51fd5e-6b4f-4b78-b522-62b7be32535b
-
- ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
- │ 2 │ 第二行:row2 │ 2024-04-25 11:51:07 │ false │
- └────┴──────────────┴─────────────────────┴───────────┘
- ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
- │ 1 │ 第一行:row1 │ 2024-04-25 11:51:07 │ false │
- │ 3 │ 第三行:row3 │ 2024-04-25 11:51:07 │ false │
- └────┴──────────────┴─────────────────────┴───────────┘
-
- 3 rows in set. Elapsed: 0.007 sec.
-
- vvml-yz-hbase-test.172.18.4.126 :)
可以看到,存量数据已经与 MySQL 同步。
- -- MySQL 主库修改数据
- insert into test.t1 (remark) values ('第四行:row4');
- update test.t1 set remark = '第五行:row5' where id = 4;
- delete from test.t1 where id =1;
- insert into test.t1 (remark) values ('第六行:row6');
-
- -- MySQL 从库启动复制
- start slave;
此时 MySQL 的数据如下:
- mysql> select * from test.t1;
- +----+------------------+---------------------+
- | id | remark | createtime |
- +----+------------------+---------------------+
- | 2 | 第二行:row2 | 2024-04-25 11:51:07 |
- | 3 | 第三行:row3 | 2024-04-25 11:51:07 |
- | 4 | 第五行:row5 | 2024-04-25 11:56:29 |
- | 5 | 第六行:row6 | 2024-04-25 11:56:29 |
- +----+------------------+---------------------+
- 4 rows in set (0.00 sec)
从 clickhouse 视图查询增量数据:
- vvml-yz-hbase-test.172.18.4.126 :) select * from db2.t1;
-
- SELECT *
- FROM db2.t1
-
- Query id: b34bb37b-091b-490e-b55b-a0e9eedf5573
-
- ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
- │ 2 │ 第二行:row2 │ 2024-04-25 11:51:07 │ false │
- └────┴──────────────┴─────────────────────┴───────────┘
- ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
- │ 4 │ 第五行:row5 │ 2024-04-25 11:56:29 │ false │
- └────┴──────────────┴─────────────────────┴───────────┘
- ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
- │ 3 │ 第三行:row3 │ 2024-04-25 11:51:07 │ false │
- └────┴──────────────┴─────────────────────┴───────────┘
- ┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
- │ 5 │ 第六行:row6 │ 2024-04-25 11:56:29 │ false │
- └────┴──────────────┴─────────────────────┴───────────┘
-
- 4 rows in set. Elapsed: 0.008 sec.
-
- vvml-yz-hbase-test.172.18.4.126 :)
可以看到,增量数据已经与 MySQL 同步,现在从 ClickHouse 视图查询的数据与 MySQL 一致。
查看 Kafka 消费:
kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group clickhouse
输出如下:
- [root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group clickhouse
-
- GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
- clickhouse mysql-clickhouse-test.test.t1 0 8 8 0 ClickHouse-vvml-yz-hbase-test.172.18.4.126-db2-kafka_t1-26e6aa8e-1f08-4491-8af7-f1822f1a7e94 /172.18.4.126 ClickHouse-vvml-yz-hbase-test.172.18.4.126-db2-kafka_t1
- [root@vvml-yz-hbase-test~]#
可以看到,最后被消费的消息偏移量是8,MySQL 的存量、增量数据都已经通过 Kafka 消息同步到了 ClickHouse。