• 第3.6章:StarRocks数据导入——DataX StarRocksWriter


    一、Datax

    1.1 DataX 3.0概述

     DataX3.0是一个异构数据源离线同步工具,可以方便的对各种异构数据源进行高效的数据同步。   其github地址为:

    https://github.com/alibaba/DataX/blob/master/introduction.mdicon-default.png?t=N7T8https://github.com/alibaba/DataX/blob/master/introduction.md

    GitCode - 开发者的代码家园icon-default.png?t=N7T8https://gitcode.com/alibaba/datax/overview

    1.2 DataX3.0框架设计

    DataX将复杂的网状的同步链路变成了星型数据链路,DataX自身作为中间传输载体负责连接各种数据源,解决了异构数据源同步问题。Datax采用的是

       DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中:

    • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
    • Writer:Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
    • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

    1.3 DataX3.0核心架构

        DataX 3.0 开源版本支持单机多线程模式完成同步作业运行。基于DataX作业生命周期的时序图,从整体架构设计角度来阐述DataX各个模块相互关系。

    1.3.1 核心模块介绍

    • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
    • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
    • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
    • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
    • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

     1.3.2 DataX调度流程

         用户提交了一个DataX作业,并且配置了DataX Channel并发数为20个,需求是将一个100张分表的mysql数据同步到starrocks里面, 则DataX的调度决策思路是:

    • DataXJob根据分库分表切分成了100个Task。
    • 根据20个并发,DataX计算共需要分配4个TaskGroup。
    • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

    二、StarRocksWriter

       DataX基于StarRocks开发的StarRocksWriter插件支持MySQL、Oracle等数据库中的数据导入至 StarRocks。在底层实现上,StarRocksWriter内部将各种reader读取的数据进行缓存攒批(以csv或 json格式),之后采用Stream Load 方式批量导入至 StarRocks。总体的数据流是Source -->Reader -->DataX channel --> Writer ---> StarRocks

     官网文章地址:

    使用 DataX 导入 | StarRocks

    三、创建配置文件

     为导入作业创建 JSON 格式配置文件, 这里列举几种Datax同步脚本。

    (1)同步oracle数据至starrocks:oracle2starrocks.json

    1. {
    2. "job": {
    3. "setting": {
    4. "speed": {
    5. "channel": 1
    6. },
    7. "errorLimit": {
    8. "record": 0,
    9. "percentage": 0
    10. }
    11. },
    12. "content": [
    13. {
    14. "reader": {
    15. "name": "oraclereader",
    16. "parameter": {
    17. "username": "root",
    18. "password": "root",
    19. "connection": [
    20. {
    21. "querySql": [
    22. "select fid,f_diccode,concat(substr(qhcode,1,2),'0000') as partition_no from nannd.test1"
    23. ],
    24. "jdbcUrl": [
    25. "jdbc:oracle:thin:@192.168.22.115:1521/init"
    26. ]
    27. }
    28. ]
    29. }
    30. },
    31. "writer": {
    32. "name": "starrockswriter",
    33. "parameter": {
    34. "username": "root",
    35. "password": "root",
    36. "database": "",
    37. "table": "test2",
    38. "column": [
    39. "fid",
    40. "f_diccode",
    41. "partition_no"
    42. ],
    43. "preSql": ["truncate table des.test2"],
    44. "postSql": [],
    45. "jdbcUrl": "jdbc:mysql://192.168.10.103:9030",
    46. "loadUrl": [
    47. "192.168.10.101:8030",
    48. "192.168.10.102:8030",
    49. "192.168.10.103:8030"
    50. ],
    51. "loadProps": {
    52. "format": "json",
    53. "strip_outer_array": true
    54. }
    55. }
    56. }
    57. }
    58. ]
    59. }
    60. }
    • OracleReader的配置说明见:

     https://github.com/alibaba/DataX/blob/master/introduction.md

    https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md

    • StarRocksWriter的配置说明见:官网

    使用 DataX 导入 | StarRocks

    (2)同步mysql库的数据至starrocks:mysql2starrocks.json

    1. {
    2. "job": {
    3. "setting": {
    4. "speed": {
    5. "channel": 1
    6. },
    7. "errorLimit": {
    8. "record": 0,
    9. "percentage": 0
    10. }
    11. },
    12. "content": [
    13. {
    14. "reader": {
    15. "name": "mysqlreader",
    16. "parameter": {
    17. "username": "root",
    18. "password": "root",
    19. "column": [
    20. "OBJECTID",
    21. "xmmc",
    22. "shengmc",
    23. "shimc",
    24. "xianmc",
    25. ],
    26. "connection": [
    27. {
    28. "table": [
    29. "init2.test6"
    30. ],
    31. "jdbcUrl": [
    32. "jdbc:mysql://192.168.22.156:3306/init2"
    33. ]
    34. }
    35. ]
    36. }
    37. },
    38. "writer": {
    39. "name": "starrockswriter",
    40. "parameter": {
    41. "username": "root",
    42. "password": "root",
    43. "database": "des3",
    44. "table": "test7",
    45. "column": [
    46. "OBJECTID",
    47. "shengmc",
    48. "shimc",
    49. "xianmc",
    50. ],
    51. "preSql": [],
    52. "postSql": [],
    53. "jdbcUrl": "",
    54. "loadUrl": [
    55. "192.168.10.101:8030",
    56. "192.168.10.102:8030",
    57. "192.168.10.103:8030"
    58. ],
    59. "loadProps": {
    60. "format": "json",
    61. "strip_outer_array": true
    62. }
    63. }
    64. }
    65. }
    66. ]
    67. }
    68. }
    • MysqlReader的配置说明见:

    https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

    • StarRocksWriter的配置说明见:官网

    (3)同步tidb库的数据至starrocks:tidb2starrocks.json

    1. {
    2. "job": {
    3. "setting": {
    4. "speed": {
    5. "channel": 1
    6. },
    7. "errorLimit": {
    8. "record": 0,
    9. "percentage": 0
    10. }
    11. },
    12. "content": [
    13. {
    14. "reader": {
    15. "name": "mysqlreader",
    16. "parameter": {
    17. "username": "root",
    18. "password": "root@sq2023",
    19. "connection": [
    20. {
    21. "querySql": [
    22. "select id,member_id,create_time,update_time,now() as run_dt from test2"
    23. ],
    24. "jdbcUrl": [
    25. "jdbc:mysql://192.168.22.143:4000/init1"
    26. ]
    27. }
    28. ]
    29. }
    30. },
    31. "writer": {
    32. "name": "starrockswriter",
    33. "parameter": {
    34. "username": "root",
    35. "password": "root",
    36. "database": "des1",
    37. "table": "test3",
    38. "column": [
    39. "id",
    40. "member_id",
    41. "create_time",
    42. "update_time",
    43. "run_dt"
    44. ],
    45. "preSql": [],
    46. "postSql": [],
    47. "jdbcUrl": "",
    48. "loadUrl": [
    49. "192.168.10.101:8030",
    50. "192.168.10.102:8030",
    51. "192.168.10.103:8030"
    52. ],
    53. "loadProps": {
    54. "format": "json",
    55. "strip_outer_array": true
    56. }
    57. }
    58. }
    59. }
    60. ]
    61. }
    62. }

     ps:从tidb数据读取数据,采用的read插件还是MysqlReder,不赘述。

    四、常见问题记录

    4.1 常规排查方案

       例如:针对配置文件job.json启动导入任务,设置JVM 调优参数(--jvm="-Xms6G -Xmx6G")以及日志等级(--loglevel=debug),日志等级用来任务失败时打印更详细的作业执行信息

    python datax/bin/datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug datax/job/job.json

    4.2 时区问题

        如果源数据库与目标数据库时区不同,需要命令行中添加 -Duser.timezone=GMTxxx 选项设置源数据库的时区信息。例如,源库使用 UTC 时区,则启动任务时需添加参数 -Duser.timezone=GMT+0

    4.3 性能调优

    4.3.1 合理拆分任务

        合理配置任务参数,让DataX任务拆分为多个Task,同时,提升DataX Channel并发数。以mysqlreader为例,就要合理配置splitPk参数,如果splitPk不填写(包括不提供splitPk或者splitPk值为空),DataX会视作使用单通道同步该表数据。

    4.3.2 配置堆内存

       当提升DataX Job内Channel并发数时,内存的占用也会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,调大JVM的堆内存。调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,在命令行添加对应的参数,如下:(xms:初始化堆内存; xmx:堆最大内存)

    python datax/bin/datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug datax/job/job.json

    ps:建议将初始化堆内存与堆最大内存配置一致,这样可以让同步数据处理起来更快,也可以避免内存的抖动。

    4.3.3 任务限速

      使用DataX进行数据同步的另一个优势是可以限速,进而降低同步过程中对业务库的压力影响。DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以方便的控制同步作业速度,让同步作业在库可以承受的范围内达到最佳的同步速度。以最常用的字节流限速为例:

    • 修改datax/conf/core.json,限制单个chanel的速度为2M (2*1024*1024= 2097152 byte):

    1. "speed": {
    2.    "byte": 2097152,
    3.    },
    • 同时修改作业json部分的速度限制,例如限制为4M(这样任务会用4/2=2个channel并发进行任务),修改:
    1.     "job": {
    2.         "setting": {
    3.             "speed": {
    4.                 "byte" : 4194304
    5.             }
    6.         },
    7.         ...
    8.     }
    • 以及:
    1. "speed": {
    2. "channel": 5,
    3. "byte": 1048576,
    4. "record": 10000
    5. }

    4.3.4 读取StarRocks数据

       StarRocks兼容MySQL协议,当我们需要将StarRocks中的数据同步至其他数据库时,可以使用mysqlreader来直接读取,但这种JDBC的方式性能可能不是很好,推荐Flink Connector或者Spark Connector来进行处理。

    参考文章:

    第3.5章:StarRocks数据导入--DataX StarRocksWriter_datax-starrockswriter-CSDN博客

  • 相关阅读:
    Vue.js 条件判断
    php如何在header增加key,sign,timestamp?怎么鉴权?
    职场成长的三种方式
    【Leetcode】1196. How Many Apples Can You Put into the Basket
    干货 | 解决 App 自动化测试的常见痛点(弹框及首页启动加载完成判断处理)
    git 学习总结
    四、【React-Router5】样式丢失问题
    RapidSSL的便宜单域名https证书
    【APUE】文件系统 — 目录和文件
    2022年最新前端面试题
  • 原文地址:https://blog.csdn.net/SHWAITME/article/details/136228892