• 数据同步


    数据同步工具Canal

    https://github.com/alibaba/canal/
    阿里开源的数据同步工具,同步mysql的数据到es中,利用的是mysql的binlog日志。

    数据同步方案:

    1. 代码实现
      通过代码对数据库的增删改进行同步到es中,代码嵌入式较高。
    2. mybatis实现
      拦截sql语句,对insert,update,deleted语句进行分析,同步到es中,批量的话不好处理。
    3. aop实现
      也是不能较好处理批量问题。
    4. logstash实现
      官方的插件,但是性能不是很好,造成资源浪费。
    5. canal
      开源,简单,通过监听binlog日志来实现。

    工作原理:

    canal就是把自己伪装成Slave,模拟slave的交互协议向master发送消息,master接收到消息,把bin log给canal,然后canal解析bin log再发送到具体的目的地。

    作用:

    数据同步,不同 不是全量的,是增量的。

    【使用步骤】

    一:mysql配置
    1.新建一个mysql
    2.创建一个canal用户并授权,也可以直接使用root账号
    – 使用命令登录:mysql -u root -p
    – 创建用户 用户名:canal 密码:lihairui
    create user ‘canal’@‘%’ identified by ‘lihairui’;
    – 授权 *.*表示所有库

    grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'lihairui';
    GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'lihairui' WITH GRANT OPTION;
    FLUSH PRIVILEGES;
    
    • 1
    • 2
    • 3

    3.修改mysql的配置文件(my.ini,my.cnf)
    [mysqld]

    打开binlog

    log-bin=mysql-bin
    
    • 1

    选择ROW(行)模式

    binlog-format=ROW
    
    • 1

    配置MySQL replaction需要定义,不要和canal的slaveId重复

    server_id=1
    
    • 1

    4.重启mysql,查看是否开启了binlog模式

    show variables like 'log_bin';
    show variables like 'binlog_format';
    
    • 1
    • 2

    5.查看日志状态和master和slave状态

    show binary logs;
    show master status;
    show slave status;
    
    • 1
    • 2
    • 3

    二:canal.admin配置

    默认端口是8089,
    默认账号密码admin/123456

    1.github下载canal.admin压缩包并解压到指定目录(目录不要有中文和空格)
    2.配置admin的conf/application.yml
    修改数据库信息
    3.执行conf/canal_manager.sql文件,新建数据库和表
    4.启动start.bat,输入localhost:8090,admin/123456账号密码

    三:canal.deployer配置

    默认端口是11111
    1.github下载cancal.deployer压缩包并解压到指定目录(目录不要有中文和空格)
    2.如果使用canal.admin的话,要配置deployer的conf/canal.properties,把conf/canal_local.properties里面的配置覆盖到canal.properties的admin属性模块里面
    在这里插入图片描述

    3.配置deplyer的 conf/example/instance.properties
    修改canal.instance.master.address
    修改canal.instance.dbUsername
    修改canal.instance.dbPassword
    修改canal.instance.filter.regex,配置要监控哪些表,如果不配置,监控所有表
    4.配置conf/canal.properties
    修改canal.destinations,监听哪些canal实例名称(即conf下配置的文件夹名称,文件夹包含instance.properties),如果有多个,逗号分割
    在这里插入图片描述

    5.启动startup.bat,
    如果报缺少jvm.dll,把jdk/jre/server文件夹拷贝到jre/bin下面
    如果报下图
    在这里插入图片描述

    startup.bat打开删除红色里面的代码
    在这里插入图片描述

    6.如果么没报错,表明启动成功

    三.canal.adaptor配置
    默认端口是8081

    1. 在github下载canal.adaptor压缩包并解压到指定目录(目录不要有中文和空格)

    2. 修改conf/application.yml文件
      修改canalServerHost,默认不修改,读取的是canalserver的配置信息
      修改defaultDS
      修改outerAdapter.es,要和es配置信息保持一致,选用高级的rest
      在这里插入图片描述

    3. 修改conf/es/mytest_user.yml,不修改启动会报错
      修改mytest_user.yml
      创建mytest表

    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    esMapping:
      _index: mytest_user
      _type: _doc
      _id: id
      upsert: true
    #  pk: id
      sql: "select a.id id,a.name name,a.role_id role_id,a.c_time c_time from user a"
    #  objFields:
    #    _labels: array:;
      etlCondition: "where a.c_time>={}"
      commitBatch: 3000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    创建mytest表

    create database mytest;
    use mytest;
    create table user (
      `id` int(10) NOT NULL,
      `name` varchar(100) DEFAULT NULL,
      `role_id` int(10) NOT NULL,
      `c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
      `c_utime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    创建es的mytest_user索引

    PUT /mytest_user
    {
      "mappings": {
        "_doc": {
          "properties": {
            "name": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword"
                }
              }
            },
            "role_id": {
              "type": "long"
            },
            "c_time": {
              "type": "date"
            }
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    9.启动canal.adapter,插入一条数据,看是否同步到es了。

    【初始化数据,全量导入】
    http://127.0.0.1:8081/etl/es/mytest_user.yml

    【canal同步数据到es步骤】

    1. 在deployer的conf/ 下面新建文件夹,文件夹就是canal的实例名字,然后在新建的文件夹里面新建instance.properties,修改相关配置
      修改canal.instance.master.address
      修改canal.instance.dbUsername
      修改canal.instance.dbPassword
      修改canal.instance.filter.regex

    2. 在deployer的conf/canal.properties修改配置
      在这里插入图片描述

    3. 在adapter里面配置conf/application.yml,配置要适配的实例名称
      在这里插入图片描述

    4. 确定要处理的索引的结构,一旦建立无法修改

    5. 在adapter的conf/es文件夹下面建立 索引名称.yml文件,

    6. 在kibana里面执行插入索引,然后重启canal服务。

    【sql映射】

    1. 主表不能为子查询语句
    2. 只能使用left outer join即最左表一定要是主表
    3. 关联从表如果是子查询不能有多张表
    4. 主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)
      关联条件只允许主外键的’='操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1
    5. 关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中

    【canal同步可能遇到的问题】

    1. adapter同步es的时候,如果有连表聚合查询,主表一定要有一个字段叫_id的别名,canal会自动转换通过_v._id进行关联。
    2. adapter的sql语句,mysql的保留字最好不要当做字段名,sql语句也不要加``,要不然同步到ES会不成功。
    3. adapter的sql语句,主表不能加where条件,如果加了where条件,到时候修改了where条件字段的值,是不会同步过来的。

    ###【注意】
    deployer的conf里面可以配置多个目录,每个目录(canal要配置的多个实例名字)里面的配置表明是一个索引,建立完毕之后,deployer就会监控这里面的instance.properties。

    adapter同步es,adapter下的conf/application.yml,添加多个- instance:实例,实例名字就是deployer的conf配置的文件夹名字,然后通过es文件夹下面的实例名字.yml进行es相关的配置映射等。

    总结:
    deployer的conf文件夹就是配置要查询的表信息和数据库连接信息,配置的文件夹名字就是canal实例名字。。
    adapter如果是同步es,下面conf/application.yml就要配置多个canal实例名字(即deployer配置的文件夹名字),然后再conf/es文件夹下面,配置实例名字.yml配置文件,进行sql查询之后的结果和es入库之间的映射关系。

  • 相关阅读:
    华为OD机试 - 矩形相交的面积 - 逻辑分析(Java 2023 B卷 100分)
    leetcode 习题集 【9月】
    windows 下使用 nvm use version 时报错 exit status 1
    语音识别whisper的介绍、安装、错误记录
    头歌php 表单语言进阶
    Debezium系列之:修改源码支持drop foreign key if exists fk
    【二八法则】精品和爆款的力量
    ssm分页实战
    【HCIE】01.IGP高级特性
    通过css设置元素隐藏和显示
  • 原文地址:https://blog.csdn.net/u010020726/article/details/125531442