• 阿里同步神器Canal原理+安装+快速使用



    前言

    最开始听说canal是从mysql与redis双写一致性解决方案,当时并没有太在意,最近由于需要实时同步数据,如果在代码对insert/update/delete做拦截也可以实现,但对代码侵入性太大了,并且后期更改时容易有遗漏,风险太高,这时就又想到了canal,canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据,这个真的是太爽爽爽了。并且实时性也能做到准实时,这也是canal为什么这么流行,因为确实很多企业会用来做数据同步的方案。

    Canal简介

    canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

    早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

    基于日志增量订阅和消费的业务包括

    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护(拆分异构索引、倒排索引等)
    • 业务 cache 刷新
    • 带业务逻辑的增量数据处理

    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    MySQL主备复制原理

    在这里插入图片描述

    • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
    • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
    • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

    canal 工作原理

    • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
    • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    • canal 解析 binary log 对象(原始为 byte 流)

    1、MySQL配置

    我们提前得有一台MySQL数据库,本文基于mysql 5.7,参考安装装程:window mysql 5.7安装教程
    在安装Canal之前,我们需要做2件事:1). 修改MySQL配置支持binlog,2). 创建canal用户

    1.1 修改MySQL配置支持binlog

    修改my.ini,在[mysqld]下配置以下4项:

    [mysqld]
    # 打开binlog
    log-bin=mysql-bin
    # 选择ROW(行)模式
    binlog-format=ROW
    # 配置MySQL replaction需要定义,不要和canal的slaveId重复
    server_id=1
    # 要监控的数据库名称
    binlog-do-db=my-test
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    log-bin=mysql-bin

    打开binlog:表示 binlog 日 志 的 前 缀 是 mysql-bin , 以后生成的日志文件就是mysql-bin.000001,mysql-bin.000002…
    文件后面的数字按顺序生成,每次 mysql 重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。

    binlog-format=ROW

    mysql binlog 的格式有三种:binlog_format = statement | mixed | row
    ◼ statement
    语句级,binlog 会记录每次一执行写操作的语句。
    相对 row 模式节省空间,但是可能产生不一致性,比如
    update tt set create_date=now()
    如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据不同。
    优点: 节省空间
    缺点: 有可能造成数据不一致。
    ◼ row
    行级, binlog 会记录每次操作后每行记录的变化
    优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,只记录执行后的效果。
    缺点:占用较大空间。
    ◼ mixed
    statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement 模式不一致问题.
    默认还是 statement,在某些情况下譬如:
    当函数中包含 UUID() 时; 包含 AUTO_INCREMENT 字段的表被更新时; 执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理。
    优点:节省空间,同时兼顾了一定的一致性。
    缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对 binlog 的监控的情况都不方便。

    1.2 创建canal用户

    -- 1. 使用命令登录:mysql -u root -p
    -- 2. 创建用户 用户名:canal 密码:canal@123456
    create user 'canal'@'%' identified by 'canal@123456';
    -- 3. 授权 *.*表示所有库
    grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal@123456';
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    1.3 重启mysql服务

    以管理员身份运行cmd
    C:\Windows\system32>net stop mysql
    MySQL 服务正在停止…
    MySQL 服务已成功停止。
    C:\Windows\system32>net start mysql
    MySQL 服务正在启动 .
    MySQL 服务已经启动成功。

    1.4 基本的查看binlog命令

    • 查看是否打开binlog模式:show variables like 'log_bin';
      在这里插入图片描述

    • 查看binlog日志文件列表:show binary logs;
      在这里插入图片描述

    • 查看当前正在写入的binlog文件:show master status;
      在这里插入图片描述


    2、下载安装canal

    官网下载:https://github.com/alibaba/canal/releases
    Latest: v1.1.6
    在这里插入图片描述

    2.1 解压canal

    我解压到 E:\servers\canal\canal.deployer-1.1.6
    在这里插入图片描述

    2.2 配置与mysql信息

    打开配置文件conf/example/instance.properties,主要配置数据库地址和用户:

    # mysql数据库地址
    canal.instance.master.address=127.0.0.1:3306
    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal@123456
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意:这里不需要设置binlog的文件名和位置,自动为空即可。

    2.3 启动canal

    转到bin目录,cmd运行startup.bat
    在这里插入图片描述


    3. 快速使用

    3.1 官方客户端

    创建Maven项目,不需要依赖Spring,添加maven依赖:

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.6</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.1.6</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    创建类SimpleCanalClient

    package com.tiangang;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    public class SimpleCanalClient {
    
        private final CanalConnector connector;
        private Thread thread = null;
        private final Thread.UncaughtExceptionHandler handler = (t, e) -> e.printStackTrace();
        private volatile boolean running = false;
        private final static int BATCH_SIZE = 5 * 1024;
    
        public SimpleCanalClient(CanalConnector connector) {
            this.connector = connector;
        }
    
        public static void main(String[] args) {
            // 根据ip,直接创建链接,无HA的功能
            String destination = "example";
            String ip = AddressUtils.getHostIp();
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
                    destination,
                    "canal",
                    "canal@123456");
            final SimpleCanalClient simpleCanalClient = new SimpleCanalClient(connector);
            simpleCanalClient.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    System.out.println("## stop the canal client");
                    simpleCanalClient.stop();
                } catch (Throwable e) {
                    System.out.println("##something goes wrong when stopping canal:");
                    e.printStackTrace();
                } finally {
                    System.out.println("## canal client is down.");
                }
            }));
        }
    
        public void start() {
            if (this.connector == null) {
                System.out.println("connector不能为空,启动失败");
                return;
            }
            thread = new Thread(this::process);
            thread.setUncaughtExceptionHandler(handler);
            running = true;
            thread.start();
            System.out.println("canal client started...");
        }
    
        public void stop() {
            if (!running) {
                return;
            }
            running = false;
            if (thread != null) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    // ignore
                }
            }
            System.out.println("canal client stopped...");
        }
    
        private void process() {
            while (running) {
                try {
                    //打开连接
                    connector.connect();
                    //订阅数据库表,全部表
                    connector.subscribe(".*\\..*");
                    //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
                    connector.rollback();
                    while (running) {
                        // 获取指定数量的数据
                        Message message = connector.getWithoutAck(BATCH_SIZE);
                        //获取批量ID
                        long batchId = message.getId();
                        //获取批量的数量
                        int size = message.getEntries().size();
                        //如果没有数据
                        if (batchId == -1 || size == 0) {
                            try {
                                //线程休眠2秒
                                Thread.sleep(2000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        } else {
                            //如果有数据,处理数据
                            printEntry(message.getEntries());
                        }
                        if (batchId != -1) {
                            // 提交确认
                            connector.ack(batchId);
                        }
                    }
                } catch (Throwable e) {
                    e.printStackTrace();
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e1) {
                        // ignore
                    }
                    connector.rollback(); // 处理失败, 回滚数据
                } finally {
                    connector.disconnect();
                }
            }
        }
    
        /**
         * 打印canal server解析binlog获得的实体类信息
         */
        private static void printEntry(List<CanalEntry.Entry> entrys) {
            for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    //开启/关闭事务的实体类型,跳过
                    continue;
                }
                //RowChange对象,包含了一行数据变化的所有特征
                //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
                CanalEntry.RowChange rowChage;
                try {
                    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                }
                //获取操作类型:insert/update/delete类型
                CanalEntry.EventType eventType = rowChage.getEventType();
                //打印Header信息
                System.out.println(String.format("================》; binlog[%s:%s] , dbName:%s, tableName:%s , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
                //判断是否是DDL语句
                if (rowChage.getIsDdl()) {
                    System.out.println("================》;isDDL: true,sql:" + rowChage.getSql());
                }
                //获取RowChange对象里的每一行数据,打印出来
                for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                    //如果是删除语句
                    if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                        //如果是新增语句
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                        //如果是更新的语句
                    } else {
                        //变更前的数据
                        System.out.println("------->; before");
                        printColumn(rowData.getBeforeColumnsList());
                        //变更后的数据
                        System.out.println("------->; after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175

    insert测试

    我提前创建了一个表

    CREATE TABLE `canal_user`  (
      `id` bigint(0) UNSIGNED NOT NULL AUTO_INCREMENT,
      `name` varchar(100) NULL,
      `age` int(0) UNSIGNED NULL,
      `create_time` datetime(0) NULL,
      `is_deleted` tinyint(1) NULL,
        PRIMARY KEY (`id`)
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    插入一条:

    INSERT INTO `canal_user`(`name`, `age`, create_time, is_deleted) VALUES ('hello-canal', 18, '2022-11-15 00:00:00', 0);
    
    • 1

    SimpleCanalClient监听到了:

    ================; binlog[mysql-bin.000005:7300] , dbName:my-test, tableName:canal_user , eventType : INSERT
    id : 1    update=true
    name : hello-canal    update=true
    age : 18    update=true
    create_time : 2022-11-15 00:00:00    update=true
    is_deleted : 0    update=true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    update测试

    update  `canal_user` set `name` = 'hello-canal-28', age = 28 where id = 1;
    
    • 1

    SimpleCanalClient监听到了:

    ================; binlog[mysql-bin.000005:7601] , dbName:my-test, tableName:canal_user , eventType : UPDATE
    ------->; before
    id : 1    update=false
    name : hello-canal    update=false
    age : 18    update=false
    create_time : 2022-11-15 00:00:00    update=false
    is_deleted : 0    update=false
    ------->; after
    id : 1    update=false
    name : hello-canal-28    update=true
    age : 28    update=true
    create_time : 2022-11-15 00:00:00    update=false
    is_deleted : 0    update=false
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    delete测试

    delete  from `canal_user`  where id = 1;
    
    • 1

    SimpleCanalClient监听到了:

    ================; binlog[mysql-bin.000005:7938] , dbName:my-test, tableName:canal_user , eventType : DELETE
    id : 1    update=false
    name : hello-canal-28    update=false
    age : 28    update=false
    create_time : 2022-11-15 00:00:00    update=false
    is_deleted : 0    update=false
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.2 第三方客户端

    第三方客户端采用SpringBoot整合:https://github.com/chenqian56131/spring-boot-starter-canal
    具体就不demo了,有兴趣的可以自行玩玩.


    Demo下载

    https://download.csdn.net/download/scm_2008/87017870

    参考

    Canal官网
    超详细canal入门,看这篇就够了
    阿里的数据同步神器——Canal
    实时采集Canal快速入门

  • 相关阅读:
    hiveserver2经常挂断的原因
    java-php-python-ssm玉米生产力管理与分析平台计算机毕业设计
    宏任务,微任务,事件循环event loop与promise、setTimeout、async、nextTick【超详细示例讲解】
    Python函数装饰器的深入解析
    电量优化 - Hook 系统服务
    Ubuntu 22.04 开机后在登陆界面循环问题解决
    adele心理学
    【PX4&Simulink&Gazebo联合仿真】在Simulink中使用ROS2控制无人机进入Offboard模式起飞悬停并在Gazebo中可视化
    python 线程池与队列简单应用
    语法练习:front3
  • 原文地址:https://blog.csdn.net/scm_2008/article/details/127862218