• Debezium实现mysql监听


     pom:

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    4. <modelVersion>4.0.0</modelVersion>
    5. <groupId>com.zy</groupId>
    6. <artifactId>data-sync</artifactId>
    7. <version>0.0.1-SNAPSHOT</version>
    8. <name>data-sync</name>
    9. <properties>
    10. <java.version>1.8</java.version>
    11. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    12. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    13. <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    14. <commons-pool2.version>2.6.2</commons-pool2.version>
    15. <fastjson.version>1.2.79</fastjson.version>
    16. <hutool.version>5.8.16</hutool.version>
    17. <mybatis-plus.version>3.2.0</mybatis-plus.version>
    18. <druid.version>1.2.6</druid.version>
    19. <mysql.version>8.0.30</mysql.version>
    20. <orika-core.version>1.5.1</orika-core.version>
    21. <aspectjrt.version>1.9.7</aspectjrt.version>
    22. <commons-lang3.version>3.12.0</commons-lang3.version>
    23. <debezium.version>1.9.5.Final</debezium.version>
    24. </properties>
    25. <dependencies>
    26. <!-- spring-boot-starter-web -->
    27. <dependency>
    28. <groupId>org.springframework.boot</groupId>
    29. <artifactId>spring-boot-starter-web</artifactId>
    30. </dependency>
    31. <!-- 阿里JSON解析器 -->
    32. <dependency>
    33. <groupId>com.alibaba</groupId>
    34. <artifactId>fastjson</artifactId>
    35. <version>${fastjson.version}</version>
    36. </dependency>
    37. <!-- hutool -->
    38. <dependency>
    39. <groupId>cn.hutool</groupId>
    40. <artifactId>hutool-all</artifactId>
    41. <version>${hutool.version}</version>
    42. </dependency>
    43. <!-- mybatis-plus -->
    44. <dependency>
    45. <groupId>com.baomidou</groupId>
    46. <artifactId>mybatis-plus-boot-starter</artifactId>
    47. <version>${mybatis-plus.version}</version>
    48. </dependency>
    49. <!-- 阿里数据库连接池 -->
    50. <dependency>
    51. <groupId>com.alibaba</groupId>
    52. <artifactId>druid-spring-boot-starter</artifactId>
    53. <version>${druid.version}</version>
    54. </dependency>
    55. <!-- Mysql驱动包 -->
    56. <dependency>
    57. <groupId>mysql</groupId>
    58. <artifactId>mysql-connector-java</artifactId>
    59. <version>${mysql.version}</version>
    60. </dependency>
    61. <!-- 对象转换工具类 -->
    62. <dependency>
    63. <groupId>ma.glasnost.orika</groupId>
    64. <artifactId>orika-core</artifactId>
    65. <version>${orika-core.version}</version>
    66. </dependency>
    67. <!-- aop切面 -->
    68. <dependency>
    69. <groupId>org.aspectj</groupId>
    70. <artifactId>aspectjrt</artifactId>
    71. <version>${aspectjrt.version}</version>
    72. </dependency>
    73. <!-- commons-lang3-->
    74. <dependency>
    75. <groupId>org.apache.commons</groupId>
    76. <artifactId>commons-lang3</artifactId>
    77. <version>${commons-lang3.version}</version>
    78. </dependency>
    79. <!-- aop-->
    80. <dependency>
    81. <groupId>org.springframework.boot</groupId>
    82. <artifactId>spring-boot-starter-aop</artifactId>
    83. </dependency>
    84. <!-- dynamic-datasource-spring-boot-starter -->
    85. <dependency>
    86. <groupId>com.baomidou</groupId>
    87. <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
    88. <version>3.5.0</version>
    89. </dependency>
    90. <!--瀚高驱动-->
    91. <dependency>
    92. <groupId>com.highgo</groupId>
    93. <artifactId>HgdbJdbc</artifactId>
    94. <version>6.2.2</version>
    95. </dependency>
    96. <!-- clickhouse -->
    97. <dependency>
    98. <groupId>com.clickhouse</groupId>
    99. <artifactId>clickhouse-jdbc</artifactId>
    100. <version>0.3.2</version>
    101. </dependency>
    102. <dependency>
    103. <groupId>io.debezium</groupId>
    104. <artifactId>debezium-api</artifactId>
    105. <version>${debezium.version}</version>
    106. </dependency>
    107. <dependency>
    108. <groupId>io.debezium</groupId>
    109. <artifactId>debezium-embedded</artifactId>
    110. <version>${debezium.version}</version>
    111. </dependency>
    112. <dependency>
    113. <groupId>io.debezium</groupId>
    114. <artifactId>debezium-connector-mysql</artifactId>
    115. <version>${debezium.version}</version>
    116. <exclusions>
    117. <exclusion>
    118. <groupId>mysql</groupId>
    119. <artifactId>mysql-connector-java</artifactId>
    120. </exclusion>
    121. </exclusions>
    122. </dependency>
    123. <dependency>
    124. <groupId>io.debezium</groupId>
    125. <artifactId>debezium-connector-postgres</artifactId>
    126. <version>${debezium.version}</version>
    127. </dependency>
    128. <dependency>
    129. <groupId>org.projectlombok</groupId>
    130. <artifactId>lombok</artifactId>
    131. </dependency>
    132. <dependency>
    133. <groupId>org.projectlombok</groupId>
    134. <artifactId>lombok</artifactId>
    135. </dependency>
    136. </dependencies>
    137. <repositories>
    138. <repository>
    139. <id>nexus-aliyun</id>
    140. <name>nexus-aliyun</name>
    141. <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    142. <releases>
    143. <enabled>true</enabled>
    144. </releases>
    145. <snapshots>
    146. <enabled>false</enabled>
    147. </snapshots>
    148. </repository>
    149. </repositories>
    150. <dependencyManagement>
    151. <dependencies>
    152. <dependency>
    153. <groupId>org.springframework.boot</groupId>
    154. <artifactId>spring-boot-dependencies</artifactId>
    155. <version>${spring-boot.version}</version>
    156. <type>pom</type>
    157. <scope>import</scope>
    158. </dependency>
    159. </dependencies>
    160. </dependencyManagement>
    161. <build>
    162. <plugins>
    163. <plugin>
    164. <groupId>org.apache.maven.plugins</groupId>
    165. <artifactId>maven-compiler-plugin</artifactId>
    166. <version>3.8.1</version>
    167. <configuration>
    168. <source>1.8</source>
    169. <target>1.8</target>
    170. <encoding>UTF-8</encoding>
    171. </configuration>
    172. </plugin>
    173. <plugin>
    174. <groupId>org.springframework.boot</groupId>
    175. <artifactId>spring-boot-maven-plugin</artifactId>
    176. <version>${spring-boot.version}</version>
    177. <configuration>
    178. <mainClass>com.zy.data.sync.DataSyncApplication</mainClass>
    179. <skip>true</skip>
    180. </configuration>
    181. <executions>
    182. <execution>
    183. <id>repackage</id>
    184. <goals>
    185. <goal>repackage</goal>
    186. </goals>
    187. </execution>
    188. </executions>
    189. </plugin>
    190. </plugins>
    191. </build>
    192. </project>

     CdcInitService:

    1. package com.zy.data.sync.moudles.init.service;
    2. import com.zy.data.sync.utils.DebeziumDataHande;
    3. import com.zy.data.sync.utils.DebeziumSqlHande;
    4. import io.debezium.engine.ChangeEvent;
    5. import io.debezium.engine.DebeziumEngine;
    6. import io.debezium.engine.format.Json;
    7. import lombok.extern.slf4j.Slf4j;
    8. import org.codehaus.plexus.util.StringUtils;
    9. import org.springframework.stereotype.Service;
    10. import javax.annotation.PostConstruct;
    11. import java.util.HashMap;
    12. import java.util.Map;
    13. import java.util.Properties;
    14. import java.util.concurrent.ExecutorService;
    15. import java.util.concurrent.Executors;
    16. @Service
    17. @Slf4j
    18. public class CdcInitService
    19. {
    20. //参数参考: https://blog.51cto.com/maxiaobian/3014474
    21. //日期转换参考: https://blog.csdn.net/qq_30529079/article/details/127809317
    22. @PostConstruct
    23. public void init() {
    24. final Properties props = new Properties();
    25. props.setProperty("name", "instala-core");
    26. props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
    27. //偏移量持久化,用来容错
    28. props.setProperty("offset.storage","org.apache.kafka.connect.storage.FileOffsetBackingStore");
    29. //偏移量持久化文件路径,默认/tmp/offsets.dat,如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
    30. props.setProperty("offset.storage.file.filename", "D:/tmp/offsets.dat");
    31. //如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
    32. props.setProperty("offset.flush.interval.ms", "60000");
    33. //需要监听的数据库名称
    34. props.setProperty("database.whitelist", "report_sharing_center");
    35. //initial(默认) 连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。
    36. //initial_only 连接器只执行数据库的初始一致性快照,不允许捕获任何后续更改的事件。
    37. //schema_only 连接器只捕获所有相关表的表结构,不捕获初始数据,但是会同步后续数据库的更改记录
    38. //schema_only_recovery 设置此选项可恢复丢失或损坏的数据库历史主题(database.history.kafka.topic)。
    39. props.setProperty("snapshot.mode", "schema_only");
    40. //数据库地址
    41. props.setProperty("database.hostname", "xxx");
    42. //数据库端口
    43. props.setProperty("database.port","3306");
    44. //数据库用户名
    45. props.setProperty("database.user", "xxx");
    46. //数据库密码
    47. props.setProperty("database.password", "xxx");
    48. //server.id起到唯一标识作用,随意起
    49. props.setProperty("database.server.id", "xxx");
    50. //server.name起到唯一标识作用,随意起
    51. props.setProperty("database.server.name", "xxx");
    52. //历史变更记录
    53. props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
    54. //历史变更记录存储位置
    55. props.setProperty("database.history.file.filename", "D:/tmp/dbhistory.dat");
    56. //格式化日期
    57. props.setProperty("converters", "dateConverters");
    58. props.setProperty("dateConverters.type", "com.zy.data.sync.common.conver.MySqlDateTimeConverter");
    59. props.setProperty("dateConverters.format.date", "yyyy-MM-dd");
    60. props.setProperty("dateConverters.format.time", "HH:mm:ss");
    61. props.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
    62. props.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
    63. props.setProperty("dateConverters.format.timestamp.zone", "UTC+8");
    64. //全局读写锁,会影响在线业务,所以跳过锁设置
    65. props.setProperty("debezium.snapshot.locking.mode","none");
    66. //是否包含数据库表结构层面的变更,建议使用默认值true
    67. props.setProperty("include.schema.changes", "true");
    68. //指定 BIGINT UNSIGNED 列应如何在更改事件中表示。可能的设置有
    69. //long使用 Java 的 表示值long,这可能无法提供精确度,但在消费者中易于使用。long通常是首选设置。
    70. props.setProperty("bigint.unsigned.handling.mode","long");
    71. //decimal类型转换为double
    72. props.setProperty("decimal.handling.mode","double");
    73. DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
    74. .using(props)
    75. .notifying(record -> {
    76. String recordStr = record.value();
    77. if(StringUtils.isNotEmpty(recordStr)){
    78. System.out.println("recordStr-->"+recordStr);
    79. Map<String, Object> payload = DebeziumDataHande.getPayload(recordStr);
    80. //to do something
    81. if(payload.containsKey("op")&&payload.get("op").equals("c")){
    82. }
    83. }
    84. }).build();
    85. Executors.newSingleThreadExecutor().execute(engine);
    86. // Run the engine asynchronously ...
    87. ExecutorService executor = Executors.newSingleThreadExecutor();
    88. executor.execute(engine);
    89. }
    90. }

    MySqlDateTimeConverter:

    1. package com.zy.data.sync.common.conver;
    2. import io.debezium.spi.converter.CustomConverter;
    3. import io.debezium.spi.converter.RelationalColumn;
    4. import org.apache.kafka.connect.data.SchemaBuilder;
    5. import org.slf4j.Logger;
    6. import org.slf4j.LoggerFactory;
    7. import java.time.*;
    8. import java.time.format.DateTimeFormatter;
    9. import java.util.Properties;
    10. import java.util.function.Consumer;
    11. public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
    12. private final static Logger logger = LoggerFactory.getLogger(MySqlDateTimeConverter.class);
    13. private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    14. private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    15. private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    16. private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    17. private ZoneId timestampZoneId = ZoneId.systemDefault();
    18. @Override
    19. public void configure(Properties props) {
    20. readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
    21. readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
    22. readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
    23. readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
    24. readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
    25. }
    26. private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
    27. String settingValue = (String) properties.get(settingKey);
    28. if (settingValue == null || settingValue.length() == 0) {
    29. return;
    30. }
    31. try {
    32. callback.accept(settingValue.trim());
    33. } catch (IllegalArgumentException | DateTimeException e) {
    34. logger.error("The {} setting is illegal: {}",settingKey,settingValue);
    35. throw e;
    36. }
    37. }
    38. @Override
    39. public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
    40. String sqlType = column.typeName().toUpperCase();
    41. SchemaBuilder schemaBuilder = null;
    42. Converter converter = null;
    43. if ("DATE".equals(sqlType)) {
    44. schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
    45. converter = this::convertDate;
    46. }
    47. if ("TIME".equals(sqlType)) {
    48. schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
    49. converter = this::convertTime;
    50. }
    51. if ("DATETIME".equals(sqlType)) {
    52. schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
    53. converter = this::convertDateTime;
    54. }
    55. if ("TIMESTAMP".equals(sqlType)) {
    56. schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
    57. converter = this::convertTimestamp;
    58. }
    59. if (schemaBuilder != null) {
    60. registration.register(schemaBuilder, converter);
    61. }
    62. }
    63. private String convertDate(Object input) {
    64. if (input instanceof LocalDate) {
    65. return dateFormatter.format((LocalDate) input);
    66. }
    67. if (input instanceof Integer) {
    68. LocalDate date = LocalDate.ofEpochDay((Integer) input);
    69. return dateFormatter.format(date);
    70. }
    71. return null;
    72. }
    73. private String convertTime(Object input) {
    74. if (input instanceof Duration) {
    75. Duration duration = (Duration) input;
    76. long seconds = duration.getSeconds();
    77. int nano = duration.getNano();
    78. LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
    79. return timeFormatter.format(time);
    80. }
    81. return null;
    82. }
    83. private String convertDateTime(Object input) {
    84. if (input instanceof LocalDateTime) {
    85. return datetimeFormatter.format((LocalDateTime) input);
    86. }
    87. return null;
    88. }
    89. private String convertTimestamp(Object input) {
    90. if (input instanceof ZonedDateTime) {
    91. // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间
    92. ZonedDateTime zonedDateTime = (ZonedDateTime) input;
    93. LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
    94. return timestampFormatter.format(localDateTime);
    95. }
    96. return null;
    97. }
    98. }
    DebeziumDataHande:
    1. package com.zy.data.sync.utils;
    2. import com.alibaba.fastjson.JSON;
    3. import java.util.Map;
    4. public class DebeziumDataHande {
    5. public static Map<String, Object> getPayload(String value) {
    6. Map<String, Object> map = JSON.parseObject(value, Map.class);
    7. if(map.containsKey("payload")){
    8. return JSON.parseObject(JSON.toJSONString(map.get("payload")), Map.class);
    9. }
    10. return null;
    11. }
    12. public static Map<String, Object> getBefore(Map<String, Object> payload) {
    13. Map<String, Object> map = JSON.parseObject(payload.get("before").toString(), Map.class);
    14. return map;
    15. }
    16. public static Map<String, Object> getAfter(Map<String, Object> payload) {
    17. Map<String, Object> map = JSON.parseObject(payload.get("after").toString(), Map.class);
    18. return map;
    19. }
    20. public static Map<String, Object> getSource(Map<String, Object> payload) {
    21. Map<String, Object> map = JSON.parseObject(payload.get("source").toString(), Map.class);
    22. return map;
    23. }
    24. public static String getTableName(Map<String, Object> payload) {
    25. Map<String, Object> sourceMap = getSource(payload);
    26. String tableName = sourceMap.get("table").toString();
    27. return tableName;
    28. }
    29. }
    DebeziumSqlHande:
    1. package com.zy.data.sync.utils;
    2. import java.util.Map;
    3. public class DebeziumSqlHande {
    4. public static String getInsertSql(Map<String, Object> insertParam,String tableName) {
    5. String insertSql = SQLUtils.genSqlInsert(insertParam, tableName);
    6. return insertSql;
    7. }
    8. public static String getDeleteSql(Map<String, Object> deleteParam, String tableName) {
    9. String deleteSql = SQLUtils.genSqlDelete(deleteParam, tableName);
    10. return deleteSql;
    11. }
    12. }
    SQLUtils:
    1. package com.zy.data.sync.utils;
    2. import java.util.Map;
    3. public class SQLUtils {
    4. public static String genSqlInsert(Map<String, Object> dataMap,String tableName) {
    5. if(dataMap.size()==0) {
    6. return null;
    7. }
    8. //生成INSERT INTO table(field1,field2) 部分
    9. StringBuffer sbField = new StringBuffer();
    10. //生成VALUES('value1','value2') 部分
    11. StringBuffer sbValue = new StringBuffer();
    12. sbField.append("INSERT INTO " + tableName.toLowerCase() + "(");
    13. for(Map.Entry<String, Object> entry : dataMap.entrySet()){
    14. String mapKey = entry.getKey();
    15. Object mapValue = entry.getValue();
    16. if(mapValue instanceof String){
    17. sbValue.append("'" + mapValue + "',");
    18. }else{
    19. sbValue.append(mapValue + ",");
    20. }
    21. sbField.append("`" + mapKey + "`,");
    22. }
    23. String sbFieldStr = sbField.toString();
    24. String sbValueStr = sbValue.toString();
    25. if(sbFieldStr.endsWith(",")){
    26. sbFieldStr = sbFieldStr.substring(0,sbFieldStr.length()-1);
    27. }
    28. if(sbValueStr.endsWith(",")){
    29. sbValueStr = sbValueStr.substring(0,sbValueStr.length()-1);
    30. }
    31. return sbFieldStr + ") VALUES(" + sbValueStr + ")";
    32. }
    33. public static String genSqlDelete(Map<String, Object> dataMap,String tableName) {
    34. if(dataMap.size()==0) {
    35. return null;
    36. }
    37. //生成DELETE FROM xxx where 部分
    38. StringBuffer sbField = new StringBuffer();
    39. //生成id = zhangsan and sex = 1
    40. StringBuffer sbValue = new StringBuffer();
    41. sbField.append("DELETE FROM " + tableName.toLowerCase() + " where ");
    42. for(Map.Entry<String, Object> entry : dataMap.entrySet()){
    43. String mapKey = entry.getKey();
    44. Object mapValue = entry.getValue();
    45. if(mapValue instanceof String){
    46. sbValue.append("and "+mapKey+"='" + mapValue + "' ");
    47. }else{
    48. sbValue.append("and "+mapKey+"=" + mapValue + " ");
    49. }
    50. }
    51. String sbFieldStr = sbField.toString();
    52. String sbValueStr = sbValue.toString();
    53. if(sbValueStr.startsWith("and")){
    54. sbValueStr = sbValueStr.substring(3);
    55. }
    56. if(sbValueStr.endsWith(",")){
    57. sbValueStr = sbValueStr.substring(0,sbValueStr.length()-1);
    58. }
    59. return sbFieldStr + sbValueStr ;
    60. }
    61. }

  • 相关阅读:
    matlab simulink仿真
    复盘-----vue
    JVM线程的几种状态
    DASCTF2022.07赋能赛 web 复现
    阿里云搭建博客之如何设置网页为中文
    实验(四):LCD1602显示实验
    轻松学习Python:基础知识汇总
    数据结构 - AVL树
    一个简单的删除,我发现这么多知识...
    仿真科普|CAE技术赋能无人机 低空经济蓄势起飞
  • 原文地址:https://blog.csdn.net/u013008898/article/details/132827141