• docker 使用2台服务器安装 Canal 同步 Mysql 数据


    canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    工作原理

    MySQL 主备复制,MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看),MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log),然后重放 relay log 中事件,将数据变更反映它自己的数据。canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议,MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ),canal接收后,解析 binary log 对象(原始为 byte 流)

    详细文档:https://github.com/alibaba/canal

    笔者使用的环境

    centos8,ip 192.168.5.19 ,内存 1 g

    centos8,ip 192.168.5.39 ,内存 4 g

    canal 版本 1.1.5

    mysql 版本 8.0.31

    目录

    1、安装mysql

    1.1、开启mysql容器

    1.2、配置主服务器开启 binlog

    1.3、新建用于同步数据得账号

    1.4、建库建表

    2、 安装 Canal

    2.1、开启 canal 容器

    2.2、配置 canal

    2.3、开放防火墙 11111 端口 

     3、Java ClientAPI


    1、安装mysql

    在 192.168.5.19 服务器上安装 mysql

    1.1、开启mysql容器

    拉取镜像

    docker pull mysql:8.0.31

     开启容器,挂载数据目录

    1. docker run --privileged=true -it -p 3306:3306 --name mysql8 -e MYSQL_ROOT_PASSWORD=123456 \
    2. -v /docker_date/mysql/conf:/etc/mysql/conf.d -v /docker_date/mysql/data:/var/lib/mysql -d mysql:8.0.31

    1.2、配置主服务器开启 binlog

    进入/docker_date/mysql/conf目录

    cd /docker_date/mysql/conf

    新建 my.cnf 文件

    touch my.cnf

     

    编辑 my.cnf 文件

    vi my.cnf

    my.cnf 内容

    1. [mysqld]
    2. server-id=1

    因为mysql8默认开启binlog,且格式是ROW,所以my.cnf 内容笔者只配置了server-id

    保存后重启mysql容器

    docker restart mysql8

    使用Navicat 连接数据库查看binlog是否默认开启

    查看命令

    SHOW VARIABLES LIKE 'log_bin';

     查看binlog格式

    SHOW VARIABLES LIKE 'binlog_format';

     

    1.3、新建用于同步数据得账号

    创建用于复制的账号 canal,密码12346

    create user 'canal'@'%' identified with mysql_native_password by '123456';

    赋予权限

    grant select, replication client,replication slave on *.* to 'canal'@'%';

     

    刷新权限

    flush privileges;

     

     

    1.4、建库建表

    使用 Navicat 新建数据库 canal_test,然后建表 canal_user,用于测试

    建表sql

    1. DROP TABLE IF EXISTS `canal_user`;
    2. CREATE TABLE `canal_user` (
    3. `id` int(0) NOT NULL AUTO_INCREMENT,
    4. `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
    5. PRIMARY KEY (`id`) USING BTREE
    6. ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

    创建后

    2、 安装 Canal

    在 192.168.5.39 服务器上安装 canal

    2.1、开启 canal 容器

    拉取镜像

    docker pull canal/canal-server:v1.1.5

    开启容器

    docker run --name canal -it -p 11111:11111 -d canal/canal-server:v1.1.5

    开启后可使用命令查看是否开启成功

    docker logs canal

    这里要给canal所在服务器分配足够内存,否则会启动失败,笔者刚开始给canal所在服务器1g内容,导致cana启动后会自动关闭,后来改成4g内存成功启动

     

    2.2、配置 canal

    成功开启容器后,进入容器修改配置

    进入容器命令

    docker exec -it canal bash

    进入后编辑 instance.properties 文件

    vi /home/admin/canal-server/conf/example/instance.properties

    (1)、配置 canal.instance.mysql.slaveId=2,用于主从复制

    (2)、配置要同步数据的 mysql 地址,canal.instance.master.address=192.168.5.19:3306

    (3)、配置要同步数据的 mysql 的账号,canal.instance.dbUsername=canal

    (4)、配置要同步数据的 mysql 的密码,canal.instance.dbPassword=123456

    (5)、修改要同步数据的库表,canal.instance.filter.regex=canal_test\\..*    (默认的canal.instance.filter.regex=.*\\..*  是所有库的所有表,这里笔者修改成 canal_test 库下的所有表,.*  表示所有,\\.  就是.因为这里是正则,正则中的 . 表示匹配除换行符以外的任意字符,所以 本身需要转义)

    (6)、canal.mq.topic=example,这个不用改,后面会用

    配置截图

    保存后退出

    重启 canal 容器

    docker restart canal

    2.3、开放防火墙 11111 端口 

    firewall-cmd --zone=public --add-port=11111/tcp --permanent

    更新防火墙规则(无需断开连接,动态添加规则)

    firewall-cmd --reload

     3、Java ClientAPI

     新建maven项目,引入  canal client 依赖

    1. <dependency>
    2. <groupId>com.alibaba.ottergroupId>
    3. <artifactId>canal.clientartifactId>
    4. <version>1.1.5version>
    5. dependency>
    6. <dependency>
    7. <groupId>com.alibaba.ottergroupId>
    8. <artifactId>canal.protocolartifactId>
    9. <version>1.1.5version>
    10. dependency>

    笔者新建一个 springboot 项目

    新建 Demo2 的类

    1. package com.wsjz.springbootcanal.canal;
    2. import com.alibaba.otter.canal.client.CanalConnector;
    3. import com.alibaba.otter.canal.client.CanalConnectors;
    4. import com.alibaba.otter.canal.protocol.CanalEntry;
    5. import com.alibaba.otter.canal.protocol.Message;
    6. import com.google.protobuf.InvalidProtocolBufferException;
    7. import java.net.InetSocketAddress;
    8. import java.util.List;
    9. /**
    10. * Demo2
    11. *
    12. * @author wsjz
    13. * @date 2022/11/11
    14. */
    15. public class Demo2 {
    16. public static void main(String[] args) {
    17. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.5.39",
    18. 11111), "example", "", "");
    19. int batchSize = 1000;
    20. connector.connect();
    21. connector.subscribe(".*\\..*");
    22. connector.rollback();
    23. try {
    24. while (true) {
    25. Message message = connector.getWithoutAck(batchSize);
    26. long batchId = message.getId();
    27. int size = message.getEntries().size();
    28. if (batchId == -1 || size == 0) {
    29. try {
    30. Thread.sleep(1000);
    31. } catch (InterruptedException e) {
    32. }
    33. } else {
    34. try {
    35. entryhandler(message.getEntries());
    36. } catch (InvalidProtocolBufferException e) {
    37. e.printStackTrace();
    38. }
    39. }
    40. }
    41. } finally {
    42. connector.disconnect();
    43. }
    44. }
    45. private static void entryhandler(List entries) throws InvalidProtocolBufferException {
    46. for (CanalEntry.Entry entry : entries) {
    47. if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
    48. System.out.println(entry.getEntryType());
    49. CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    50. System.out.println(rowChange.getEventType());
    51. if (CanalEntry.EventType.INSERT == rowChange.getEventType()) {
    52. insertHandler(entry);
    53. }
    54. if (CanalEntry.EventType.UPDATE == rowChange.getEventType()) {
    55. updateHandler(entry);
    56. }
    57. if (CanalEntry.EventType.DELETE == rowChange.getEventType()) {
    58. deleteHandler(entry);
    59. }
    60. }
    61. }
    62. }
    63. private static void insertHandler(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
    64. CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    65. List rowDatas = rowChange.getRowDatasList();
    66. for (CanalEntry.RowData rowData : rowDatas) {
    67. List columns = rowData.getAfterColumnsList();
    68. for (CanalEntry.Column column : columns) {
    69. //获取字段
    70. System.out.println(column.getName());
    71. //获取值
    72. System.out.println(column.getValue());
    73. }
    74. }
    75. }
    76. private static void updateHandler(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
    77. CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    78. List rowDatas = rowChange.getRowDatasList();
    79. for (CanalEntry.RowData rowData : rowDatas) {
    80. //修改前
    81. List beforeColumns = rowData.getBeforeColumnsList();
    82. for (CanalEntry.Column column : beforeColumns) {
    83. //获取字段
    84. System.out.println(column.getName());
    85. //修改前字段值
    86. System.out.println(column.getValue());
    87. }
    88. //修改后
    89. List columns = rowData.getAfterColumnsList();
    90. for (CanalEntry.Column column : columns) {
    91. //获取字段
    92. System.out.println(column.getName());
    93. //修改后字段值
    94. System.out.println(column.getValue());
    95. }
    96. }
    97. }
    98. private static void deleteHandler(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
    99. CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    100. List rowDatas = rowChange.getRowDatasList();
    101. for (CanalEntry.RowData rowData : rowDatas) {
    102. //删除前
    103. List beforeColumns = rowData.getBeforeColumnsList();
    104. for (CanalEntry.Column column : beforeColumns) {
    105. //获取字段
    106. System.out.println(column.getName());
    107. //删除前字段值
    108. System.out.println(column.getValue());
    109. }
    110. }
    111. }
    112. }

    运行测试

    在 Navicat 中向数据库添加数据,看程序能否接到

    insert into canal_user(name) values('小明');

    Java Client 成功接收到同步的数据,canal环境搭建成功

    至此完

  • 相关阅读:
    java计算机毕业设计基于node.js的电影交流观影网站
    Introduction to ByteDance Pitaya
    python中的图像增强技术
    哈啰面试:说说Dubbo运行原理?
    Spring MVC BeanNameUrlHandlerMapping原理解析
    c语言练习78:执⾏操作后的变量值
    PsiQuantum宣布在容错量子计算架构方面取得新突破
    云原生 | Docker - [常用命令]
    助力道路场景下智能环境识别,基于YOLOv8全系列【n/s/m/l/x】参数模型开发构建道路场景下的道路边侧裸土检测识别分析系统
    JDBC 的定义及产品组件
  • 原文地址:https://blog.csdn.net/wsjzzcbq/article/details/127808614