• springboot集成canal,将数据发送至接口


    前置条件:

    1. mysql开启binlog日志
    2. 部署canal服务

    Springboot代码:

    接口工具类:

    1. import cn.hutool.http.HttpRequest;
    2. import cn.hutool.http.HttpResponse;
    3. import com.alibaba.fastjson.JSON;
    4. import com.alibaba.fastjson.JSONObject;
    5. import java.util.HashMap;
    6. import java.util.Map;
    7. import lombok.extern.slf4j.Slf4j;
    8. import org.springframework.scheduling.annotation.Async;
    9. import org.springframework.stereotype.Component;
    10. @Slf4j
    11. @Component
    12. public class HttpMethodUtil {
    13. @Async
    14. public int httpPost(String url,String Content) {
    15. // 添加请求头信息
    16. Map heads = new HashMap<>();
    17. // 使用json发送请求,下面的是必须的
    18. heads.put("Content-Type", "application/json;charset=UTF-8");
    19. HttpResponse response = HttpRequest.post(url)
    20. .headerMap(heads, false)
    21. .body(Content)
    22. .timeout(5 * 1000)
    23. .execute();
    24. JSONObject jsonObject = JSON.parseObject(response.body());
    25. int code = (int) jsonObject.get("code");
    26. if (code!=0){
    27. response = HttpRequest.post(url)
    28. .headerMap(heads, false)
    29. .body(Content)
    30. .timeout(5 * 1000)
    31. .execute();
    32. }
    33. jsonObject = JSON.parseObject(response.body());
    34. code = (int) jsonObject.get("code");
    35. return code;
    36. }

    canal工具类:

    1. import com.alibaba.fastjson.JSONObject;
    2. import com.alibaba.otter.canal.client.CanalConnector;
    3. import com.alibaba.otter.canal.client.CanalConnectors;
    4. import com.alibaba.otter.canal.protocol.CanalEntry;
    5. import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    6. import com.alibaba.otter.canal.protocol.Message;
    7. import java.net.InetSocketAddress;
    8. import java.util.List;
    9. import javax.annotation.PostConstruct;
    10. import javax.annotation.Resource;
    11. import lombok.extern.slf4j.Slf4j;
    12. import org.springframework.beans.factory.annotation.Value;
    13. import org.springframework.stereotype.Component;
    14. @Component
    15. @Slf4j
    16. public class CanalUtil {
    17. @Value("${canal-monitor-mysql.hostname}")
    18. String canalMonitorHost;
    19. @Value("${canal-monitor-mysql.port}")
    20. Integer canalMonitorPort;
    21. @Value("${canal-monitor-mysql.database}")
    22. String canalMonitorDatabaseName;
    23. private final static int BATCH_SIZE = 100;
    24. @Resource
    25. HttpMethodUtil httpMethodUtil;
    26. /**
    27. * 启动服务
    28. */
    29. @PostConstruct
    30. public void startMonitorSQL() {
    31. while (true) {
    32. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");
    33. try {
    34. //打开连接
    35. connector.connect();
    36. log.info("数据库检测连接成功!" + canalMonitorDatabaseName);
    37. //订阅数据库表,全部表q
    38. // connector.subscribe(canalMonitorTableName + "\\..*");
    39. connector.subscribe(canalMonitorDatabaseName+ "\\.jyz_jyzqgdwa");
    40. //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
    41. connector.rollback();
    42. while (true) {
    43. // 获取指定数量的数据
    44. Message message = connector.getWithoutAck(BATCH_SIZE);
    45. long batchId = message.getId();
    46. int size = message.getEntries().size();
    47. if (batchId == -1 || size == 0) {
    48. } else {
    49. handleDATAChange(message.getEntries());
    50. }
    51. // 提交确认
    52. connector.ack(batchId);
    53. }
    54. } catch (Exception e) {
    55. e.printStackTrace();
    56. log.error("成功断开监测连接!尝试重连");
    57. } finally {
    58. connector.disconnect();
    59. //防止频繁访问数据库链接: 线程睡眠 10秒
    60. try {
    61. Thread.sleep(10 * 1000);
    62. } catch (InterruptedException e) {
    63. e.printStackTrace();
    64. }
    65. }
    66. }
    67. }
    68. /**
    69. * 打印canal server解析binlog获得的实体类信息
    70. */
    71. private void handleDATAChange(List entrys) {
    72. for (CanalEntry.Entry entry : entrys) {
    73. if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
    74. continue;
    75. }
    76. //RowChange对象,包含了一行数据变化的所有特征
    77. CanalEntry.RowChange rowChage;
    78. try {
    79. rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    80. } catch (Exception e) {
    81. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);
    82. }
    83. //获取操作类型:insert/update/delete类型
    84. CanalEntry.EventType eventType = rowChage.getEventType();
    85. //打印Header信息
    86. log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",
    87. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    88. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
    89. eventType);
    90. //判断是否是DDL语句
    91. if (rowChage.getIsDdl()) {
    92. log.info("================》;isDdl: true,sql:{}", rowChage.getSql());
    93. }
    94. //获取RowChange对象里的每一行数据,打印出来
    95. for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
    96. //如果是删除语句
    97. if (eventType == CanalEntry.EventType.DELETE) {
    98. log.info(">>>>>>>>>> 删除 >>>>>>>>>>");
    99. log.info("DELETE:{}", rowData.getBeforeColumnsList());
    100. //如果是新增语句
    101. } else if (eventType == CanalEntry.EventType.INSERT) {
    102. log.info(">>>>>>>>>> 新增 >>>>>>>>>>");
    103. JSONObject json = new JSONObject();
    104. for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
    105. json.put(column.getName(), column.getValue());
    106. }
    107. log.info(json.toJSONString());
    108. String url = "http://192.168.21.11:8000/api/wirte";
    109. int code = httpMethodUtil.httpPost(url,json.toJSONString());
    110. if(code==0){
    111. log.info(json.toJSONString());
    112. }else {
    113. log.error(json.toJSONString());
    114. }
    115. //如果是更新的语句
    116. } else {
    117. log.info(">>>>>>>>>> 更新 >>>>>>>>>>");
    118. //变更前的数据
    119. log.info("------->; before");
    120. JSONObject json = new JSONObject();
    121. for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
    122. json.put(column.getName(), column.getValue());
    123. }
    124. log.info(json.toJSONString());
    125. String url = "http://192.168.21.11:8000/api/wirte";
    126. int code = httpMethodUtil.httpPost(url,json.toJSONString());
    127. if(code==0){
    128. log.info(json.toJSONString());
    129. }else {
    130. log.error(json.toJSONString());
    131. }
    132. }
    133. }
    134. }
    135. }
    136. }

    application.properties配置文件:

    1. canal-monitor-mysql.hostname: 192.168.21.11
    2. canal-monitor-mysql.port: 11111
    3. canal-monitor-mysql.database: datbases_name
    4. logging.file.name=./log/rizhi.log
    5. logging.pattern.dateformat=yyyy-MM-dd # 设置日期格式化
    6. logging.logback.rollingpolicy.max-history=30

  • 相关阅读:
    开源和闭源的优劣势比较
    【翻译】Seastar 教程(四)
    stm32 I2C结构体解析
    常见的敏捷开发框架
    做地推共享wifi贴的如今都怎么样了?
    获取分布式的请求响应内容
    (附源码)php在线考试系统 毕业设计 032028
    电厂三维人员定位系统的应用与优势有哪些?
    李宏毅-机器学习-笔记-P1
    第四章 文件管理 十、文件系统的全局结构
  • 原文地址:https://blog.csdn.net/weixin_38959210/article/details/134022603