• IDEA03:数据库CDC、Kafka和连接器Debezium配置


    写在前面

    这里记录一下CDC和Kafka的协同工作过程。

    • CDC(Change Data Capture:变更数据捕获)是数据库的一项功能,能够监控数据库表的变化。
    • Kafka是一种分布式消息系统。
    • 这里协同的目的是让CDC监控数据库表的更新,然后将更新发布到Kafka,最后让消费者响应这个更新。
    • 另外还用到了一个CDC和Kafka之间的连接器,叫Debezium

    一、配置数据库CDC

    这里是针对SQL Server2019进行配置。

    一些要点如下:

    • 在SSMS的工具栏中点击新建查询,打开查询脚本;
    • 执行下面语句,打开数据库的CDC功能(默认关闭);
    exec sys.sp_cdc_enable_db 
    
    • 1
    EXEC sys.sp_cdc_enable_table
            @source_schema = 'dbo', -- source_schema
            @source_name = 'Test_Table', -- table_name
            @capture_instance = 'dbo_Test_Table', -- capture_instance
            @supports_net_changes = 1, -- supports_net_changes
            @role_name = NULL, -- role_name
            @index_name = NULL, -- index_name
            @captured_column_list = NULL -- captured_column_list
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 注意检查SQL Server代理是否开启,如果不开启是不能启用CDC的。
    • SQL Server Configuration Manager->SQL Server服务中也可以开启SQL Server代理。

    SQL Server代理

    • 成功开启CDC后,在系统表里面会有6个以cdc开头的表;
    • 执行下列语句应该有返回值。
    # select * from cdc.dbo_xxx
    select * from cdc.dbo_Test_Table
    
    • 1
    • 2
    • 此时对数据库表执行INSERTUPDATEDELETE都会触发CDC产生记录。
    • 注意:如果更改了开启CDC的数据表的结构或者某些属性的设置,CDC功能就会停用,需要重新开启。
    • 关闭表的CDC功能:
    EXEC sys.sp_cdc_disable_table
    		@source_schema = 'dbo', -- source_schema
            @source_name = 'Test_Table', -- table_name
    		@capture_instance = 'dbo_Test_Table', -- capture_instance
    
    • 1
    • 2
    • 3
    • 4

    二、配置Kafka

    Kafka是一个高可用的消息系统,适用于数据批处理和流式数据。这里主要介绍如何在Windows上配置Kafka。

    1.安装Zookeeper

    下载

    • 用cmd或者powershell来解压:
    tar -zxvf .\apache-zookeeper-xxx.tar.gz
    
    • 1
    • 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
    • conf文件夹中复制一份zoo.cfg配置文件,然后配置data文件夹和log文件夹的地址。

    zoo.cfg配置文件

    • 进入bin文件夹,启动Zookeeper

    点击zkServer.cmd启动服务端
    点击zkCli.cmd启动客户端

    2.安装Kafka

    下载

    • 用cmd或者powershell来解压:
    tar -zxvf .\apache-zookeeper-xxx.taz
    
    • 1
    • 解压之后将解压的文件夹整个移动到常用的软件安装目录即可。
    • config文件夹下配置server.properties文件。
    • 注意,运行Kafka之前一定要先启动Zookeeper
    • 回到根目录,下面是一些常用的命令:
    # 启动Kafka
    .\bin\windows\kafka-server-start.bat .\config\server.properties
    # 创建主题
    .\bin\windows\kafka-topics.bat --create --topic test --bootstrap-server localhost:9092
    # 查看所有主题
    .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
    # 创建生产者进程
    .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
    # 创建消费者进程
    .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    三、配置Kafka Connector

    Kafka Connect用来连接Kafka和别的系统,是一种通用的连接框架标准。

    Kafka Connector的部署有两种方式:分布式(distributed)和标准(standalone),这里主要是介绍使用standalone模式部署。

    1.配置配置文件

    • 打开配置文件conf/connect-standalone.properties
    • 检查一下 bootstrap.servers是否为Kafka的地址和端口
    • 重点是设置plugin.path,之后自行下载的connector都要放在该路径下才能生效。
    • 检查offset.storage.file.filename文件是否存在,如果不存在要手动新建一个offsets文件,否则运行会出错。

    配置

    2.配置Connector测试

    • Kafka自带了FileStreamSinkConnectorFileStreamSourceConnector这两个Connector,可以用于测试。

    • conf/connect-file-source.properties中配置:

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=./test.txt  # 监控输入的文件路径
    topic=connect-test  # Kafka主题名称
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • connect-file-sink.properties 中配置:
    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    file=./test.sink.txt
    topics=connect-test
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 用命令行启动这两个connector
    # 用standalone形式启动connector进程
    .\bin\windows\connect-standalone.bat config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
    
    • 1
    • 2
    • 在Kafka根目录下新建两个文件test.sink.txttest.txt。在test.txt输入内容,test.sink.txt会自动输出相同的内容。

    3.配置Debezium Connector

    部署过程

    • 下载连接器存档debezium-connector-sqlserver-1.9.5.Final-plugin.tar.gz,解压后把文件夹放到上面配置的plugin.path路径下即可。

    • 启动Debezium connector需要使用REST API格式。REST API是一种JSON格式,用于HTTP协议的数据传输,可以用Postman软件进行管理和配置。

    • Postman的一些使用教程:API测试之Postman使用完全指南(Postman教程,这篇文章就够了)

    • 官方给的REST API配置样例如下:

    {
        "name": "inventory-connector", 
        "config": {
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
            "database.hostname": "192.168.99.100", 
            "database.port": "1433", 
            "database.user": "sa", 
            "database.password": "Password!", 
            "database.dbname": "testDB", 
            "database.server.name": "fullfillment", 
            "table.include.list": "dbo.customers", 
            "database.history.kafka.bootstrap.servers": "kafka:9092", 
            "database.history.kafka.topic": "dbhistory.fullfillment" 
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 属性的说明如下:

    属性说明

    • 使用Postman发送配置信息即可启动Debezium Connector
    • 常用的一些REST API接口:
    GET /connectors:返回所有正在运行的 connector 名
    POST /connectors:新建一个 connector;请求体必须是 json 格式并且需要包含 name 字段和 config 字段,name 是 connector 的名字,config 是 json 格式,必须包含你的 connector 的配置信息。
    GET /connectors/{name}:获取指定 connetor 的信息
    GET /connectors/{name}/config:获取指定 connector 的配置信息
    PUT /connectors/{name}/config:更新指定 connector 的配置信息
    GET /connectors/{name}/status:获取指定 connector 的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
    GET /connectors/{name}/tasks:获取指定 connector 正在运行的 task。
    GET /connectors/{name}/tasks/{taskid}/status:获取指定 connector 的 task 的状态信息
    PUT /connectors/{name}/pause:暂停 connector 和它的 task,停止数据处理知道它被恢复。
    PUT /connectors/{name}/resume:恢复一个被暂停的 connector
    POST /connectors/{name}/restart:重启一个 connector,尤其是在一个 connector 运行失败的情况下比较常用
    POST /connectors/{name}/tasks/{taskId}/restart:重启一个 task,一般是因为它运行失败才这样做。
    DELETE /connectors/{name}:删除一个 connector,停止它的所有 task 并删除配置。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    例如:

    # 建立新的connector进程
    POST http://192.168.1.133:8083/connectors
    # 查看所有的connector进程
    GET http://192.168.1.133:8083/connectors
    # 查看已安装的connector插件
    GET http://192.168.1.133:8083/connector-plugins
    # 查看connector进程状态
    GET http://192.168.1.133:8083/connectors/test-sqlserver-connector/status
    # 删除某个connector进程
    DELETE http://192.168.1.133:8083/connectors/test-sqlserver-connector
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    4.一些注意

    • 由于Debezium Connector不是由配置文件从控制台用.bat启动的,所以在使用REST API发送POST配置之前,一定要确保已经启动了Connector服务。
    • 但由于Connector服务启动时一定要加上某个特定Connector的配置文件作为参数,所以这时候用默认自带的FileStreamSinkConnectorFileStreamSourceConnector作为参数就很合适,尽管这两个Connector我们实际中并不需要。
    • 或许Debezium Connector也可以用配置文件+.bat在控制台启动,但暂时还不清楚如何配置,官方推荐也是用REST API启动的,而且强调了在启动前必须要先启动Connector服务。

    四、创建JAVA的消费者

    因为生产者已经由上面的CDC充当,所以为了响应CDC的消息,这里用JAVA来实现生产者进程。

    1.首先是导入Maven依赖包:

    
    <dependency>
           <groupId>org.apache.kafkagroupId>
           <artifactId>kafka-clientsartifactId>
           <version>3.1.1version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.配置fastjson库

    3.Debezium Connector的CDC消息

    • 使用poll可以从订阅的主题中获取消息,它的参数的含义是如果没有消息等待多久返回empty。
    • 注意,poll不是每次只拉取一条消息的,而是可以一次性拉取很多条,默认最大是500。
    • 关于poll的详细介绍可以参考博文:Apache Kafka(九)- Kafka Consumer 消费行为
    ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
    
    • 1
    • 如果不commit消息,那么Kafka会认为这条消息还没有被处理成功,下次重启消费者进程后仍然会poll到这些消息。
    • commit消息的方式有两种:自动commit和手动commit,可以参考博文:Kafka消费消息自动提交与手动提交。。
    • 手动commit设置如下:
    // disable auto commit of offsets
     properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // 一次性提交所有已经poll过的消息,即poll方法返回的最大偏移量
    consumer.commitSync();
    
    • 1
    • 2
    • 3
    • 4

    4.一个消费者进程demo

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Kafka_Test {
        public static void main(String[] args) {
            Properties prop=new Properties();
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.133:9092");
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
            prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
            prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
            // latest收最新的数据 none会报错 earliest最早的数据
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
            prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1");
            // 每次poll只拉取2条消息
            prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
            // 创建消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            consumer.subscribe(Collections.singleton("fullfillment.dbo.Test_RDF_extractor"));//订阅
    
            while (true){
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.offset()+"\t"+record.key()+"\t"+record.value());
    
                    // 解析JSON值
                    JSONObject json_record_value = JSON.parseObject(record.value());
                    JSONObject json_record_value_after = json_record_value.getJSONObject("after");
                    String ext_code = json_record_value_after.getString("ext_code");
                    System.out.println(ext_code);
    
                    // 细粒度提交消息Offset
                    // 构建提交参数,包括partition和offset的信息
                    Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
                    // 使用下一个偏移量作为提交的值,下一次就从这里开始拉取消息
                    offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                    // 提交指定的offset
                    consumer.commitSync(offsetMap);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
  • 相关阅读:
    【Spring系列】- Spring循环依赖
    美国市场三星手机超苹果 中国第一属华为
    Kafka - 消息队列的两种模式
    如何围绕用户数字化运营?
    VUE3照本宣科——内置指令与自定义指令及插槽
    Linux17 jdk tomcat idea mysql在linux上安装
    自己动手实现rpc框架(二) 实现集群间rpc通信
    02-《人月神话》霍金敬酒和虫族战争-中译本纠错及联想
    Flask+LayUI开发手记(一):LayUI表格的前端数据分页展现
    1.4_28 Axure RP 9 for mac 高保真原型图 - 案例27【中继器 - 后台管理系统5】功能-弹窗修改数据
  • 原文地址:https://blog.csdn.net/weixin_43992162/article/details/126165369