canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
工作原理
MySQL 主备复制,MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看),MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log),然后重放 relay log 中事件,将数据变更反映它自己的数据。canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议,MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ),canal接收后,解析 binary log 对象(原始为 byte 流)
详细文档:https://github.com/alibaba/canal
笔者使用的环境
centos8,ip 192.168.5.19 ,内存 1 g
centos8,ip 192.168.5.39 ,内存 4 g
canal 版本 1.1.5
mysql 版本 8.0.31
目录
在 192.168.5.19 服务器上安装 mysql
拉取镜像
docker pull mysql:8.0.31

开启容器,挂载数据目录
- docker run --privileged=true -it -p 3306:3306 --name mysql8 -e MYSQL_ROOT_PASSWORD=123456 \
- -v /docker_date/mysql/conf:/etc/mysql/conf.d -v /docker_date/mysql/data:/var/lib/mysql -d mysql:8.0.31

进入/docker_date/mysql/conf目录
cd /docker_date/mysql/conf
新建 my.cnf 文件
touch my.cnf

编辑 my.cnf 文件
vi my.cnf
my.cnf 内容
- [mysqld]
- server-id=1
因为mysql8默认开启binlog,且格式是ROW,所以my.cnf 内容笔者只配置了server-id

保存后重启mysql容器
docker restart mysql8
使用Navicat 连接数据库查看binlog是否默认开启
查看命令
SHOW VARIABLES LIKE 'log_bin';

查看binlog格式
SHOW VARIABLES LIKE 'binlog_format';

创建用于复制的账号 canal,密码12346
create user 'canal'@'%' identified with mysql_native_password by '123456';

赋予权限
grant select, replication client,replication slave on *.* to 'canal'@'%';

刷新权限
flush privileges;

使用 Navicat 新建数据库 canal_test,然后建表 canal_user,用于测试
建表sql
- DROP TABLE IF EXISTS `canal_user`;
- CREATE TABLE `canal_user` (
- `id` int(0) NOT NULL AUTO_INCREMENT,
- `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
- PRIMARY KEY (`id`) USING BTREE
- ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
创建后

在 192.168.5.39 服务器上安装 canal
拉取镜像
docker pull canal/canal-server:v1.1.5

开启容器
docker run --name canal -it -p 11111:11111 -d canal/canal-server:v1.1.5
开启后可使用命令查看是否开启成功
docker logs canal
这里要给canal所在服务器分配足够内存,否则会启动失败,笔者刚开始给canal所在服务器1g内容,导致cana启动后会自动关闭,后来改成4g内存成功启动

成功开启容器后,进入容器修改配置
进入容器命令
docker exec -it canal bash
进入后编辑 instance.properties 文件
vi /home/admin/canal-server/conf/example/instance.properties
(1)、配置 canal.instance.mysql.slaveId=2,用于主从复制
(2)、配置要同步数据的 mysql 地址,canal.instance.master.address=192.168.5.19:3306
(3)、配置要同步数据的 mysql 的账号,canal.instance.dbUsername=canal
(4)、配置要同步数据的 mysql 的密码,canal.instance.dbPassword=123456
(5)、修改要同步数据的库表,canal.instance.filter.regex=canal_test\\..* (默认的canal.instance.filter.regex=.*\\..* 是所有库的所有表,这里笔者修改成 canal_test 库下的所有表,.* 表示所有,\\. 就是.因为这里是正则,正则中的 . 表示匹配除换行符以外的任意字符,所以 . 本身需要转义)
(6)、canal.mq.topic=example,这个不用改,后面会用
配置截图

保存后退出
重启 canal 容器
docker restart canal
firewall-cmd --zone=public --add-port=11111/tcp --permanent
更新防火墙规则(无需断开连接,动态添加规则)
firewall-cmd --reload
新建maven项目,引入 canal client 依赖
- <dependency>
- <groupId>com.alibaba.ottergroupId>
- <artifactId>canal.clientartifactId>
- <version>1.1.5version>
- dependency>
- <dependency>
- <groupId>com.alibaba.ottergroupId>
- <artifactId>canal.protocolartifactId>
- <version>1.1.5version>
- dependency>
笔者新建一个 springboot 项目
新建 Demo2 的类
- package com.wsjz.springbootcanal.canal;
-
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import com.alibaba.otter.canal.protocol.Message;
- import com.google.protobuf.InvalidProtocolBufferException;
- import java.net.InetSocketAddress;
- import java.util.List;
-
- /**
- * Demo2
- *
- * @author wsjz
- * @date 2022/11/11
- */
- public class Demo2 {
-
- public static void main(String[] args) {
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.5.39",
- 11111), "example", "", "");
- int batchSize = 1000;
-
- connector.connect();
- connector.subscribe(".*\\..*");
- connector.rollback();
-
- try {
- while (true) {
- Message message = connector.getWithoutAck(batchSize);
- long batchId = message.getId();
- int size = message.getEntries().size();
-
- if (batchId == -1 || size == 0) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- } else {
- try {
- entryhandler(message.getEntries());
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- }
- }
- }
- } finally {
- connector.disconnect();
- }
- }
-
- private static void entryhandler(List
entries) throws InvalidProtocolBufferException { - for (CanalEntry.Entry entry : entries) {
- if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
- System.out.println(entry.getEntryType());
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- System.out.println(rowChange.getEventType());
- if (CanalEntry.EventType.INSERT == rowChange.getEventType()) {
- insertHandler(entry);
- }
- if (CanalEntry.EventType.UPDATE == rowChange.getEventType()) {
- updateHandler(entry);
- }
- if (CanalEntry.EventType.DELETE == rowChange.getEventType()) {
- deleteHandler(entry);
- }
- }
- }
-
- }
-
- private static void insertHandler(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- List
rowDatas = rowChange.getRowDatasList(); - for (CanalEntry.RowData rowData : rowDatas) {
- List
columns = rowData.getAfterColumnsList(); - for (CanalEntry.Column column : columns) {
- //获取字段
- System.out.println(column.getName());
- //获取值
- System.out.println(column.getValue());
- }
- }
- }
-
- private static void updateHandler(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- List
rowDatas = rowChange.getRowDatasList(); - for (CanalEntry.RowData rowData : rowDatas) {
- //修改前
- List
beforeColumns = rowData.getBeforeColumnsList(); - for (CanalEntry.Column column : beforeColumns) {
- //获取字段
- System.out.println(column.getName());
- //修改前字段值
- System.out.println(column.getValue());
- }
-
- //修改后
- List
columns = rowData.getAfterColumnsList(); - for (CanalEntry.Column column : columns) {
- //获取字段
- System.out.println(column.getName());
- //修改后字段值
- System.out.println(column.getValue());
- }
- }
- }
-
- private static void deleteHandler(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- List
rowDatas = rowChange.getRowDatasList(); - for (CanalEntry.RowData rowData : rowDatas) {
- //删除前
- List
beforeColumns = rowData.getBeforeColumnsList(); - for (CanalEntry.Column column : beforeColumns) {
- //获取字段
- System.out.println(column.getName());
- //删除前字段值
- System.out.println(column.getValue());
- }
- }
- }
- }
运行测试
在 Navicat 中向数据库添加数据,看程序能否接到
insert into canal_user(name) values('小明');

Java Client 成功接收到同步的数据,canal环境搭建成功
至此完