• 技术实现 | Apache Doris Binlog 导入设计与实现


    Binlog 导入是通过获取并解析 Mysql 数据库的 Binlog 日志,增量同步用户在Mysql 数据库的所有更新操作,为 Doris 提供了一种对接 Mysql 数据库的 CDC(Change Data Capture) 功能。

    Binlog 导入已在 0.15 版本发布,本文主要介绍的是 Binlog 导入的总体设计与实现思路。

    总体架构

    Binlog 导入目前需要依赖canal作为中间媒介,让 canal 伪造成一个从节点去获取 Mysql 主节点上的 Binlog 并解析,再由 Doris 去获取 canal 上解析好的数据。

    后续 Binlog 导入可支持除了 canal 外的不同数据源。

    总体数据流向为:

       多表导入

    Binlog 导入的特点是可支持在一个数据同步作业里同步多张 Mysql 源表,用户可通过如下语法创建数据同步作业。

     
    

    sql> CREATE SYNC `test_db`.`job1`   (     FROM `mysql_db1`.`tbl1` INTO `test_tbl1 `,    FROM `mysql_db1`.`tbl2` INTO `test_tbl2 `,    FROM `mysql_db1`.`tbl3` INTO `test_tbl3 `   )   FROM BINLOG   (     "type" = "canal",     "canal.server.ip" = "127.0.0.1",     "canal.server.port" = "11111",     "canal.destination" = "example",     "canal.username" = "",     "canal.password" = ""   );

    创建同步作业时,用户需要配置 canal server 的远端配置。

    作业创建后会处于 Pending 状态,在此状态下 Fe 会立即启动一个 canal client ,来向 canal server 端订阅数据。

    canal client 启动成功后,作业状态会处于 Running 状态,如果运行时遇到了可恢复的错误,则会变成 Paused 状态。如果遇到了不可恢复的错误,则会变成 Cancelled 状态。Cancelled 状态的作业也不可恢复。

    用户可通过 show sync job 命令来查看当前同步作业的状态,状态间的转换条件如下图;

    创建同步作业后,Fe 会立即启动一个 canal client ,来向 canal server 端订阅数据。

    Fe 每次向 canal server 请求数据,都会返回一个数据 batch,这个数据 batch 一般以 protobuf 格式在 canal 和 Fe 间传送,且拥有唯一的标识,多个 batch 的标识通常为连续的,数据 batch 大小和包含的行数取决于用户在作业配置的 canal.batchSize 。

    Fe 每获取一个数据 batch 后,都会按行分发到不同的 Channel 进行消费,每个 Channel 对应一个 Doris 目标表。Channel 是指Fe 上的数据通道,数据同步作业会为其定义的下属每个表映射关系创建一个 Channel,分发到 Channel 上的数据会被暂时缓存起来,等待被提交到Be。

    Binlog 导入在 channel 里包装了stream load 导入事务的开启和提交等方法,底层仍然是调用的 stream load 导入接口,当数据 batch 的每一条数据都被分发进入 channel 后,有缓存数据的 channel 会发起一个 stream load 导入任务至 Be。

       顺序性

    stream load 任务的发送将统一由同步作业的线程池发送,channel 只负责向线程池提交stream load 的发送任务,所以无需阻塞等待任务的发送。

    而此线程池的实现可保证提交到相同表的 stream load 任务是按顺序发送的,因此不会出现提交到线程池的导入任务互相竞争资源导致顺序变乱。

    线程池全 Fe 唯一,通过线程数量上限制约了数据同步作业所占的资源,如果用户需要创建大量的数据同步作业,可通过调大 Fe 配置 max_sync_task_threads_num来调整线程池配置。

       幂等性

    多表同步作业除了需要保证数据是按顺序提交到 be 的,还需严格保证每个数据 batch 消费的幂等性。

    正常情况下,Fe 无需阻塞等待一个数据 batch 消费完才继续获取下一个数据 batch ,因此在一次消费里总是会包含多个连续的数据 batch,而一次消费的数据量大小取决于消费时间间隔,这个时间间隔可通过 Fe 的 sync_commit_interval_second 配置,此外,如果 canal 上的数据消费到底也会立即触发一次消费。

    如果消费期间有任意数据写入失败,Fe 会直接回退到上一个已成功消费成功的 batch 开始重新消费一遍,而对每个 channel 的数据消费进度的实时记录可保证已提交成功的数据不会再次提交以达到每个数据 batch 消费的幂等性。

    在下图中,每个 channel 记录了最近成功消费到的数据 batch,因此对于同一批数据(包含 batch 1~3),channel 2 不会再去提交数据的导入任务。

       可靠性

    在生产环境下,Binlog 导入保证了数据即使在宕机后也不会丢失。从上文可以知道,每个Channel 都会记录最新成果消费到的数据 batch,这个记录也会实时的持久化到磁盘上的元数据备份中,假设 Doris 出现了宕机重启,也能保证消费记录可以在 Fe 重启后重新被加载进 Channel 。

    Fe 重启后将会从上一次成功消费到的数据 batch 开始重新获取并消费,上文数据的幂等性可保证重复消费的数据 batch 不会被重复写入。

       Canal的工作原理

    目前 Binlog 导入仅支持 canal 数据源,后续可能会增加不同的数据源。

    在 canal 端,数据的流向如下为 Parser->Sink->Store

    我们只需要关心 store 的构造,因为 store 模块储存了被 canal 解析好的数据 batch,等待着Fe 去获取。

    数据 batch 存放在 store 模块内的环形队列上,而 Fe 通过 canal 的 Get 接口获取数据batch ,通过 Ack 接口通知 canal 成功消费了一个数据 batch。

    store 模块会通过三个指针去管理队列里的数据 batch:

     
    
    get指针:get指针代表客户端最后获取到的位置。ack指针:ack指针记录着最后消费成功的位置。put指针:代表sink模块最后写入store成功的位置
    Fe异步获取store中数据  get 0        get 1       get 2                    put   |            |           |         ......        |   v            v           v                       v------------------------------------------------------------ store环形队列   ^            ^   |            | ack 0        ack 1

    通过上文我们可以知道,Fe 每次消费都会 Get 多个数据 batch,消费成功后会 Ack 通知 canal 最后一个 batch 已经消费成功了,由于 batch 总是连续的,因此 canal 会视此 batch 之前的所有 batch 都已经消费成功了。

    假设初始时,get 和 ack 指针都处于 0

    当 Fe 通过 Get 命令获取一个 batch 后,get 指针右移至 1,Fe 可多次调用 Get 命令,直到get 指针追上 put 指针时,代表 store 中的数据 batch 已被全部获取完毕,再次 Get 会阻塞。

    当数据消费成功时:

    Fe 通过 Ack 命令通知 canal ,ack 指针会右移至 get 指针位置,代表 ack 指针前的数据都已消费成功,store 会立即清除 ack 指针之前的 batch。

    当数据消费失败时:

    store 模块会将 get 指针重置左移至 ack 指针位置,下一次 Fe 通过 Get 命令获取数据时,将会从 ack 指针位置开始。

    put 指针只在有新的数据进入 store 模块时才会右移,直到 put 指针追上 ack 指针,代表 store 空间已满,新数据写入阻塞。

       总结

    本文主要介绍了 Binlog 导入的设计和实现,并从特点,顺序性、幂等性、可靠性等三方面介绍了 Binlog 导入,如果大家对 Binlog 导入的使用感兴趣可直接阅读官网的 Binlog Load 文档,获取更详细的使用方法。

  • 相关阅读:
    力扣(LeetCode)1758. 生成交替二进制字符串的最少操作数(C++)
    ES6简介
    java计算机毕业设计基于springboot人职匹配推荐系统
    Java集合
    Kubernetes技术--k8s核心技术Helm
    排序:如何用快排思想在O(n)内查找第K大元素?
    MySQL 是否大小写敏感
    SpringBoot的shiro整合mybatis+druid数据源
    【JavaScript】制作一个抢红包雨页面
    【JAVA基础】面向对象基础(下)
  • 原文地址:https://blog.csdn.net/hebiwen95/article/details/126345557