1. springboot配置:
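<!-- pom.xml(XML 标签在抓取时丢失,以下为还原后的内容) -->
<properties>
    <debezium.version>1.9.0.Final</debezium.version>
    <!-- 原文仅剩两个 "8",推测为编译器 source/target 版本 -->
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${debezium.version}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${debezium.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-oracle</artifactId>
        <version>${debezium.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>3.2.1</version>
    </dependency>
</dependencies>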
# CDC配置
debezium:
  datasource:
    hostname: 10.XX.1.XX
    port: 1036
    user: XXX
    password: XXX
    tableWhitelist: MYDB.PM_PROJECT_INFO
    # tableWhitelist: MYDB.PM_PR_PROJECT_INFO,MYDB.PM_PROJECT_INFO
    storageFile: D:/debezium/test/offsets/offset.dat
    historyFile: D:/debezium/test/history/custom-file-db-history.dat
    flushInterval: 10000
    serverId: 1
    serverName: name-1
    dbname: ythtest
    connectionAdapter: logminer
    snapshotMode: schema_only
    tablenameCaseInsensitive: false
    databaseServerTimezone: UTC
    logMiningStrategy: online_catalog
    logMiningContinuousMine: true
    keyConverterSchemasEnable: false
    valueConverterSchemasEnable: false
-
- import io.debezium.connector.oracle.OracleConnector;
- import io.debezium.relational.history.FileDatabaseHistory;
- import lombok.Data;
- import org.apache.kafka.connect.storage.FileOffsetBackingStore;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import java.io.File;
- import java.io.IOException;
-
- @Configuration
- @ConfigurationProperties(prefix ="debezium.datasource")
- @Data
- public class CdcConfig {
-
- private String hostname;
- private String port;
- private String user;
- private String password;
- private String tableWhitelist;
- private String storageFile;
- private String historyFile;
- private Long flushInterval;
- private String serverId;
- private String serverName;
- private String dbname;
- private String connectionAdapter;
- private String snapshotMode;
- private String tablenameCaseInsensitive;
- private String keyConverterSchemasEnable;
- private String valueConverterSchemasEnable;
- private String databaseServerTimezone;
- private String logMiningStrategy;
- private String logMiningContinuousMine;
-
-
- @Bean(name = "cdcOracleConfig")
- public io.debezium.config.Configuration cdcOracleConfig() throws Exception {
- // checkFile();
- io.debezium.config.Configuration configuration = io.debezium.config.Configuration.create()
- .with("name", "oracle_connector")
- .with("connector.class", OracleConnector.class)
- // .with("offset.storage", KafkaOffsetBackingStore.class)
- .with("offset.storage", FileOffsetBackingStore.class)
- .with("offset.storage.file.filename", storageFile)
- .with("offset.flush.interval.ms", flushInterval)
- .with("database.history", FileDatabaseHistory.class.getName())
- .with("database.history.file.filename", historyFile)
- .with("snapshot.mode", "Schema_only")
- .with("database.server.id", serverId)
- .with("database.server.name", serverName)
- .with("database.hostname", hostname)
- .with("database.dbname", dbname)
- .with("database.port", port)
- .with("database.user", user)
- .with("database.password", password)
-
- .with("table.whitelist", tableWhitelist)
- .with("column.include.list", "MYDB.PM_PROJECT_INFO.PRO_ID,MYDB.PM_PROJECT_INFO.PRO_CODE")
- .with("table.include.list", "MYDB.PM_PROJECT_INFO")
- // .with("column.include.list", "MYDB.PM_PR_PROJECT_INFO.PRO_ID,MYDB.PM_PR_PROJECT_INFO.PRO_CODE,MYDB.PM_PROJECT_INFO.PRO_ID,MYDB.PM_PROJECT_INFO.PRO_CODE")
- // .with("table.include.list", "MYDB.PM_PR_PROJECT_INFO,MYDB.PM_PROJECT_INFO")
-
- // .with("database.include.list", "MYDB")
-
- .with("database.connection.adapter", connectionAdapter)
- .with("snapshot.mode", snapshotMode)
- .with("database.tablename.case.insensitive", tablenameCaseInsensitive)
- .with("database.serverTimezone", databaseServerTimezone)
- // 解决延迟
- .with("log.mining.strategy", logMiningStrategy)
- // .with("log.mining.continuous.mine", logMiningContinuousMine)
- .with("key.converter.schemas.enable", keyConverterSchemasEnable)
- .with("value.converter.schemas.enable", valueConverterSchemasEnable)
- .build();
- return configuration;
-
- }
-
- private void checkFile() throws IOException {
- String dir = storageFile.substring(0, storageFile.lastIndexOf("/"));
- File dirFile = new File(dir);
- if(!dirFile.exists()){
- dirFile.mkdirs();
- }
- File file = new File(storageFile);
- if(!file.exists()){
- file.createNewFile();
- }
- }
- }
import com.alibaba.fastjson.JSON;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-
- @Component
- @Slf4j
- public class CdcListener {
-
- @Qualifier(value = "cdcOracleConfig")
- @Resource
- private Configuration configuration;
-
- private final List
>> engineList = new ArrayList<>(); -
- private void receiveChangeEvent(String value) {
- if (Objects.nonNull(value)) {
- Map
payload = getPayload(value); - if (Objects.isNull(payload)) {
- payload = JSON.parseObject(value, Map.class);
- }
- String op = JSON.parseObject(JSON.toJSONString(payload.get("op")), String.class);
- if (!(StringUtils.isBlank(op) || Envelope.Operation.READ.equals(op))) {
- ChangeData changeData = getChangeData(payload);
- // 这里抛出异常会导致后面的日志监听失败
- try {
- // Map source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
- log.info("payload ===> {}", payload);
- // mysqlBinlogService.service(changeData);
- } catch (Exception e) {
- log.error("binlog处理异常,原数据: " + changeData, e);
- }
-
- }
- }
- }
-
- @PostConstruct
- private void start() {
- this.engineList.add(
- DebeziumEngine.create(Json.class)
- .using(configuration.asProperties())
- .notifying(record -> receiveChangeEvent(record.value()))
- .build());
- for (DebeziumEngine
> engine : engineList) { - Executors.newFixedThreadPool(3).execute(engine);
- }
- }
-
- @PreDestroy
- private void stop() {
- for (DebeziumEngine
> engine : engineList) { - if (engine != null) {
- try {
- engine.close();
- } catch (IOException e) {
- log.error("", e);
- }
- }
- }
- }
-
-
- public static Map
getPayload(String value) { - Map
map = JSON.parseObject(value, Map.class); - Map
payload = JSON.parseObject(JSON.toJSONString(map.get("payload")), Map.class); - return payload;
- }
-
- public static ChangeData getChangeData(Map
payload) { - Map
source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class); - return ChangeData.builder()
- .op(payload.get("op").toString())
- .table(source.get("table").toString())
- .after(JSON.parseObject(JSON.toJSONString(payload.get("after")), Map.class))
- .source(JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class))
- .before(JSON.parseObject(JSON.toJSONString(payload.get("before")), Map.class))
- .build();
- }
-
- @Data
- @Builder
- public static class ChangeData {
- /**
- * 更改前数据
- */
- private Map
after; - private Map
source; - /**
- * 更改后数据
- */
- private Map
before; - /**
- * 更改的表名
- */
- private String table;
- /**
- * 操作类型, 枚举 Envelope.Operation
- * READ("r"),
- * CREATE("c"),
- * UPDATE("u"),
- * DELETE("d"),
- * TRUNCATE("t");
- */
- private String op;
- }
-
- }
参考:
Debezium监控Oracle数据库遇到的坑_debezium 代码监听oracle cdc-CSDN博客
基于 LogMiner 和 Debezium 构建可用于生产实践的 Oracle 实时数据采集工具_架构_丁杨_InfoQ精选文章
2. 一些报错
1) java.sql.SQLException: ORA-01325: 要构建日志流, 必须启用“归档日志”模式
使用OGG实现Oracle到MySQL数据平滑迁移
red hat中文文档
3. debezium文档
4. logminer分析日志