• canal1.1.7实战


    1.环境搭建

    canal可以用来监听mysql数据库的变化,用来同步数据

    先下载最新的部署版本,release地址:Releases · alibaba/canal · GitHub

    包下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

     下载完后,在linux上新建一个canal文件夹,放入tar包解压: tar -zxvf canal.xxx.tar.gz

    解压完后修改配置文件

    查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.user和canal.admin.passwd是客户端连接的账号

    再打开conf/example/ instance.properties, master.address填数据库地址,dbUsername和dbPassword是数据库账号,flter.regex可以用来过滤数据库,默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔

     修改完成后,进入bin目录,执行./startup.sh是启动,./stop.sh是关闭

    进入logs/example,执行tail -f -n 300 example.log,看到以下输出说明搭建成功了

     2.客户端代码

    引入依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>com.alibaba.ottergroupId>
    4. <artifactId>canal.clientartifactId>
    5. <version>1.1.7version>
    6. dependency>
    7. <dependency>
    8. <groupId>com.alibaba.ottergroupId>
    9. <artifactId>canal.protocolartifactId>
    10. <version>1.1.7version>
    11. dependency>
    12. dependencies>

    代码实现:

    1. package cn.hollycloud.iplatform;
    2. import com.alibaba.otter.canal.client.CanalConnector;
    3. import com.alibaba.otter.canal.client.CanalConnectors;
    4. import com.alibaba.otter.canal.protocol.CanalEntry;
    5. import com.alibaba.otter.canal.protocol.Message;
    6. import com.google.protobuf.ByteString;
    7. import com.google.protobuf.InvalidProtocolBufferException;
    8. import lombok.extern.slf4j.Slf4j;
    9. import org.apache.commons.lang3.StringUtils;
    10. import org.junit.Test;
    11. import java.net.InetSocketAddress;
    12. import java.util.HashMap;
    13. import java.util.List;
    14. import java.util.Map;
    15. import java.util.Objects;
    16. /**
    17. * Unit test for simple App.
    18. */
    19. @Slf4j
    20. public class CanalTest {
    21. private Map errorMap = new HashMap<>();
    22. @Test
    23. public void testCanal() {
    24. initThread();
    25. }
    26. private void initThread() {
    27. new Thread(new Runnable() {
    28. @Override
    29. public void run() {
    30. while (true) {
    31. try {
    32. initConnect();
    33. } catch (Exception e) {
    34. String key = "canal_connection_error";
    35. if (!hasSameError(key, e.getMessage())) {
    36. log.error("canal连接出错: {}", e);
    37. }
    38. }
    39. try {
    40. Thread.sleep(10000);
    41. } catch (InterruptedException e) {
    42. }
    43. }
    44. }
    45. }).start();
    46. }
    47. private void initConnect() {
    48. String canalIp = "localhost";
    49. int canalPort = 11111;
    50. String canalDestination = "example";
    51. String canalUsername = "admin";
    52. String canalPassword = "123456";
    53. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp,
    54. canalPort), canalDestination, canalUsername, canalPassword);
    55. int batchSize = 200;
    56. try {
    57. connector.connect(); // 连接到canal server
    58. connector.subscribe("db_.*\\..*"); // 订阅指定的消息
    59. connector.rollback(); // 回滚到未进行ack 的地方
    60. log.info("canal连接成功");
    61. while (true) {
    62. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
    63. long batchId = message.getId();
    64. int size = message.getEntries().size();
    65. if (batchId == -1 || size == 0) {
    66. try {
    67. //未获取到消息则睡眠
    68. Thread.sleep(2000);
    69. } catch (InterruptedException e) {
    70. }
    71. } else {
    72. try {
    73. //处理消息
    74. log.info("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());
    75. handleMessage(message.getEntries());
    76. } catch (Exception e) {
    77. connector.rollback(batchId); // 处理失败, 回滚数据
    78. String key = "canal_sync_data_error";
    79. String errMsg = e.getMessage();
    80. if (StringUtils.isEmpty(errMsg)) errMsg = e.toString();
    81. if (!hasSameError(key, errMsg)) {
    82. log.error("同步数据出错: {}", e);
    83. }
    84. //休眠一段时间继续获取数据
    85. try {
    86. Thread.sleep(10000);
    87. } catch (InterruptedException ex) {
    88. ex.printStackTrace();
    89. }
    90. continue;
    91. }
    92. }
    93. connector.ack(batchId); // 提交确认
    94. }
    95. } finally {
    96. connector.disconnect();
    97. }
    98. }
    99. private boolean hasSameError(String key, String error) {
    100. String lastError = errorMap.get(key);
    101. if (Objects.equals(lastError, error)) {
    102. return true;
    103. }
    104. errorMap.put(key, error);
    105. return false;
    106. }
    107. private void handleMessage(List entrys) throws InvalidProtocolBufferException {
    108. for (CanalEntry.Entry entry : entrys) {
    109. if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
    110. continue;
    111. }
    112. //根据数据库名获取租户名
    113. String databaseName = entry.getHeader().getSchemaName();
    114. String tableName = entry.getHeader().getTableName();
    115. log.info("数据库: {}, 表名: {}", databaseName, tableName);
    116. // 获取类型
    117. CanalEntry.EntryType entryType = entry.getEntryType();
    118. // 获取序列化后的数据
    119. ByteString storeValue = entry.getStoreValue();
    120. if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
    121. // 反序列化数据
    122. CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
    123. // 获取当前事件的操作类型
    124. CanalEntry.EventType eventType = rowChange.getEventType();
    125. if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE
    126. || eventType == CanalEntry.EventType.DELETE) {
    127. // 获取数据集
    128. List rowDataList = rowChange.getRowDatasList();
    129. // 遍历rowDataList,并打印数据集
    130. for (CanalEntry.RowData rowData : rowDataList) {
    131. List afterColumnsList = rowData.getAfterColumnsList();
    132. List beforeColumnsList = rowData.getBeforeColumnsList();
    133. // 变更前数据
    134. for (CanalEntry.Column column : beforeColumnsList) {
    135. log.info("变更前数据: name: {}, value: {}", column.getName(), column.getValue());
    136. }
    137. // 变更后数据
    138. for (CanalEntry.Column column : afterColumnsList) {
    139. log.info("变更后数据: name: {}, value: {}", column.getName(), column.getValue());
    140. }
    141. }
    142. }
    143. }
    144. }
    145. }
    146. }

  • 相关阅读:
    数据库基础入门 — SQL
    Linux下安装Foldseek并从蛋白质的PDB结构中获取 3Di Token 和 3Di Embedding
    深入理解Java中的线程安全List:CopyOnWriteArrayList原理和应用
    (数据科学学习手札161)高性能数据分析利器DuckDB在Python中的使用
    Text-to-Image with Diffusion models的巅峰之作:深入解读​ DALL·E 2​
    linux运维基础一(shell两类命令&Hash缓存表&命令行历史)
    Pyspark学习笔记小总
    [附源码]JAVA毕业设计基于web的面向公众的食品安全知识系统(系统+LW)
    工作流常用表结构总结
    Redis过期删除策略和内存淘汰策略
  • 原文地址:https://blog.csdn.net/ting4937/article/details/134464393