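Original content from the GreatSQL community may not be used without authorization. To republish, please contact the editors and credit the source.
GreatSQL is a Chinese-developed branch of MySQL and is used in the same way as MySQL.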
I. Introduction to Debezium
From the official website:
Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
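Put simply, Debezium captures every row-level data change in a database, wraps each change as an event, and emits the events as an ordered stream.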
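To make the idea of an ordered change event stream concrete, here is a minimal sketch in plain Java. It does not use Debezium's API: `ChangeStreamSketch`, `RowChange`, and `apply` are hypothetical names introduced only for illustration, and the `op` codes (`r`, `c`, `u`, `d`) follow the values that appear in the event payloads shown later in this article. The point is that a consumer that replays the events in order ends up with the current state of the table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangeStreamSketch {

    /** Hypothetical, simplified row-level change; not a Debezium class. */
    static final class RowChange {
        final String op;     // "r" = snapshot read, "c" = create, "u" = update, "d" = delete
        final long id;       // primary key of the affected row
        final String after;  // value of the "name" column after the change, null for deletes

        RowChange(String op, long id, String after) {
            this.op = op;
            this.id = id;
            this.after = after;
        }
    }

    /** Replays the events in order against an in-memory copy of the table. */
    static Map<Long, String> apply(List<RowChange> events) {
        Map<Long, String> table = new LinkedHashMap<>();
        for (RowChange e : events) {
            switch (e.op) {
                case "r":
                case "c":
                case "u":
                    table.put(e.id, e.after);
                    break;
                case "d":
                    table.remove(e.id);
                    break;
                default:
                    throw new IllegalArgumentException("unknown op: " + e.op);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<RowChange> events = new ArrayList<>();
        events.add(new RowChange("r", 1, "n1"));
        events.add(new RowChange("c", 4, "n4"));
        events.add(new RowChange("u", 4, "n4-upd"));
        events.add(new RowChange("d", 1, null));
        System.out.println(apply(events)); // prints {4=n4-upd}
    }
}
```

Because the result depends on the order of the events, applying the same four events in a different order (for example the update before the insert) would leave a different final state, which is why the ordering guarantee matters.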
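II. Basic usage
The following uses MySQL as an example to introduce the basic usage of Debezium.
1. Preparing MySQL
Create a MySQL user with the required privileges, for example: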
CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%';
Check whether binary logging (log-bin) is enabled in MySQL:
SELECT variable_value AS "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name = 'log_bin';
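If the result is OFF, edit the MySQL configuration file along these lines:
server-id = 223344 # required
log_bin = mysql-bin # the value of log_bin is the base name of the binlog file sequence
binlog_format = ROW # must be ROW
binlog_row_image = FULL # must be FULL
expire_logs_days = 10 # adjust to your environment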
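These requirements can also be checked in code before starting a connector. The sketch below is only an illustration under stated assumptions: `BinlogSettingsCheck`, `findProblems`, and `expect` are hypothetical names, and the map of variables is assumed to have been read from the server beforehand (for example from `SHOW GLOBAL VARIABLES`), which is not shown. It checks only the settings that the comments above mark as required.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BinlogSettingsCheck {

    /**
     * Returns one message per required setting that is missing or has the wrong value.
     * Map keys are MySQL system variable names (an assumption about how the caller loads them).
     */
    static List<String> findProblems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        String serverId = vars.get("server_id");
        if (serverId == null || serverId.trim().isEmpty() || serverId.trim().equals("0")) {
            problems.add("server_id must be set to a non-zero value");
        }
        expect(vars, "log_bin", "ON", problems);
        expect(vars, "binlog_format", "ROW", problems);
        expect(vars, "binlog_row_image", "FULL", problems);
        return problems;
    }

    /** Adds a message to problems when the variable is absent or differs from the wanted value. */
    private static void expect(Map<String, String> vars, String name, String wanted, List<String> problems) {
        String actual = vars.get(name);
        if (actual == null || !actual.trim().equalsIgnoreCase(wanted)) {
            problems.add(name + " should be " + wanted + " but is " + actual);
        }
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("server_id", "223344");
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "MIXED"); // deliberately wrong for the demo
        vars.put("binlog_row_image", "FULL");
        findProblems(vars).forEach(System.out::println);
    }
}
```

Returning a list of messages rather than throwing on the first mismatch lets the caller report every misconfigured setting in one pass.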
Prepare the database and table:
create database inventory;
create table inventory.a (id bigint primary key auto_increment, name varchar(32));
insert into inventory.a values (null, 'n1'), (null, 'n2'), (null, 'n3');
2. Writing the program
2.1. Project dependencies (Maven)
pom.xml
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>
At the time of writing, the latest stable Debezium release is 1.9.5.Final.
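2.2. Prepare the database and table
(Skip this step if the database and table were already created during the MySQL preparation above.)
create database inventory;
create table inventory.a (id bigint primary key, name varchar(32));
insert into inventory.a values (1, 'n1'), (2, 'n2'), (3, 'n3');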
2.3. Writing the code
package com.greatdb.dbzdemo;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class DebeziumTest {

    private static DebeziumEngine<ChangeEvent<String, String>> engine;

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        // Engine name, connector class, and where offsets are stored
        props.setProperty("name", "dbz-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
        props.setProperty("offset.flush.interval.ms", "0");
        // Connector settings
        props.setProperty("database.server.name", "mysql-connector");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");
        props.setProperty("database.server.id", "122112"); // must be unique among replication clients
        props.setProperty("database.hostname", "tmg"); // replace with your MySQL host
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "mysqluser"); // an account with the privileges granted earlier
        props.setProperty("database.password", "mysqlpw");
        props.setProperty("database.include.list", "inventory");
        props.setProperty("table.include.list", "inventory.a");
        // Snapshot the existing rows first, then stream changes from the binlog
        props.setProperty("snapshot.mode", "initial");

        engine = DebeziumEngine.create(Json.class)
                .using(props)
                // Called once for every change event
                .notifying(record -> {
                    System.out.println(record);
                })
                // Called once when the engine stops, successfully or not
                .using((success, message, error) -> {
                    if (error != null) {
                        System.out.println("------------error, message:" + message + ", exception:" + error);
                    }
                    closeEngine(engine);
                })
                .build();

        // The engine is a Runnable; run it on its own thread
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        addShutdownHook(engine);
        awaitTermination(executor);
        System.out.println("------------main finished.");
    }

    private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {
        try {
            engine.close();
        } catch (IOException ignored) {
            // nothing useful to do if closing fails
        }
    }

    private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
    }

    private static void awaitTermination(ExecutorService executor) {
        if (executor != null) {
            try {
                executor.shutdown();
                while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    // keep waiting until the engine thread has finished
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
3. Testing
Once the program is running, the console shows output like this:
... (omitted)
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
... (omitted)
As shown, the full set of existing rows has been emitted as a snapshot. The key parts are:
..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
Next, insert a row:
insert into inventory.a values (4, 'n4');
Console output:
..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
Update that row:
update inventory.a set name = 'n4-upd' where id = 4;
Console output:
..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
Delete a row:
delete from inventory.a where id = 1;
Console output:
..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...
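The `op` values seen in these outputs (`r` for snapshot reads, `c` for inserts, `u` for updates, `d` for deletes) can be turned into readable names when handling events. The following is a minimal sketch that assumes the event value is a JSON string shaped like the output above. `OpCodes`, `extractOp`, and `describe` are hypothetical helper names, and the regular-expression extraction is a shortcut for illustration only: it returns the first `"op":"x"` pair in the text, so it would be misled by a table that has its own column named `op`. A real application would more likely use a JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OpCodes {

    // Matches the first "op":"x" pair in the text; a shortcut, not a JSON parser.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /** Returns the op code found in the JSON text, or null if none is present. */
    static String extractOp(String json) {
        Matcher m = OP.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    /** Maps an op code to a readable description. */
    static String describe(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "r": return "read (snapshot)";
            case "c": return "create (insert)";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        String json = "{\"payload\":{\"before\":null,\"after\":{\"id\":4,\"name\":\"n4\"},\"op\":\"c\"}}";
        System.out.println(describe(extractOp(json))); // prints create (insert)
    }
}
```

In the engine's `notifying` callback, the string passed to `extractOp` would be the event's value; delete events may be followed by an event whose value is null, so a handler should check for that before parsing.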
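III. Summary
Using MySQL as an example, this article walked through the basic flow of using Debezium from code. When common insert, update, and delete operations are performed on MySQL data, Debezium captures the row changes, records the row data before and after each change, and exposes them as an event stream that external consumers can read and process.
Reference: https://debezium.io/documentation/reference/1.8/index.html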