• alibaba Canal 增量订阅 & 消费组件,了解,安装,部署实践


    alibaba Canal 增量订阅 & 消费组件,了解,安装,部署实践

    简介

    Github地址:https://github.com/alibaba/canal

    canal [kə’næl],译意为水道 / 管道 / 沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

    早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

    基于日志增量订阅和消费的业务包括

    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护 (拆分异构索引、倒排索引等)
    • 业务 cache 刷新
    • 带业务逻辑的增量数据处理

    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    在这里插入图片描述

    工作原理

    • MySQL master:将数据变更写入二进制日志 (binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看)
    • MySQL slave :masterbinary log events 拷贝到它的中继日志 (relay log)
    • MySQL slave :重放 relay log 中事件,将数据变更反映它自己的数据

    在这里插入图片描述

    canal 工作原理

    • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
    • MySQL master 收到 dump 请求,开始推送 binary log slave (即 canal)
    • canal 解析 binary log 对象 (原始为 byte 流)

    windows,linux 部署

    下载

    下载地址:https://github.com/alibaba/canal/releases

    canal.deployer-1.1.6.tar.gz,百度云地址链接:https://pan.baidu.com/s/1UIRIaDLm32xPTAdXPvdpYg 提取码:jhnq

    canal.deployer-1.1.4.tar.gz, 百度云地址链接:https://pan.baidu.com/s/1QV9Dg0_cfsAu-c2zOKwAQg 提取码:gknf

    下载部署版本 :canal.deployer-1.1.6.tar.gz
    在这里插入图片描述

    issues地址:https://github.com/alibaba/canal/issues/4245

    注意:1.1.6 版本安装包有问题,需要下载源码,自己重新编译版本

    Bug:java.io.IOException: ErrorPacket [errorNumber=1146, fieldCount=-1, message=Table ‘test.base table’ doesn’t exist, sqlState=42S02, sqlStateMarker=#]

    在这里插入图片描述

    下载部署版本 : canal.deployer-1.1.4.tar.gz

    在这里插入图片描述

    安装,以下步骤,我用 1.1.4 版本

    直接解压即可,

    在这里插入图片描述

    • bin :windows 和 Linux 启动命令
    • conf : 配置文件
    • lib: 依赖包
    • logs:运行日志文件存放地址

    配置Mysql

    在my.ini 文件中 添加 配置

    [mysqld]
    
    #开启binlog
    log-bin = mysql-bin
    
    #选择 row 模式
    binlog-format = ROW 
    
    #配置 mysql replaction 需要定义,不能和 canal 的 slaveId 重
    server_id = 1 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    检查配置是否生效

    show variables like 'binlog_format%'
    
    • 1

    在这里插入图片描述

    创建Mysql ,canal,用户

    #创建用户
    CREATE USER canal IDENTIFIED BY 'canal';
    
    #赋权
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';    
    
    #刷新
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;    
    FLUSH PRIVILEGES;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    配置 canal

    启动报错:ch.qos.logback.core.LogbackException: Unexpected filename extension of file [file:/E:/codes/github/canal/deployer/target/canal/conf/]. Should be either .groovy or .xml

    见:https://github.com/alibaba/canal/issues/3150

    修改方法,打开 startup.bat 文件 19 行修改如下:
    在这里插入图片描述

    set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%logback.xml" -Dcanal.conf="%canal_conf%"
    
    • 1

    在这里插入图片描述

    /conf/canal.properties 单机下默认不用改配置

    /conf/example/ instance.properties 单机下默认不用改配置

    启动

    打开 bin 目录下, startUp.bat 双击

    在这里插入图片描述

    检查日志 是否启动成功,打开 /logs/example/example.log
    在这里插入图片描述

    检查日志 是否启动成功,打开 /logs/canal/canal.log
    在这里插入图片描述

    实践:Java 编码连接客户端

    引入依赖

            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.protocol</artifactId>
                <version>1.1.5</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.5</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    简单测试

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.Message;
    import java.net.InetSocketAddress;
    
    public  void canalClientExe() {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
                    11111), "example", "", "");
            int batchSize = 1000;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                while (executeFlag) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        parseEntry(message.getEntries());
                    }
    
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
            } finally {
                connector.disconnect();
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    使用第三方开源依赖 canal-starter。地址:https://github.com/NormanGyllenhaal/canal-client

    依赖:

    	    <dependency>
                <groupId>top.javatool</groupId>
                <artifactId>canal-spring-boot-starter</artifactId>
                <version>1.2.1-RELEASE</version>
            </dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    编写配置文件:

    canal:
      destination: example  # canal实例名默认为example可以在配置文件中修改
      server: 192.168.0.5:11111
    
    • 1
    • 2
    • 3

    编写监听器:

    //需要监听的表
    @CanalTable("history_log")  
    @Component
    public class HistoryLogHandler implements EntryHandler<HistoryLog> {//指定表关系实体类
        @Override
        public void insert(HistoryLog historyLog) {
            //新增数据时执行此方法
        }
        @Override
        public void update(HistoryLog before, HistoryLog after) {
            //更新数据时执行此方法
        }
        @Override
        public void delete(HistoryLog historyLog) {
            //删除数据时执行此方法
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    总结

    canal 的好处在于对业务代码没有侵入,因为是基于监听 binlog 日志去进行同步数据的。实时性也能做到准实时,其实是很多企业一种比较常见的数据同步的方案。

    通过上面的学习之后,我们应该都明白 canal 是什么,它的原理,还有用法。实际上这仅仅只是入门,因为实际项目中我们不是这样玩的…

    实际项目我们是配置 MQ 模式,配合 RocketMQ 或者 Kafka,canal 会把数据发送到 MQ 的 topic 中,然后通过消息队列的消费者进行处理。

    Canal 的部署也是支持集群的,需要配合 ZooKeeper 进行集群管理。

  • 相关阅读:
    Keras深度学习实战(17)——使用U-Net架构进行图像分割
    ZigBee 3.0实战教程-Silicon Labs EFR32+EmberZnet-2-02:开发环境搭建
    动态一键换肤实现思路和demo
    Redis入门与应用
    【JDBC笔记】获取数据库连接
    知识点记录:李群李代数,微分流形,微分几何,图论
    mysql binlog的清理
    【Android进阶】5、Android断点调试与LogCat
    加拿大FBA海运详细说明
    Mac的设置与优化...持续更新
  • 原文地址:https://blog.csdn.net/Crazy_Cw/article/details/125491526