• 数据集成工具 ---- datax 3.0


    1、datax:

            是一个异构数据源离线同步工具,致力于实现关系型数据库(mysql、oracle等)hdfs、hive、hbase等各种异构数据源之间的数据同步

    2、参考网址文献:

    https://github.com/alibaba/DataX/blob/master/introduction.mdhttps://github.com/alibaba/DataX/blob/master/userGuid.mdhttps://github.com/alibaba/DataX/blob/master/introduction.md

    3、Datax的框架设计:

    Datax作为离线数据同步工具,主要的是采用了Framework+plugin架构构成,将数据源的读数据和写数据封装成对应的Reader和Writer插件,纳入到整体的同步框架中。

            1、Reader:作为数据的采集模块,负责采集数据源的数据,将数据发送给Framework

            2、Writer:作为数据写入模块,负责不断的向Framework取出数据,将数据写入到对应的目的端

            3、Framework:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、数据转换等核心技术问题。

    4、Datax的核心架构:

    Datax3.0 开源版本是支持单机多线程来完成同步作业运行,因为底层是使用java做开发。整体的架构:

    模块的核心介绍:

            1、Datax完成单个数据同步做作业,被称之为job,Datax接收到一个job时就会启动一个进程来完成数据同步工作,所以Datax job 模块是单个作业的中枢管理中心,主要是承担数据清理,子任务切分、TaskGroup 管理。

            2、当Datax启动后,Datax job会根据不同的源数据将job切分成不同的Task,所以Task是Datax的最小作业单位,每一个Task都会负责一部分的数据同步。

            3、切分成多个Task后,Datax job 就会调用scheduler模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup,每一个TaskGroup都负责一定的Task任务的执行,默认TaskGroup并发数量数5个。

            4、每一个Task都是由TaskGroup所监控执行启动,每一个Task启动后都会按照Reader---Channel---Writer的执行顺序执行。

            5、当任务启动后,Datax job就会监控所有的TaskGroup的执行情况,当所有的TaskGroup任务完成后,job就会退出,当出现异常,就会异常退出并且进程退出值非0.

    5、Datax的核心优势:

            1、可靠的数据质量监控

            2、丰富的数据转换功能

            3、精准的控制速度

            4、容错机制:

                    1、线程内部重试

                            DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

                    2、线程级别重试

                            目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

            5、极简的体验

    6、Datax与Sqoop的区别:
    功能Dataxsqoop
    运行模式单进程多线程MR
    分布式是不支持分布式支持
    流控需要定制
    统计信息支持不支持,分布式的数据收集不方便
    数据校验只有core部分有校验功能不支持,分布式的数据收集不方便
    监控需要定制需要定制
    7、Datax部署:

    1、下载jar包:

            下载路径:https://github.com/alibaba/DataX

    2、解压文件,配置环境变量:  

    1. #解压jar包
    2. tar -zxvf datax.tar.gz
    3. #配置环境变量:
    4. vim /etc/profile
    5. export DATAX_HOME=/user/loacl/soft/datax
    6. export PATH=.:$PATH:$DATAX_HOME/bin
    7. #配置好环境变量,让配置文件生效
    8. source /etc/profile

    3、使执行文件拥有执行权:

    1. 添加执行权:
    2. chmod +x data.py
    8、Datax的使用:
    1. 在datax中会自动的生成模板的命令:
    2. datax.py -r streamreader -w streamwriter
            1、streamreader  to  streamwriter,数据打印在控制台上面

    参数说明:

    1. "sliceRecordCount": 100 #指定打印的个数
    2. "channel": 1 #指定并发度
    1. #创建json文件:
    2. vim streamreadertostreamwriter.json
    3. {
    4. "job": {
    5. "content": [
    6. {
    7. "reader": {
    8. "name": "streamreader",
    9. "parameter": {
    10. "column": [
    11. {
    12. "type":"string",
    13. "value":"wyz"
    14. },
    15. {
    16. "type":"int",
    17. "value":"18"
    18. }
    19. ],
    20. "sliceRecordCount": 100 #指定打印的个数
    21. }
    22. },
    23. "writer": {
    24. "name": "streamwriter",
    25. "parameter": {
    26. "encoding": "",
    27. "print": true
    28. }
    29. }
    30. }
    31. ],
    32. "setting": {
    33. "speed": {
    34. "channel": 1 #指定并发度
    35. }
    36. }
    37. }
    38. }
    39. #脚本执行命令:
    40. datax.py streamreadertostreamwriter.json
            2、mysql to mysql
    1. 1、可以通过命令获取模板:
    2. datax.py -r mysqlreader -w mysqlwriter
    3. 2、可以通过github上的模板进行编写:分别是mysqlreader和mysqlwriter,参数会比较详细
    4. 3、在插入数据的需要注意是在将数据写入的时候如果出现在数据,那么此时可能是创建的表出了问题
    5. 例如:表中的某个字段是主键,主键唯一
    1. vim mysqlreaderTomysqlwriter.json
    2. {
    3. "job": {
    4. "content": [
    5. {
    6. "reader": {
    7. "name": "mysqlreader",
    8. "parameter": {
    9. "column": [
    10. "id",
    11. "name",
    12. "age",
    13. "clazz",
    14. "gender"
    15. ],
    16. "connection": [
    17. {
    18. "jdbcUrl": ["jdbc:mysql://192.168.226.1:3306/bigdata25"],
    19. "table": ["stu"]
    20. }
    21. ],
    22. "password": "123456",
    23. "username": "root",
    24. "where": "" #不是必须要写的,作用是可以在读数据时进行一次过滤
    25. }
    26. },
    27. "writer": {
    28. "name": "mysqlwriter",
    29. "parameter": {
    30. "column": [
    31. "id",
    32. "name",
    33. "age",
    34. "clazz",
    35. "gender"
    36. ],
    37. "connection": [
    38. {
    39. "jdbcUrl": "jdbc:mysql://192.168.226.1:3306/bigdata25",
    40. "table": ["data_test"]
    41. }
    42. ],
    43. "password": "123456",
    44. "preSql": [], #不是必须写的,作用是再写入数据前可以执行该sql
    45. "session": [],
    46. "username": "root",
    47. "writeMode": "insert" #必选,指定数据写入的模式,分成三种:insert(一般默认)、replace、update
    48. }
    49. }
    50. }
    51. ],
    52. "setting": {
    53. "speed": {
    54. "channel": 1
    55. }
    56. }
    57. }
    58. }
    59. #执行脚本:
    60. datax.py mysqlreaderTomysqlwriter.json

     将数据写入到mysql时,写入的表是需要提前创建的。

            3、mysql to hdfs

    参数解释:

    1. 使用datax的时候hdfswriter只支持两种文件形式,分别是text和orc
    2. "fileType": "text", #支持两种方式:text和orc,text表示的是textfile,orc表示的orcfile
    3. "compress": "", #指定文件的压缩形式,不指定代表不用压缩,
    4. text支持的压缩方式:gzip,bzip2,
    5. orc支持的压缩方式有NONE和SNAPPY
    6. "writeMode": "append" #表示的是数据在写入的操作,分成三种:
    7. append,写入前不做任何处理
    8. nonconflit 如果文件存在,直接报错
    9. truncate:如果文件存在,那就先删除在写入
    10. "path": "/bigdata25/datax/datax_mysqltohdfs/" #文件不存在是需要提前创建的
    1. vim mysqlTohdfs.json
    2. {
    3. "job": {
    4. "content": [
    5. {
    6. "reader": {
    7. "name": "mysqlreader",
    8. "parameter": {
    9. "column": ["id","name","age","clazz","gender"],
    10. "connection": [
    11. {
    12. "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"],
    13. "table": ["stu"]
    14. }
    15. ],
    16. "password": "123456",
    17. "username": "root",
    18. "where": ""
    19. }
    20. },
    21. "writer": {
    22. "name": "hdfswriter",
    23. "parameter": {
    24. "column": [
    25. {
    26. "name":"id",
    27. "type":"string"
    28. },
    29. {
    30. "name":"stu_name",
    31. "type":"string"
    32. },
    33. {
    34. "name":"age",
    35. "type":"string"
    36. },
    37. {
    38. "name":"clazz",
    39. "type":"string"
    40. },
    41. {
    42. "name":"gender",
    43. "type":"string"
    44. }
    45. ],
    46. "compress": "",
    47. "defaultFS": "hdfs://master:9000",
    48. "fieldDelimiter": ",",
    49. "fileName": "stu_mysqltohdfs",
    50. "fileType": "text",
    51. "path": "/bigdata25/datax/datax_mysqltohdfs/",
    52. "writeMode": "append"
    53. }
    54. }
    55. }
    56. ],
    57. "setting": {
    58. "speed": {
    59. "channel": 1
    60. }
    61. }
    62. }
    63. }
    64. #执行脚本:
    65. datax.py mysqlTohdfs.json
            4、mysql to hive

     原理思想:

    1. 实际上还是将数据存入到hdfs上面,hive通过记录元数据信息来获取数据
    2. 原理:
    3. 创建好hive表在保存在hdfs上,是有文件路径,然后通过写入到指定的hdfs文件路径就能将数据写入到hive表中
    4. 当开启hive的时候,在hive中创建的表会默认的存储hdfs的/user/hive/warehouse/目录下

    前期准备:

    1. 前期准备:
    2. 启动hive(后台启动):nohup hive --service metastore &
    3. 连接hive:hive
    4. 创建hive表(在没有说明的情况下一般在都是创建一个外部表)
    5. 创建一个datax数据库:
    6. create database datax;
    7. 切换数据库:use datax
    8. 创建外部表:
    9. create external table if not exists datax_mysqltohive(
    10. id string,
    11. name string,
    12. age int,
    13. clazz string,
    14. gender string
    15. )
    16. row format delimited fields terminated by ',' stored as textfile

    编写脚本:

    1. vim mysqlTohive.json
    2. {
    3. "job": {
    4. "content": [
    5. {
    6. "reader": {
    7. "name": "mysqlreader",
    8. "parameter": {
    9. "column": ["id","name","age","clazz","gender"],
    10. "connection": [
    11. {
    12. "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"],
    13. "table": ["stu"]
    14. }
    15. ],
    16. "password": "123456",
    17. "username": "root",
    18. "where": ""
    19. }
    20. },
    21. "writer": {
    22. "name": "hdfswriter",
    23. "parameter": {
    24. "column": [
    25. {
    26. "name":"id",
    27. "type":"string"
    28. },
    29. {
    30. "name":"stu_name",
    31. "type":"string"
    32. },
    33. {
    34. "name":"age",
    35. "type":"string"
    36. },
    37. {
    38. "name":"clazz",
    39. "type":"string"
    40. },
    41. {
    42. "name":"gender",
    43. "type":"string"
    44. }
    45. ],
    46. "compress": "",
    47. "defaultFS": "hdfs://master:9000",
    48. "fieldDelimiter": ",",
    49. "fileName": "datax_mysqltohive1",
    50. "fileType": "text",
    51. "path": "/user/hive/warehouse/datax.db/datax_mysqltohive",
    52. "writeMode": "append"
    53. }
    54. }
    55. }
    56. ],
    57. "setting": {
    58. "speed": {
    59. "channel": 1
    60. }
    61. }
    62. }
    63. }
    64. #执行脚本命令:
    65. datax.py mysqlTohive.json
            5、mysql to hbase(Hbase11XWriter)

    参数解释:

    1. "mode": "normal" 写hbase的模式,目前只支持normal模式
    2. "hbaseConfig" {"hbase.zookeeper.quorum": "***"} 描述:连接HBase集群需要的配置信息,JSON格式
    3. "table": "writer" 表的名称,大小写比较敏感
    4. "encoding" 编码方式
    5. "rowkeyColumn" 描述:要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量
    6. "versionColumn" 表示指定写入hbase的时间戳,支持当前时间、指定时间列、指定时间

    前期准备:

    1. 前期准备:
    2. 启动zookeeper:
    3. zkServer.sh start(每一个节点上都是需要启动的)
    4. 查看zk的状态:
    5. zkServer.sh status
    6. 启动hbase:
    7. start-hbase.sh
    8. 连接hbase:
    9. sqlline.py master,node1,node2
    10. 进入hbase的客户端:hbase shell
    11. hbase中查看表的命令:!table
    12. 退出命令 !quit
    13. 查看表:list
    14. 在hbase中创建创建表,指定表名和列簇:create 'student','cf1'

    编写脚本:

    1. vim mysqlTohbase.json
    2. {
    3. "job": {
    4. "content": [
    5. {
    6. "reader": {
    7. "name": "mysqlreader",
    8. "parameter": {
    9. "column": ["id","name","age","clazz","gender"],
    10. "connection": [
    11. {
    12. "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"],
    13. "table": ["stu"]
    14. }
    15. ],
    16. "password": "123456",
    17. "username": "root",
    18. "where": ""
    19. }
    20. },
    21. "writer": {
    22. "name": "hbase11xwriter",
    23. "parameter": {
    24. "hbaseConfig": {
    25. "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
    26. },
    27. "table": "NEW_STU",
    28. "mode": "normal",
    29. "rowkeyColumn": [
    30. {
    31. "index":0,
    32. "type":"string"
    33. },
    34. {
    35. "index":-1,
    36. "type":"string",
    37. "value":"_"
    38. }
    39. ],
    40. "column": [
    41. {
    42. "index":1,
    43. "name": "cf1:name",
    44. "type": "string"
    45. },
    46. {
    47. "index":2,
    48. "name": "cf1:age",
    49. "type": "int"
    50. },
    51. {
    52. "index":3,
    53. "name": "cf1:clazz",
    54. "type": "string"
    55. },
    56. {
    57. "index":4,
    58. "name": "cf1:gender",
    59. "type": "string"
    60. }
    61. ],
    62. "versionColumn":{
    63. "index": -1,
    64. "value":"123456789"
    65. },
    66. "encoding": "utf-8"
    67. }
    68. }
    69. }
    70. ],
    71. "setting": {
    72. "speed": {
    73. "channel": 1
    74. }
    75. }
    76. }
    77. }
    78. #执行脚本
    79. datax.py mysqlTohbase.json
            6、mysql增量同步数据到hive中。

    最主要的工作就是在原先的mysql数据导入的hive中的基础上进行where过滤

    1. vim mysqlTohive.json
    2. {
    3. "job": {
    4. "content": [
    5. {
    6. "reader": {
    7. "name": "mysqlreader",
    8. "parameter": {
    9. "column": ["id","name","age","clazz","gender"],
    10. "connection": [
    11. {
    12. "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"],
    13. "table": ["stu"]
    14. }
    15. ],
    16. "password": "123456",
    17. "username": "root",
    18. "where": "id >20"
    19. }
    20. },
    21. "writer": {
    22. "name": "hdfswriter",
    23. "parameter": {
    24. "column": [
    25. {
    26. "name":"id",
    27. "type":"string"
    28. },
    29. {
    30. "name":"stu_name",
    31. "type":"string"
    32. },
    33. {
    34. "name":"age",
    35. "type":"string"
    36. },
    37. {
    38. "name":"clazz",
    39. "type":"string"
    40. },
    41. {
    42. "name":"gender",
    43. "type":"string"
    44. }
    45. ],
    46. "compress": "",
    47. "defaultFS": "hdfs://master:9000",
    48. "fieldDelimiter": ",",
    49. "fileName": "datax_mysqltohive1",
    50. "fileType": "text",
    51. "path": "/user/hive/warehouse/datax.db/datax_mysqltohive",
    52. "writeMode": "append"
    53. }
    54. }
    55. }
    56. ],
    57. "setting": {
    58. "speed": {
    59. "channel": 1
    60. }
    61. }
    62. }
    63. }
    64. #执行脚本命令:
    65. datax.py mysqlTohive.json
    9、在使用datax过程中出现的错误:

    1、配置文件出现错误,脚本不完整:

  • 相关阅读:
    基于Nodejs的医生预约平台的设计和实现
    JSP内置对象及作用域(三)
    给你安利一款带有AI功能的数据库管理工具
    【Harmony OS】【JAVA UI】鸿蒙应用如何集成OKHttp网络三方库
    java计算机毕业设计基于ssm的少儿编程管理系统(源代码+数据库+Lw文档)
    CommonModule.dll动态链接库(DLL)文件丢失的处理方法
    The Grapes NFT 概览与数据分析
    LeetCode1247
    TI DSP的中断
    企业上云时代,适配进销存管理系统从四方面选
  • 原文地址:https://blog.csdn.net/m0_62078954/article/details/136646970