• DataX数据同步


     基础概述

    1.什么是DataX?

    DataX 是阿里云商用产品 DataWorks 数据集成的开源版本,它是一个异构数据源的离线数据同步工具/平台(ETL工具)。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

    Tips : 异构即不同类型的应用或者数据源,例如MySQL/Oracle/DB2/MongDB等不同类型的数据库源
    Tips : 离线数据同步以及CDC实时数据复制,前者常用Sqoop以及DataX工具。
    Tips : ETL(Extract-Transform-Load)工具描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据,其常用在数据仓库,但其对象并不限于数据仓库(DW)。

    前面我们说到 DataX 的前身是阿里云商业化产品 DataWorks, 其稳定、高效、支持多样化等优点就不言而喻, DataWorks 致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力,以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户,单日同步数据超过3万亿条。DataWorks 数据集成目前支持离线50+种数据源,可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。商业版本参见(https://www.aliyun.com/product/bigdata/ide)

    2.DataX的设计思想

    描述:为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

    简单得说,DataX就像中间商一样为每一个服务对象进行需求供应。

     

    3.DataX的框架设计

    描述: DataX本身作为离线数据同步框架,离线(批量)的数据通道通过定义数据来源和去向的数据源和数据集,提供一套抽象化的数据抽取插件(Reader)、数据写入插件(Writer),并基于此框架设计一套简化版的中间数据传输格式,从而实现任意结构化、半结构化数据源之间数据传输。

    • Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。

    • Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

    • Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

     例如,将MySQL中的数据离线同步到HDFS之中来展示DataX的框架设计结构。

     Tips : DataX架构设计流程类似source(数据来源)-> channel(数据存储池中转通道) -> sink (目的地)流程

    4.DataX的运行原理

    描述: DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

     

    DataX 调度流程:
    描述: DataX完成单个数据同步的作业(Job),当DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。在Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。DataX作业运行起来之后,Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出,否则异常退出,并且进程退出值非0

    核心模块解析:

    • DataX Job 模块 : 是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

    • DataX Task : 由Job切分而来, 是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作(包含Reader—>Channel—>Writer流程)。

    • DataX Schedule 模块 : 将Task组成TaskGroup ,注意单个组的默认并发数量为5(动态概念即同时有5个在运行)。

    • DataX TaskGroup : 负责启动以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

    举例说明,当用户提交一个Datax Job 并且配置了20并发数,目的是将一个100张分别的MySQL数据同步到odps中。

    • (1) 首先根据分库分表切分成为100个Task。

    • (2) 根据要达到20个并发,此时我们需要分配4个TaskGroup,简单的说20并发除以每个TaskGroup的默认并发5得到4。

    • (3) 此时每一个TaskGroup负责以5并发数,共计运行25个Task,简单的说100Task除以4个TaskGroup就得到每个组需要执行的Task数。

    5.DataX支持的数据源

    DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下表 (https://github.com/alibaba/DataX#support-data-channels)

    类型数据源Reader(读)Writer(写)文档
    RDBMS 关系型数据库MySQL 、
               Oracle        √        √     、
               OceanBase      √        √     、
    SQLServer 、
    PostgreSQL 、
    DRDS 、
    通用RDBMS(支持所有关系型数据库) 、
    阿里云数仓数据存储ODPS 、
    ADS
    OSS 、
    OCS
    NoSQL数据存储OTS 、
    Hbase0.94 、
    Hbase1.1 、
    Phoenix4.x 、
    Phoenix5.x 、
    MongoDB 、
    Hive 、
    Cassandra 、
    无结构化数据存储TxtFile 、
    FTP 、
    HDFS 、
    Elasticsearch
    时间序列数据库OpenTSDB
    TSDB 、
    TDengine 、

    Tips : DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。 

    6.为何选择DataX?

    描述: 我们可以从DataX 3.0六大核心优势入手了解我们为何选择它。

    (1) 可靠的数据质量监控

    • 完美解决数据传输个别类型失真问题

    • 提供作业全链路的流量、数据量运行时监控

    • 提供脏数据探测

    (2) 丰富的数据转换功能 : DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。

    (3) 精准的速度控制 : 新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

    1. "speed": {
    2. "channel": 5,
    3. "byte": 1048576,
    4. "record": 10000
    5. }

    (4) 强劲的同步性能 : DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长

    (5) 健壮的容错机制

    • 线程内部重试

    • 线程级别重试

    (6) 极简的使用体验

    • 易用: 开箱即用支持linux和windows,只需要短短几步骤就可以完成数据的传输

    • 详细: 在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况

     DataX 与 Sqoop 间的对比(VS)

    • 应用于企业应用中数据迁移备份,以及供不同接入的应用数据库的应用进行数据的访问。

    • 适用于从事数据采集工作人员,以及企业中从0到1建设阶段IT运维、以及DBA运维管理人员。

       Datax 安装使用

      1.快速开始

    • 描述: DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。

      系统环境依赖-System Requirements

    • Linux

    • JDK ( 1.8以上,推荐1.8 )

    • Python ( 推荐 Python2.6.X )

    1.先去python官网下载python源码安装包

    wget https://www.python.org/ftp/python/2.7.18/Python-2.7.18.tgz

    2.解压

    tar -zxf Python-2.7.18.tgz -C /usr/local/

     不知道有什么用,没有执行

      apt install gcc g++ make

     2. 转到解压文件夹下


    cd /usr/local/Python-2.7.18

     3.配置安装目录 在python安装配置中一定要加上`--enable-shared`参数就可以生成对应动态链接库

     ./configure --prefix=/usr/local/python27

     4.编译源码执行源码安装

     make && make install

     5.在 /usr/bin 文件夹下创建python3软链接,方便直接使用命令名启动  (没有执行)

     ln -s /usr/local/python27/bin/python2.7 /usr/bin/python
    ln -s /usr/local/python27/bin/python2.7 /usr/bin/python2

     6. 任意切换到某一个文件夹,验证是否能启动

     python --version && python2 --version

      1. # 国内镜像下载: https://npm.taobao.org/mirrors/python/2.6.9/Python-2.6.9.tgz
      2. export PYTHON_HOME="/usr/local/python27"
      3. Python 2.7.18

    安装部署方式

    1. $ export DATAX_HOME="/usr/local/datax"
    2. $ wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
    3. $ tar -zxf datax.tar.gz -C /usr/local/
    4. $ cd ${DATAX_HOME}/bin
    5. $ python ${DATAX_HOME}/bin/datax.py ${DATAX_HOME}/job/job.json
    6. $ ln -s ${DATAX_HOME}/bin/datax.py /usr/local/datax.py

    • 方法二、下载DataX源码,自己编译:DataX源码

    1. # (1)、下载DataX源码:
    2. $ git clone git@github.com:alibaba/DataX.git
    3. # (2)、通过maven打包:
    4. $ cd {DataX_source_code_home}
    5. $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
    6. # (3) 打包成功,日志显示如下:
    7. [INFO] BUILD SUCCESS
    8. [INFO] -----------------------------------------------------------------
    9. [INFO] Total time: 08:12 min
    10. [INFO] Finished at: 2015-12-13T16:26:48+08:00
    11. [INFO] Final Memory: 133M/960M
    12. [INFO] -----------------------------------------------------------------
    13. Tips : 打包成功后的DataX包位于 `{DataX_source_code_home}/target/datax/datax/ `

    Datax 解压后其结构如下:

    1. $ cd /usr/local/datax
    2. $ /usr/local/datax# tree -d -L 2
    3. ├── bin # 可执行的Python脚本
    4. ├── conf # Datax 配置文件
    5. ├── job # 离线同步任务
    6. ├── lib # 依赖库
    7. ├── log # 任务执行过程日志
    8. ├── log_perf
    9. ├── plugin # 各类数据库读写插件
    10. │ ├── reader
    11. │ └── writer
    12. ├── script # 脚本存放
    13. └── tmp # 临时目录

    运行测试
    描述: 采用 Datax 自带的 job/job.json 进行运行测试验证安装环境。

    python bin/datax.py job/job.json

    1. /usr/local/datax# python bin/datax.py job/job.json
    2. # (1) 显示机器相关信息(CPU/内存、以及JVM相关信息)
    3. 2021-10-26 11:20:54.301 [main] INFO Engine - the machine info =>
    4. osInfo: Eclipse Foundation 16 16.0.2+7
    5. jvmInfo: Linux amd64 5.4.0-88-generic
    6. cpu num: 4
    7. totalPhysicalMemory: -0.00G
    8. freePhysicalMemory: -0.00G
    9. maxFileDescriptorCount: -1
    10. currentOpenFileDescriptorCount: -1
    11. GC Names [G1 Young Generation, G1 Old Generation]
    12. MEMORY_NAME | allocation_size | init_size
    13. CodeHeap 'profiled nmethods' | 117.21MB | 2.44MB
    14. G1 Old Gen | 1,024.00MB | 970.00MB
    15. G1 Survivor Space | -0.00MB | 0.00MB
    16. CodeHeap 'non-profiled nmethods' | 117.22MB | 2.44MB
    17. Compressed Class Space | 1,024.00MB | 0.00MB
    18. Metaspace | -0.00MB | 0.00MB
    19. G1 Eden Space | -0.00MB | 54.00MB
    20. CodeHeap 'non-nmethods' | 5.57MB | 2.44MB
    21. # (2) Job 任务执行情况
    22. 2021-10-26 11:21:04.364 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.021s | All Task WaitReaderTim
    23. # (3) job 任务执行CPUGC占比信息
    24. 2021-10-26 11:21:04.367 [job-0] INFO JobContainer -
    25. [total cpu info] =>
    26. averageCpu | maxDeltaCpu | minDeltaCpu
    27. -1.00% | -1.00% | -1.00%
    28. [total gc info] =>
    29. NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
    30. G1 Young Generation | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
    31. G1 Old Generation | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
    32. # (4) Job 任务执行完毕总计数据(非常重要) 、可以验证同步的数据是否全部同步成功。
    33. 2021-10-26 11:21:04.367 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.021s | All Task WaitReaderTime 0.041s | Percentage 100.00%
    34. 2021-10-26 11:21:04.368 [job-0] INFO JobContainer -
    35. 任务启动时刻 : 2021-10-26 11:20:54
    36. 任务结束时刻 : 2021-10-26 11:21:04
    37. 任务总计耗时 : 10s
    38. 任务平均流量 : 253.91KB/s
    39. 记录写入速度 : 10000rec/s
    40. 读出记录总数 : 100000
    41. 读写失败总数 : 0

    Tips : 上面 Job 读写输出数据为DataX 19890604 1989-06-03 23:00:00 true test

    2.基础使用

    描述: 我们可以通过DataX数据源参考指南(https://github.com/alibaba/DataX/wiki/DataX-all-data-channels)来查看具体每个插件需要或者可选的插件。

    插件示例获取:

    1. $ ./bin/datax.py -r streamreader -w streamwriter
    2. # (1) 此处将会显示 读写 插件的使用文档说明
    3. Please refer to the streamreader document: https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
    4. Please refer to the streamwriter document:https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
    5. # (2) 命令执行示例
    6. python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
    7. # (3) Job 任务配置示例 Json 格式 (以下参数我简单描述)
    8. tee job/stream2stream.json <<'EOF'
    9. {
    10. "job": {
    11. "content": [
    12. {
    13. // 读插件
    14. "reader": {
    15. "name": "streamreader", // 指定插件名称
    16. "parameter": {
    17. "column": [ // 字段类与值 (必须进行指定)
    18. {
    19. "value": "WeiyiGeek",
    20. "type": "string"
    21. },
    22. {
    23. "value": 2021,
    24. "type": "long"
    25. },
    26. {
    27. "value": "2021-01-01 00:00:00",
    28. "type": "date"
    29. },
    30. {
    31. "value": true,
    32. "type": "bool"
    33. },
    34. {
    35. "value": "test",
    36. "type": "bytes"
    37. }
    38. ],
    39. "sliceRecordCount": "10" // 切片记录计数
    40. }
    41. },
    42. // 写插件
    43. "writer": {
    44. "name": "streamwriter", // 指定使用的插件名称
    45. "parameter": {
    46. "encoding": "UTF-8", // 编码格式
    47. "print": true // 是否终端打印
    48. }
    49. }
    50. }
    51. ],
    52. "setting": {
    53. "speed": { // 同步速度采用的类型
    54. "channel": "2" // 并发数
    55. //"byte": 10485760 // 字节数
    56. }
    57. }
    58. }
    59. }
    60. EOF

    执行结果: (执行时请删除上述备注)

    1. python bin/datax.py job/stream2stream.json
    2. # (1) 两个任务进程
    3. 2021-10-26 16:28:33.568 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks.
    4. # (2) 每个任务进程执行10条 (即总数20条)
    5. 2021-10-26 16:28:33.579 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
    6. 2021-10-26 16:28:33.595 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[1] attemptCount[1] is started
    7. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    8. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    9. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    10. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    11. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    12. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    13. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    14. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    15. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    16. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    17. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    18. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    19. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    20. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    21. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    22. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    23. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    24. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    25. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    26. WeiyiGeek 2021 2021-01-01 00:00:00 true test
    27. # (3) 执行结果信息
    28. 2021-10-26 16:28:43.576 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 520 bytes | Speed 52B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReade rTime 0.002s | Percentage 100.00%
    29. 2021-10-26 16:28:43.576 [job-0] INFO JobContainer -
    30. 任务启动时刻 : 2021-10-26 16:28:33
    31. 任务结束时刻 : 2021-10-26 16:28:43
    32. 任务总计耗时 : 10s
    33. 任务平均流量 : 52B/s
    34. 记录写入速度 : 2rec/s
    35. 读出记录总数 : 20
    36. 读写失败总数 : 0

    Tips : 执行后的日志除了终端打印还会在本地日志目录中存放(/usr/local/datax/log/2021-10-26/b_stream2stream_json-16_28_33.312.log)文件。

    Tips : 非常执行同步写入的总次数为setting.speed.channel * sliceRecordCount

     Datax 实战使用

    mysqlreader 

     

     hdfwriter

    1.MySQL-To-HDFS

  • 相关阅读:
    自己写个网盘系列:③ 开源这个网盘编码,手把手教你.NET 8 windows linux 直接部署,docker本地打包部署网盘应用
    14、设计模式之命令模式(Command)
    redis的原理和源码-源码阅读指南
    051校园短期闲置资源置换平台
    又一量子金融用例: 巴斯夫联手Multiverse Computing开发外汇交易优化模型
    品牌蓝V打造抖音短视频代运营策划方案
    面试必问:JVM 如何确定死亡对象?
    A Philosophy of Software Design读书笔记——不同的层需要不同的抽象
    【C++】STL中的运算符重载
    大数据学习笔记1.3 Linux用户操作
  • 原文地址:https://blog.csdn.net/yuzheh521/article/details/126903790