• 【MySQL】开启 canal同步MySQL增量数据到ES


    开启 canal同步MySQL增量数据到ES

    canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。示使用 canal 将 MySQL 增量数据同步到ES。
    在这里插入图片描述

    一、集群模式

    在这里插入图片描述

    图中 server 对应一个 canal 运行实例 ,对应一个 JVM 。

    server 中包含 1…n 个 instance , 我们可以将 instance 理解为配置任务。

    instance 包含如下模块 :

    eventParser:数据源接入,模拟 slave 协议和 master 进行交互,协议解析。

    eventSink:Parser 和 Store 链接器,进行数据过滤,加工,分发的工作。

    eventStore:数据存储。

    metaManager:增量订阅 & 消费信息管理器。

    真实场景中,canal 高可用依赖 zookeeper ,笔者将客户端模式可以简单划分为:TCP 模式 和 MQ 模式 。

    实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。

    顺序消费:

    对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
    在这里插入图片描述

    二、MySQL配置

    1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

    注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。

    2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant :

    CREATE USER canal IDENTIFIED BY ‘canal’;
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@‘%’;
    – GRANT ALL PRIVILEGES ON . TO ‘canal’@‘%’ ;
    FLUSH PRIVILEGES;

    3、创建数据库商品表 t_product :

    CREATE TABLE t_product (
    id BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
    name VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
    price DECIMAL ( 10, 2 ) NOT NULL,
    status TINYINT ( 4 ) NOT NULL,
    create_time datetime NOT NULL,
    update_time datetime NOT NULL,
    PRIMARY KEY ( id )
    ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

    三、Elasticsearch配置

    使用 Kibana 创建商品索引 。

    PUT /t_product
    {
    “settings”: {
    “number_of_shards”: 2,
    “number_of_replicas”: 1
    },
    “mappings”: {
    “properties”: {
    “id”: {
    “type”:“keyword”
    },
    “name”: {
    “type”:“text”
    },
    “price”: {
    “type”:“double”
    },
    “status”: {
    “type”:“integer”
    },
    “createTime”: {
    “type”: “date”,
    “format”: “yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis”
    },
    “updateTime”: {
    “type”: “date”,
    “format”: “yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis”
    }
    }
    }
    }

    执行完成,如图所示 :

    在这里插入图片描述

    四、RocketMQ 配置

    创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。
    在这里插入图片描述
    在这里插入图片描述

    五、canal 配置

    我们选取 canal 版本 1.1.6 ,进入 conf 目录。

    1、配置 canal.properties

    #集群模式 zk地址
    canal.zkServers = localhost:2181
    #本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
    canal.serverMode = rocketMQ
    #instance 列表
    canal.destinations = product-syn
    #conf root dir
    canal.conf.dir = …/conf
    #全局的spring配置方式的组件文件 生产环境,集群化部署
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml

    ######以下部分是默认值 展示出来
    #Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
    canal.mq.canalBatchSize = 50
    #Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
    canal.mq.canalGetTimeout = 100
    #是否为 flat json格式对象
    canal.mq.flatMessage = true

    2、instance 配置文件

    在 conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 :instance.properties。

    #按需修改成自己的数据库信息
    #################################################

    canal.instance.master.address=192.168.1.20:3306
    #username/password,数据库的用户名和密码

    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal

    #table regex
    canal.instance.filter.regex=mytest.t_product

    #mq config
    canal.mq.topic=product-syn-topic
    #针对库名或者表名发送动态topic
    #canal.mq.dynamicTopic=mytest,.,mytest.user,mytest\…,.\…
    canal.mq.partition=0
    #hash partition config
    #canal.mq.partitionsNum=3
    #库名.表名: 唯一主键,多个表之间用逗号分隔
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################

    3、服务启动

    启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。

    在这里插入图片描述

    修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。
    在这里插入图片描述

    六、消费者

    1、产品索引操作服务
    在这里插入图片描述

    2、消费监听器

    在这里插入图片描述

    消费者逻辑重点有两点:

    顺序消费监听器

    将消息数据转换成 JSON 字符串,从 data 节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATE、 INSERT、DELETE 执行产品索引操作服务的方法。

  • 相关阅读:
    aplus埋点笔记
    Z-Libary最新地址检测,再也不用担心找不到ZLibary了
    web安全漏洞
    抽了一包华子才写出来的linux 文件目录结构详解
    如何使用决策树判断要不要去相亲?
    查看网页最后修改时间方法以及原理简介
    基于Java生鲜蔬菜食品商城系统详细设计和实现
    [数据库]JDBC
    文件上传过大被限制问题-springboot
    java毕业设计项目_第167期ssm多用户博客个人网站_计算机毕业设计
  • 原文地址:https://blog.csdn.net/wjianwei666/article/details/133315748