• 安装Canal1.1.5 并监控mysql8的binlog


    创建canal用户

    这只是为了安全,只让这个用户有读数据库的权限

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

    #将用户root的host修改为%实现IP登录 update user set host = '%' where user ='canal';

    #刷新权限数据表

    flush privileges;

    查看是不是开启了bin

    show variables like 'log_bin';

    先查下镜像版本: Docker Hub

    拉取镜像

    docker pull canal/canal-server:v1.1.6

    启动镜像

    docker run --name canal -d canal/canal-server:v1.1.5

    进docker 查看文件: docker exec -it 370cd2adf99c /bin/bash

    对照是否有下面要拷贝的目录:/home/admin/canal-server/conf/

    查看docker 里的日志 tail -100f /home/admin/canal-server/logs/example/example.log

    将容器内部配置文件拷贝到外部

    docker cp [容器索引]:[内部路径] [外部路径]

    docker cp canal:/home/admin/canal-server/conf/canal.properties /usr/local/canal docker cp canal:/home/admin/canal-server/conf/example/instance.properties /usr/local/canal/example

    修改:instance.properties

     

    关闭移除容器

    #关闭容器 docker stop canal

    #移除容器 docker rm canal

    #启动新的 这里-v是将外部的文件挂载到容器内部 这样就不用每次启动都要配置参数了

    docker run --restart=always --name canal -p 11111:11111 -d -v /usr/local/canal/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /usr/local/canal/canal.properties:/home/admin/canal-server/conf/canal.properties canal/canal-server:v1.1.5

    为什么没有取到数据:

    查所有表

    这样才是对的。

    不知道为什么用本地mysql8可以,用112上的mysql8就不行。

    多mysql

    注意

    当启动两个一样的Test单元测试, 当修改数据库测试,同一时间只会有一个程序会接受数据,不会两个程序同时接受数据,所以可以分布式部署来监控bin_log, 经测试,基本上是轮训。

    代码

    1. <dependency>
    2. <groupId>com.alibaba.otter</groupId>
    3. <artifactId>canal.client</artifactId>
    4. <version>1.1.6</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>com.alibaba.otter</groupId>
    8. <artifactId>canal.protocol</artifactId>
    9. <version>1.1.6</version>
    10. </dependency>
    11. <dependency>
    12. <groupId>com.alibaba.otter</groupId>
    13. <artifactId>canal.common</artifactId>
    14. <version>1.1.6</version>
    15. </dependency>

    1. package com.lm.demo.test;
    2. import com.alibaba.otter.canal.client.CanalConnector;
    3. import com.alibaba.otter.canal.client.CanalConnectors;
    4. import com.alibaba.otter.canal.protocol.CanalEntry;
    5. import com.alibaba.otter.canal.protocol.Message;
    6. import org.junit.Test;
    7. import java.net.InetSocketAddress;
    8. import java.util.List;
    9. import java.util.concurrent.TimeUnit;
    10. /**
    11. * canal client api 的使用
    12. * https://github.com/alibaba/canal/wiki/ClientExample
    13. * 测试过程中发现,如果修改一个sql语句,但是修改的值没有发生变化,则此处不会监控到。
    14. * 同一个客户端启动多次,只有一个客户端可以获取到数据
    15. *
    16. * @author huan.fu 2021/5/31 - 上午10:31
    17. */
    18. public class CanalClientApiTest {
    19. @Test
    20. public void test2( ) {
    21. test();
    22. }
    23. @Test
    24. public void test( ) {
    25. String destination = "example";
    26. // 创建一个 canal 链接
    27. CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.112.112", 11111), destination, "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");
    28. // 链接对应的canal server
    29. canalConnector.connect();
    30. // 订阅那个库的那个表等
    31. /**
    32. * 订阅规则
    33. * 1. 所有表:.* or .*\\..*
    34. * 2. canal schema下所有表: canal\\..*
    35. * 3. canal下的以canal打头的表:canal\\.canal.*
    36. * 4. canal schema下的一张表:canal\\.test1
    37. * 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
    38. */
    39. //canalConnector.subscribe("temp_work\\.customer");
    40. canalConnector.subscribe(".*\\..*"); //test库下所有表
    41. // 回滚到未进行 #ack 的地方,下次fetch的时候,可以从最后一个没有 #ack 的地方开始拿
    42. canalConnector.rollback();
    43. int batchSize = 1000;
    44. while (true) {
    45. // 获取一批数据,不一定会获取到 batchSize 条
    46. Message message = canalConnector.getWithoutAck(batchSize);
    47. // 获取批次id
    48. long batchId = message.getId();
    49. // 获取数据
    50. List<CanalEntry.Entry> entries = message.getEntries();
    51. if (batchId == -1 || entries.isEmpty()) {
    52. System.out.println("没有获取到数据");
    53. try {
    54. TimeUnit.SECONDS.sleep(3);
    55. } catch (InterruptedException e) {
    56. e.printStackTrace();
    57. }
    58. continue;
    59. }
    60. for (CanalEntry.Entry entry : entries) {
    61. if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
    62. continue;
    63. }
    64. CanalEntry.RowChange rowChange;
    65. try {
    66. rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    67. } catch (Exception e) {
    68. throw new RuntimeException("解析binlog数据出现异常 , data:" + entry.toString(), e);
    69. }
    70. CanalEntry.EventType eventType = rowChange.getEventType();
    71. System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
    72. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    73. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
    74. eventType));
    75. if (eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
    76. System.out.println("sql => " + rowChange.getSql());
    77. }
    78. for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
    79. if (eventType == CanalEntry.EventType.DELETE) {
    80. printColumn(rowData.getBeforeColumnsList());
    81. } else if (eventType == CanalEntry.EventType.INSERT) {
    82. printColumn(rowData.getAfterColumnsList());
    83. } else {
    84. System.out.println("-------> before");
    85. printColumn(rowData.getBeforeColumnsList());
    86. System.out.println("-------> after");
    87. printColumn(rowData.getAfterColumnsList());
    88. }
    89. }
    90. }
    91. canalConnector.ack(batchId);
    92. }
    93. }
    94. private static void printColumn(List<CanalEntry.Column> columns) {
    95. for (CanalEntry.Column column : columns) {
    96. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    97. }
    98. }
    99. }

  • 相关阅读:
    Java的JDBC编程
    uniapp音乐播放整理
    使用vue-cli搭建SPA项目
    如何使用zx编写shell脚本
    IIS 解析漏洞复现
    基于MATLAB的无人机路径规划设计
    【已解决】pycharm 突然每次点击都开新页面,关不掉怎么办?
    Nginx实现高并发原理是什么?该考虑如何优化?
    VScode配置文件launch.json 和 tasks.json配置项详细说明
    美瞳小程序经营配送商城的作用是什么
  • 原文地址:https://blog.csdn.net/liuming690452074/article/details/125611022