• 通过 Canal 将 MySQL 数据实时同步到 Easysearch


    Canal 是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。使用 Canal 模拟成 MySQL 的 Slave,实时接收 MySQL 的增量数据 binlog,然后通过 RESTful API 将数据写入到 Easysearch 中。

    前提条件

    1. 部署 Easysearch 集群。
    2. 部署 MySQL 数据库。
    3. 部署 Gateway,Canal Adapter 不支持使用 HTTPS 协议连接,使用 Gateway 代理 Easysearch 。
    4. 部署 Console,方便查看 Easysearch 数据。

    对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    • 1
    • 2
    • 3
    • 4

    创建 canal 用户,授权 canal 连接 MySQL 具有作为 MySQL slave 的权限。

    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    
    • 1
    • 2
    • 3
    • 4

    操作步骤

    在进行数据同步时支持自定义索引 Mapping,但需保证 Mapping 中定义的字段(名称+类型)与 MySQL 中一致。

    1. 准备 MySQL 数据源

    create database canal;
    use canal;
    CREATE TABLE `test` (
        `id` bigint(32) NOT NULL,
        `name` text NOT NULL,
        `age` smallint  NOT NULL,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
    DEFAULT CHARACTER SET=utf8;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2. Easysearch 创建索引

    PUT test
    {
        "settings" : {
          "index" : {
            "number_of_shards" : "1",
            "number_of_replicas" : "1"
          }
        },
        "mappings" : {
                "properties" : {
                  "id": {
                       "type": "integer"
                   },
                   "name": {
                        "type" : "text"
                    },
                    "age" : {
                        "type" : "integer"
                    }
                }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3. 安装并启动 Canal-server

    下载https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
    修改配置文件
    vi conf/example/instance.properties

    启动 canal
    sh bin/startup.sh
    启动成功日志信息,logs/canal/canal.log

    关闭 canal
    sh bin/stop.sh

    4. 安装并启动 Canal-adapter

    下载https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz
    修改配置文件:application.yml

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      flatMessage: true
      syncBatchSize: 1000
      retries: -1
      timeout:
      accessKey:
      secretKey:
      consumerProperties:
        canal.tcp.server.host: 127.0.0.1:11111
        canal.tcp.batch.size: 500
    
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true
          username: canal
          password: canal
      canalAdapters:
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
          - name: es7
            properties:
              security.auth: admin:4ad8f8f792e81cd0a6de
              cluster.name: easysearch
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    新增 canal-adapter/conf/es7/test.yml,配置索引和表的映射关系。

    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    esMapping:
      _index: test           # es 的索引名称
      _id: _id               # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
      # sql映射
      sql: " select a.id as _id,a.id,a.name,a.age from test a "
      etlCondition: "where a.id>={}"
      commitBatch: 3000      # 提交批大小
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    启动 canal-adapter
    ./bin/startup.sh

    5. 验证增量数据同步

    在 MySQL 数据库中,对 test 表插入两条数据。
    insert test(id,name,age) values(1,'canal_test1',11);
    insert test(id,name,age) values(2,'canal_test2',22);

    6. 在 Console 中,执行以下命令查询数据

    最后

    Canal 同步的是增量数据,不会同步之前的存量数据。要同步存量数据可参考《使用 Logstash 同步 MySQL 到 Easysearch》

    关于 Easysearch

    about easysearch

    INFINI Easysearch 是一个分布式的近实时搜索与分析引擎,核心引擎基于开源的 Apache Lucene。Easysearch 的目标是提供一个轻量级的 Elasticsearch 可替代版本,并继续完善和支持更多的企业级功能。 与 Elasticsearch 相比,Easysearch 更关注在搜索业务场景的优化和继续保持其产品的简洁与易用性。

    官网文档:https://www.infinilabs.com/docs/latest/easysearch

    下载地址:https://www.infinilabs.com/download

  • 相关阅读:
    大数据在电力行业的应用案例100讲(十六)-Full GC对Hbase影响及相关解决方法实践
    基于vue和node.js的志愿者招募网站设计
    transition和animation的区别?
    Docker下安装oracle11g数据库(Win10)
    如何使用CSS伪类选择器
    层叠、继承与盒模型
    功能强大的专业网页设计工具:RapidWeaver 8 mac版
    关于http请求、文件处理以及空间地理信息处理的工具类介绍
    “1-5-15”原则:中国联通数字化监控平台可观测稳定性保障实践
    Java基础:代理
  • 原文地址:https://blog.csdn.net/infinilabs/article/details/134466281