这里记录一下CDC和Kafka的协同工作过程。
这里是针对SQL Server2019进行配置。
一些要点如下:
exec sys.sp_cdc_enable_db
@capture_instance
,否则后面无法关闭表的CDC功能。EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo', -- source_schema
@source_name = 'Test_Table', -- table_name
@capture_instance = 'dbo_Test_Table', -- capture_instance
@supports_net_changes = 1, -- supports_net_changes
@role_name = NULL, -- role_name
@index_name = NULL, -- index_name
@captured_column_list = NULL -- captured_column_list
cdc
开头的表;# select * from cdc.dbo_xxx
select * from cdc.dbo_Test_Table
INSERT
,UPDATE
和DELETE
都会触发CDC产生记录。EXEC sys.sp_cdc_disable_table
@source_schema = 'dbo', -- source_schema
@source_name = 'Test_Table', -- table_name
@capture_instance = 'dbo_Test_Table', -- capture_instance
Kafka是一个高可用的消息系统,适用于数据批处理和流式数据。这里主要介绍如何在Windows上配置Kafka。
tar -zxvf .\apache-zookeeper-xxx.tar.gz
conf
文件夹中复制一份zoo.cfg
配置文件,然后配置data文件夹和log文件夹的地址。bin
文件夹,启动Zookeeper点击zkServer.cmd启动服务端
点击zkCli.cmd启动客户端
tar -zxvf .\apache-zookeeper-xxx.taz
config
文件夹下配置server.properties
文件。# 启动Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
# 创建主题
.\bin\windows\kafka-topics.bat --create --topic test --bootstrap-server localhost:9092
# 查看所有主题
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
# 创建生产者进程
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
# 创建消费者进程
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Kafka Connect用来连接Kafka和别的系统,是一种通用的连接框架标准。
Kafka Connector的部署有两种方式:分布式(distributed)和标准(standalone),这里主要是介绍使用standalone模式部署。
conf/connect-standalone.properties
。plugin.path
,之后自行下载的connector都要放在该路径下才能生效。offset.storage.file.filename
文件是否存在,如果不存在要手动新建一个offsets文件,否则运行会出错。Kafka自带了FileStreamSinkConnector和FileStreamSourceConnector这两个Connector,可以用于测试。
在conf/connect-file-source.properties
中配置:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=./test.txt # 监控输入的文件路径
topic=connect-test # Kafka主题名称
connect-file-sink.properties
中配置:name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=./test.sink.txt
topics=connect-test
# 用standalone形式启动connector进程
.\bin\windows\connect-standalone.bat config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
test.sink.txt
和test.txt
。在test.txt
输入内容,test.sink.txt
会自动输出相同的内容。下载连接器存档debezium-connector-sqlserver-1.9.5.Final-plugin.tar.gz
,解压后把文件夹放到上面配置的plugin.path
路径下即可。
启动Debezium connector需要使用REST API格式。REST API是一种JSON格式,用于HTTP协议的数据传输,可以用Postman软件进行管理和配置。
Postman的一些使用教程:API测试之Postman使用完全指南(Postman教程,这篇文章就够了) 。
官方给的REST API配置样例如下:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "192.168.99.100",
"database.port": "1433",
"database.user": "sa",
"database.password": "Password!",
"database.dbname": "testDB",
"database.server.name": "fullfillment",
"table.include.list": "dbo.customers",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.fullfillment"
}
}
GET /connectors:返回所有正在运行的 connector 名
POST /connectors:新建一个 connector;请求体必须是 json 格式并且需要包含 name 字段和 config 字段,name 是 connector 的名字,config 是 json 格式,必须包含你的 connector 的配置信息。
GET /connectors/{name}:获取指定 connetor 的信息
GET /connectors/{name}/config:获取指定 connector 的配置信息
PUT /connectors/{name}/config:更新指定 connector 的配置信息
GET /connectors/{name}/status:获取指定 connector 的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks:获取指定 connector 正在运行的 task。
GET /connectors/{name}/tasks/{taskid}/status:获取指定 connector 的 task 的状态信息
PUT /connectors/{name}/pause:暂停 connector 和它的 task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume:恢复一个被暂停的 connector
POST /connectors/{name}/restart:重启一个 connector,尤其是在一个 connector 运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart:重启一个 task,一般是因为它运行失败才这样做。
DELETE /connectors/{name}:删除一个 connector,停止它的所有 task 并删除配置。
例如:
# 建立新的connector进程
POST http://192.168.1.133:8083/connectors
# 查看所有的connector进程
GET http://192.168.1.133:8083/connectors
# 查看已安装的connector插件
GET http://192.168.1.133:8083/connector-plugins
# 查看connector进程状态
GET http://192.168.1.133:8083/connectors/test-sqlserver-connector/status
# 删除某个connector进程
DELETE http://192.168.1.133:8083/connectors/test-sqlserver-connector
.bat
启动的,所以在使用REST API发送POST配置之前,一定要确保已经启动了Connector服务。.bat
在控制台启动,但暂时还不清楚如何配置,官方推荐也是用REST API启动的,而且强调了在启动前必须要先启动Connector服务。因为生产者已经由上面的CDC充当,所以为了响应CDC的消息,这里用JAVA来实现生产者进程。
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka-clientsartifactId>
<version>3.1.1version>
dependency>
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
// disable auto commit of offsets
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 一次性提交所有已经poll过的消息,即poll方法返回的最大偏移量
consumer.commitSync();
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Kafka_Test {
public static void main(String[] args) {
Properties prop=new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.133:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
// latest收最新的数据 none会报错 earliest最早的数据
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1");
// 每次poll只拉取2条消息
prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("fullfillment.dbo.Test_RDF_extractor"));//订阅
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : poll) {
System.out.println(record.offset()+"\t"+record.key()+"\t"+record.value());
// 解析JSON值
JSONObject json_record_value = JSON.parseObject(record.value());
JSONObject json_record_value_after = json_record_value.getJSONObject("after");
String ext_code = json_record_value_after.getString("ext_code");
System.out.println(ext_code);
// 细粒度提交消息Offset
// 构建提交参数,包括partition和offset的信息
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
// 使用下一个偏移量作为提交的值,下一次就从这里开始拉取消息
offsetMap.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata"));
// 提交指定的offset
consumer.commitSync(offsetMap);
}
}
}
}