• Debezium的基本使用(以MySQL为例)


    • GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。
    • GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。

      一、Debezium介绍

      摘自官网:

      Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

    简单理解就是Debezium可以捕获数据库中所有行级的数据变化并包装成事件流顺序输出。

    二、基本使用

    下面以MySQL为例介绍Debezium的基本使用。

    1. MySQL的准备工作

    1. 准备一个MySQL用户并且拥有相应权限,像这样:
      CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';
      
      • 1

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'dbz' IDENTIFIED BY 'dbzpwd';

    
    2. 检查MySQL是否开启`log-bin`
    ```sql
    SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';
    
    -- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...
    -- please execute the given SQL again after execute this SQL: set global show_compatibility_56=on;
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    如果是OFF则需要修改MySQL配置文件,类似下面这样:

    server-id         = 223344        #必须有
    log_bin           = mysql-bin    #log_bin的值是binlog文件序列的基本名称
    binlog_format     = ROW                #必须是ROW
    binlog_row_image  = FULL            #必须是FULL
    expire_logs_days  = 10                #依据实际情况而定
    • 1
    • 2
    • 3
    • 4
    1. 准备数据库&表
      create database inventory;
      create table inventory.a (id bigint primary key auto_increment, name varchar(32));
      insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');
      • 1
      • 2

    2. 编写程序

    2.1. 工程依赖(Maven)

    pom.xml

    
        io.debezium
        debezium-api
        ${version.debezium}
    
    
        io.debezium
        debezium-embedded
        ${version.debezium}
    
    
    
        io.debezium
        debezium-connector-mysql
        ${version.debezium}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    目前Debezium最新稳定版本为:1.9.5.Final

    2.2. 准备数据库&表
    create database inventory;
    create table inventory.a (id bigint primary key, name varchar(32));
    insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');
    • 1
    • 2
    2.3. 代码编写
    package com.greatdb.dbzdemo;
    
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import io.debezium.engine.ChangeEvent;
    import io.debezium.engine.DebeziumEngine;
    import io.debezium.engine.format.Json;
    
    /**
     * @author wang.jianwen
     * @version 1.0
     * @date 2022/07/29
     */
    public class DebeziumTest {
    
        private static DebeziumEngine> engine;
    
        public static void main(String[] args) throws Exception {
            final Properties props = new Properties();
            props.setProperty("name", "dbz-engine");
            props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
    
            //offset config begin - 使用文件来存储已处理的binlog偏移量
            props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
            props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
            props.setProperty("offset.flush.interval.ms", "0");
            //offset config end
    
            props.setProperty("database.server.name", "mysql-connector");
            props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
            props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");
    
            props.setProperty("database.server.id", "122112");    //需要与MySQL的server-id不同
            props.setProperty("database.hostname", "tmg");
            props.setProperty("database.port", "3306");
            props.setProperty("database.user", "mysqluser");
            props.setProperty("database.password", "mysqlpw");
            props.setProperty("database.include.list", "inventory");//要捕获的数据库名
            props.setProperty("table.include.list", "inventory.a");//要捕获的数据表
    
            props.setProperty("snapshot.mode", "initial");//全量+增量
    
            // 使用上述配置创建Debezium引擎,输出样式为Json字符串格式
            engine = DebeziumEngine.create(Json.class)
                    .using(props)
                    .notifying(record -> {
                        System.out.println(record);//输出到控制台
                    })
                    .using((success, message, error) -> {
                        if (error != null) {
                            // 报错回调
                            System.out.println("------------error, message:" + message + "exception:" + error);
                        }
                        closeEngine(engine);
                    })
                    .build();
    
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            addShutdownHook(engine);
            awaitTermination(executor);
    
            System.out.println("------------main finished.");
        }
    
        private static void closeEngine(DebeziumEngine> engine) {
            try {
                engine.close();
            } catch (IOException ignored) {
            }
        }
    
        private static void addShutdownHook(DebeziumEngine> engine) {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
        }
    
        private static void awaitTermination(ExecutorService executor) {
            if (executor != null) {
                try {
                    executor.shutdown();
                    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    3. 测试

    程序跑起来后,可以看到控制台输出:

    ...(省略)
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    ...(省略)
    • 1
    • 2
    • 3
    • 4

    可以看到全量的数据已经输出,关键的数据如下:

    ..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
    • 1
    • 2
    • 接下来新增一条数据:

      insert into inventory.a values (4, 'n4');

        控制台输出:

        ..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
        • 修改一条数据:

          update inventory.a set name = 'n4-upd' where id = 4;

            控制台输出:

            ..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
            • 删除一条数据:

              delete from inventory.a where id = 1;

                控制台输出:

                ..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...

                  三、总结

                  本文以MySQL为例介绍了Debezium在代码中基本使用流程,对MySQL的数据进行常见的增删改操作,Debezium将捕获这些数据行的变化,并记录了数据行变化前后的数据,并对外提供事件流,外部可以获取并对事件进行相应处理。

                参考:https://debezium.io/documentation/reference/1.8/index.html

              • 相关阅读:
                ansible及其模块
                CorelDRAW2024最新版本的图形设计软件
                Java流与链表:探索java.util.stream与LinkedList的交汇点
                程序员的自我修养-编译链接
                下载、安装CAN-EYE植被参数工具
                JAVA学习实战(十二)分库分表学习
                strongSwan对接H3C
                计算机毕业设计Java大学生学科竞赛管理系统(源码+系统+mysql数据库+lw文档)
                格雷希尔GripSeal密封测试接头,你了解多少?
                虹科新闻 | 2022年度TOP智能网联创新企业名单新鲜出炉,虹科榜上有名
              • 原文地址:https://blog.csdn.net/GreatSQL2021/article/details/126436044