• canal实操应用


    一、MySQL的binlog日志

    1.1、binlog的分类

    binlog一般分为三类:statement语句级,记录一条一条的SQL,一条SQL可能更改多行,且SQL语句中如果用到now()函数或者random()函数,会存在数据不一致的问题。row行级,记录一行行的数据,记录特别细致,但是日志文件会比较大。mixed:混合模式,默认还是statement,某些情况下,如UUID()函数就会用row的方式进行处理。

    1.2、准备好数据库及表

    我这里准备了一个canal库,一张student表

    CREATE TABLE `student` (
      `id` int NOT NULL,
      `name` varchar(255) DEFAULT NULL,
      `sex` bit(1) DEFAULT NULL COMMENT '0:女,1:男',
      `age` int DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    小知识:mysql utf8mb4_0900_ai_ci和utf8mb4_general_ci区别?

    utf8mb4_general_ci:
    utf8mb4_general_ci 是较为简单的排序规则,对字符的比较是基于字符的二进制值的。
    不考虑语言特定的规则,适用于一般的字符比较,但在某些语境下可能导致不符合期望的排序结果。
    不区分大小写,但对于一些特殊字符,可能不按照某些语言的预期顺序排序。
    utf8mb4_0900_ai_ci:
    utf8mb4_0900_ai_ci 是 MySQL 8.0.0 版本后引入的排序规则,采用 Unicode 9.0.0 版本的规则。
    它考虑了更多的语言和语境,提供更精确的字符排序,适用于多语言环境。
    支持大小写不敏感的比较,并且对于一些特殊字符的排序更符合语言特定的规范。
    选择哪种排序规则取决于你的具体需求。如果你的应用面向的是特定语言环境,需要更精确和符合语言规范的字符比较,那么 utf8mb4_0900_ai_ci 可能更适合。如果你对语言特定的排序规则没有特别的要求,或者你希望比较快速并且不涉及复杂的字符排序问题,那么 utf8mb4_general_ci 也是一个可行的选择。

    CREATE TABLE my_table (
        column1 VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
        column2 VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
    );
    
    • 1
    • 2
    • 3
    • 4

    注:但是utf8只有utf8_general_ci这种字符集排序规则
    在这里插入图片描述

    二、实操

    2.1、修改配置文件开启binlog

    # For advice on how to change settings please see
    # http://dev.mysql.com/doc/refman/8.0/en/server-configuration-defaults.html
    
    [mysqld]
    # [必须]主服务器唯一ID,千万注意做mysql集群的时候这个id不能重复,
    # 而当前我们用canal同步也是,canal是伪装成mysql的slave,所以也不能和canal配置文件中的id重复
    server-id=1
    # [必须]启动二进制日志,指明路径。比如:自己本地的路径/log/mysqlbin
    log-bin=mysql-bin
    # [可选]设置需要复制的数据库,默认全部记录。比如
    # 开启需要监控的数据库
    binlog-do-db=replica_master_slave
    binlog-do-db=canal
    # binlog日志级别statement,row,mixed
    #binlog_format=STATEMENT
    binlog_format=ROW
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    修改完MySQL配置文件之后记得重启MySQL服务, systemctl restart mysqld
    重启之后开是否真正开启binlog日志,可以插入一条数据前看一下日志大小,插入数据之后再查看一下binlog文件的大小,linu直接安装的一般在/var/lib/mysql文件夹下就可以看到binlog日志

    2.2、canal用户赋值MySQL权限

    最小权限原则,赋予canl用户读的权限即可,mysql -u root -p123456 密码写你自己的,登录客户端执行如下语句

    # MySQL5.7需要执行如下2条语句
    set global validate_password_length=4;
    set global validate_password_policy=0;
    #8.0使用下面一条语句即可
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
    #也许你也和我一样,上面那条语句根本执行不成功,执行如下语句即可——先创建用户,再赋予权限
    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    如下图只开放了读的权限
    在这里插入图片描述

    2.3、下载canal并使用

    网址,我用的当前最新版,冲冲冲
    下载deploy版本即可。
    解压tar -zxvf canal.deployer-1.1.7.tar.gz -C /usr/local/cancal-1.1.7/之后如下图更改关键配置
    tcp是自己写代码想怎么操作随自己来,然后就是写入各种MQ,请君自选。
    在这里插入图片描述
    在这里插入图片描述

    三、搭建Java客户端

        <dependencies>
            <dependency>
                <groupId>com.alibaba.ottergroupId>
                <artifactId>canal.clientartifactId>
                <version>1.1.0version>
            dependency>
        dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    canal组件细节图
    在这里插入图片描述

    package org.tg.canal;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.ByteString;
    import com.google.protobuf.InvalidProtocolBufferException;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Author Sumuxi
     * @Date 2023/11/10 21:58
     * @Desc
     */
    
    public class Main {
    
    
        public static void main(String[] args) throws InvalidProtocolBufferException {
    
            // 1。获取链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop113", 11111), "example", "", "");
    
            // 2.链接canal
            connector.connect();
            // 3.订阅数据库
            connector.subscribe("canal.*");
            while (true) {
    
                // 4.获取数据
                Message message = connector.get(100);// 获取指定数量的数据,有多少取多少,不会阻塞等待
                // 5.获取entry集合
                List<CanalEntry.Entry> entryList = message.getEntries();
    
                // 集合是空,就等3s后再去拉取
                if (entryList.size() == 0) {
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                } else {
                    // 遍历entryList,单条单条解析
                    for (CanalEntry.Entry entry : entryList) {
                        // 1.获取表名
                        String tableName = entry.getHeader().getTableName();
    
                        // 2.获取类型
                        //    TRANSACTIONBEGIN      事务开启,
                        //    ROWDATA               行数据,
                        //    TRANSACTIONEND        事务关闭
                        //    HEARTBEAT             心跳,
                        //    GTIDLOG               GTID日志;
                        CanalEntry.EntryType entryType = entry.getEntryType();
    
                        // 3.获取序列化后的数据
                        ByteString storeValue = entry.getStoreValue();
                        // 4.判断当前entry的类型是什么?
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                            // 5.反序列化数据
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            // 6.获取事件的操作类型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            // 7.获取数据集
                            List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
    
                            for (CanalEntry.RowData rowData : rowDataList) {
                                // 更新前
                                JSONObject beforeData = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeData.put(column.getName(), column.getValue());
                                }
                                // 更新后
                                JSONObject afterData = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterData.put(column.getName(), column.getValue());
                                }
    
                                System.out.println(
                                        "Table:" + tableName +
                                                ",EventType:" + eventType +
                                                ",Before:" + beforeData +
                                                ",After:" + afterData);
                            }
                        } else {
                            System.out.println("当前操作类型为:" + entryType);
                        }
                    }
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    最后,其实参考github上的这个实例也能书写客户端,https://github.com/alibaba/canal/wiki/ClientExample,不妥之处,盼多多指正,后续持续完善。
    纨绔不饿死,儒冠多误身。读书破万卷,下笔如有神。致君尧舜上,再使风俗淳。骑驴三十载,旅食京华春。朝扣富儿门,暮随肥马尘埃。残杯与冷炙,到处潜悲辛。白鸥没浩荡,万里谁能驯。——节选杜甫《奉赠韦左丞丈二十二韵》中的几句

  • 相关阅读:
    数据库优化方法及思路分析
    人工智能知识全面讲解: 图像识别的准备工作
    Slimming剪枝方法
    SpringBoot
    JS判断浏览器类型
    Linux —用户和组
    并查集及实现
    【Qt】Qt中关联容器QMap,QMultiMap,QHash,QMultiHash 的理解
    高通导航器软件开发包使用指南(6)
    Sql优化总结!详细!(2021最新面试必问)
  • 原文地址:https://blog.csdn.net/Sumuxi9797926/article/details/134339394