• Debezium监控Oracle数据库


    1. springboot配置

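    <properties>
        <debezium.version>1.9.0.Final</debezium.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-oracle</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>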
    # CDC配置
    debezium:
      datasource:
        hostname: 10.XX.1.XX
        port: 1036
        user: XXX
        password: XXX
        tableWhitelist: MYDB.PM_PROJECT_INFO
        # tableWhitelist: MYDB.PM_PR_PROJECT_INFO,MYDB.PM_PROJECT_INFO
        storageFile: D:/debezium/test/offsets/offset.dat
        historyFile: D:/debezium/test/history/custom-file-db-history.dat
        flushInterval: 10000
        serverId: 1
        serverName: name-1
        dbname: ythtest
        connectionAdapter: logminer
        snapshotMode: schema_only
        tablenameCaseInsensitive: false
        databaseServerTimezone: UTC
        logMiningStrategy: online_catalog
        logMiningContinuousMine: true
        keyConverterSchemasEnable: false
        valueConverterSchemasEnable: false
    1. import io.debezium.connector.oracle.OracleConnector;
    2. import io.debezium.relational.history.FileDatabaseHistory;
    3. import lombok.Data;
    4. import org.apache.kafka.connect.storage.FileOffsetBackingStore;
    5. import org.springframework.boot.context.properties.ConfigurationProperties;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. import java.io.File;
    9. import java.io.IOException;
    10. @Configuration
    11. @ConfigurationProperties(prefix ="debezium.datasource")
    12. @Data
    13. public class CdcConfig {
    14. private String hostname;
    15. private String port;
    16. private String user;
    17. private String password;
    18. private String tableWhitelist;
    19. private String storageFile;
    20. private String historyFile;
    21. private Long flushInterval;
    22. private String serverId;
    23. private String serverName;
    24. private String dbname;
    25. private String connectionAdapter;
    26. private String snapshotMode;
    27. private String tablenameCaseInsensitive;
    28. private String keyConverterSchemasEnable;
    29. private String valueConverterSchemasEnable;
    30. private String databaseServerTimezone;
    31. private String logMiningStrategy;
    32. private String logMiningContinuousMine;
    33. @Bean(name = "cdcOracleConfig")
    34. public io.debezium.config.Configuration cdcOracleConfig() throws Exception {
    35. // checkFile();
    36. io.debezium.config.Configuration configuration = io.debezium.config.Configuration.create()
    37. .with("name", "oracle_connector")
    38. .with("connector.class", OracleConnector.class)
    39. // .with("offset.storage", KafkaOffsetBackingStore.class)
    40. .with("offset.storage", FileOffsetBackingStore.class)
    41. .with("offset.storage.file.filename", storageFile)
    42. .with("offset.flush.interval.ms", flushInterval)
    43. .with("database.history", FileDatabaseHistory.class.getName())
    44. .with("database.history.file.filename", historyFile)
    45. .with("snapshot.mode", "Schema_only")
    46. .with("database.server.id", serverId)
    47. .with("database.server.name", serverName)
    48. .with("database.hostname", hostname)
    49. .with("database.dbname", dbname)
    50. .with("database.port", port)
    51. .with("database.user", user)
    52. .with("database.password", password)
    53. .with("table.whitelist", tableWhitelist)
    54. .with("column.include.list", "MYDB.PM_PROJECT_INFO.PRO_ID,MYDB.PM_PROJECT_INFO.PRO_CODE")
    55. .with("table.include.list", "MYDB.PM_PROJECT_INFO")
    56. // .with("column.include.list", "MYDB.PM_PR_PROJECT_INFO.PRO_ID,MYDB.PM_PR_PROJECT_INFO.PRO_CODE,MYDB.PM_PROJECT_INFO.PRO_ID,MYDB.PM_PROJECT_INFO.PRO_CODE")
    57. // .with("table.include.list", "MYDB.PM_PR_PROJECT_INFO,MYDB.PM_PROJECT_INFO")
    58. // .with("database.include.list", "MYDB")
    59. .with("database.connection.adapter", connectionAdapter)
    60. .with("snapshot.mode", snapshotMode)
    61. .with("database.tablename.case.insensitive", tablenameCaseInsensitive)
    62. .with("database.serverTimezone", databaseServerTimezone)
    63. // 解决延迟
    64. .with("log.mining.strategy", logMiningStrategy)
    65. // .with("log.mining.continuous.mine", logMiningContinuousMine)
    66. .with("key.converter.schemas.enable", keyConverterSchemasEnable)
    67. .with("value.converter.schemas.enable", valueConverterSchemasEnable)
    68. .build();
    69. return configuration;
    70. }
    71. private void checkFile() throws IOException {
    72. String dir = storageFile.substring(0, storageFile.lastIndexOf("/"));
    73. File dirFile = new File(dir);
    74. if(!dirFile.exists()){
    75. dirFile.mkdirs();
    76. }
    77. File file = new File(storageFile);
    78. if(!file.exists()){
    79. file.createNewFile();
    80. }
    81. }
    82. }
    1. import com.alibaba.fastjson.JSON;
    2. import io.debezium.config.Configuration;
    3. import io.debezium.data.Envelope;
    4. import io.debezium.engine.ChangeEvent;
    5. import io.debezium.engine.DebeziumEngine;
    6. import io.debezium.engine.format.Json;
    7. import lombok.Builder;
    8. import lombok.Data;
    9. import lombok.extern.slf4j.Slf4j;
    10. import org.apache.commons.lang3.StringUtils;
    11. import org.springframework.beans.factory.annotation.Qualifier;
    12. import org.springframework.stereotype.Component;
    13. import javax.annotation.PostConstruct;
    14. import javax.annotation.PreDestroy;
    15. import javax.annotation.Resource;
    16. import java.io.IOException;
    17. import java.util.ArrayList;
    18. import java.util.List;
    19. import java.util.Map;
    20. import java.util.Objects;
    21. import java.util.concurrent.Executors;
    22. @Component
    23. @Slf4j
    24. public class CdcListener {
    25. @Qualifier(value = "cdcOracleConfig")
    26. @Resource
    27. private Configuration configuration;
    28. private final List>> engineList = new ArrayList<>();
    29. private void receiveChangeEvent(String value) {
    30. if (Objects.nonNull(value)) {
    31. Map payload = getPayload(value);
    32. if (Objects.isNull(payload)) {
    33. payload = JSON.parseObject(value, Map.class);
    34. }
    35. String op = JSON.parseObject(JSON.toJSONString(payload.get("op")), String.class);
    36. if (!(StringUtils.isBlank(op) || Envelope.Operation.READ.equals(op))) {
    37. ChangeData changeData = getChangeData(payload);
    38. // 这里抛出异常会导致后面的日志监听失败
    39. try {
    40. // Map source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
    41. log.info("payload ===> {}", payload);
    42. // mysqlBinlogService.service(changeData);
    43. } catch (Exception e) {
    44. log.error("binlog处理异常,原数据: " + changeData, e);
    45. }
    46. }
    47. }
    48. }
    49. @PostConstruct
    50. private void start() {
    51. this.engineList.add(
    52. DebeziumEngine.create(Json.class)
    53. .using(configuration.asProperties())
    54. .notifying(record -> receiveChangeEvent(record.value()))
    55. .build());
    56. for (DebeziumEngine> engine : engineList) {
    57. Executors.newFixedThreadPool(3).execute(engine);
    58. }
    59. }
    60. @PreDestroy
    61. private void stop() {
    62. for (DebeziumEngine> engine : engineList) {
    63. if (engine != null) {
    64. try {
    65. engine.close();
    66. } catch (IOException e) {
    67. log.error("", e);
    68. }
    69. }
    70. }
    71. }
    72. public static Map getPayload(String value) {
    73. Map map = JSON.parseObject(value, Map.class);
    74. Map payload = JSON.parseObject(JSON.toJSONString(map.get("payload")), Map.class);
    75. return payload;
    76. }
    77. public static ChangeData getChangeData(Map payload) {
    78. Map source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
    79. return ChangeData.builder()
    80. .op(payload.get("op").toString())
    81. .table(source.get("table").toString())
    82. .after(JSON.parseObject(JSON.toJSONString(payload.get("after")), Map.class))
    83. .source(JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class))
    84. .before(JSON.parseObject(JSON.toJSONString(payload.get("before")), Map.class))
    85. .build();
    86. }
    87. @Data
    88. @Builder
    89. public static class ChangeData {
    90. /**
    91. * 更改前数据
    92. */
    93. private Map after;
    94. private Map source;
    95. /**
    96. * 更改后数据
    97. */
    98. private Map before;
    99. /**
    100. * 更改的表名
    101. */
    102. private String table;
    103. /**
    104. * 操作类型, 枚举 Envelope.Operation
    105. * READ("r"),
    106. * CREATE("c"),
    107. * UPDATE("u"),
    108. * DELETE("d"),
    109. * TRUNCATE("t");
    110. */
    111. private String op;
    112. }
    113. }

    参考:

    不想引入MQ?不妨试试 Debezium

    Debezium监控Oracle数据库遇到的坑_debezium 代码监听oracle cdc-CSDN博客

    基于 LogMiner 和 Debezium 构建可用于生产实践的 Oracle 实时数据采集工具_架构_丁杨_InfoQ精选文章

    Debezium 日志挖掘策略2

    2. 一些报错

    1) java.sql.SQLException: ORA-01325: 要构建日志流, 必须启用“归档日志”模式 

    Oracle数据库开启归档日志和补充日志

    使用OGG实现Oracle到MySQL数据平滑迁移red hat中文文档

    3.debezium文档

    red hat中文文档

    官方文档

    4.logminer分析日志

    如何使用logminer查看日志

    如何利用DBMS_LOGMNR包挖掘在线日志

  • 相关阅读:
    i.MX 6ULL 驱动开发 二十八:网络设备
    Git 教程详解
    Audio track和OpenSL ES有啥区别?哪个好?
    欢迎使用Markdown编辑器
    扬帆牧哲:shopee店铺运营干货分享
    推荐算法学习笔记2.1:基于深度学习的推荐算法-基于共线矩阵的深度推荐算法-AutoRec模型
    springboot毕设项目超市收银与会员管理系统6l826(java+VUE+Mybatis+Maven+Mysql)
    【python】raise Exception使用方式示例
    深拷贝-浅拷贝-引用赋值的写法
    SpringBoot 如何使用 JProfiler 进行性能测试
  • 原文地址:https://blog.csdn.net/weixin_41085114/article/details/133883495