• Data sync tool DataX in practice: syncing data from MySQL to HDFS


    1. View the data-sync job template

    I added some explanatory comments of my own to the template below.

    [root@bigdata001 datax]# bin/datax.py -r mysqlreader -w hdfswriter
    
    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    
    
    Please refer to the mysqlreader document:
         https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 
    
    Please refer to the hdfswriter document:
         https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
     
    Please save the following configuration as a json file and  use
         python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
    to run the job.
    
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", 
                        "parameter": {
                            "column": [],                                             # Use ["*"] to sync all columns. Constants and expressions are also supported, e.g. ["1", "2.3", "true", "'bazhen.csy'", "null", "upper('a')"] for integer, float, boolean, string, null, and expression values
                            "connection": [
                                {
                                    "jdbcUrl": [],                                     # Multiple connection URLs are supported; DataX tests them in order and uses the first reachable one to query data
                                    "table": []                                         # Multiple tables can be synced; they must all share the same schema
                                }
                            ], 
                            "password": "", 
                            "username": "", 
                            "where": "",
                            "splitPk": "",                                            # Usually the primary key; only integer columns are supported. DataX computes min(splitPk) and max(splitPk), then splits that range to divide the job into multiple tasks. If unset, there is only one task
                            "querySql": ["select id, name from person where id < 10;"]    # Added by me. When querySql is set, column, table and where are ignored automatically
                        }
                    }, 
                    "writer": {
                        "name": "hdfswriter", 
                        "parameter": {
                            "column": [],                                            # Must match the number of reader columns
                            "compress": "",                                       # Empty by default, meaning no compression. text files support gzip and bzip2; orc files support NONE and SNAPPY
                            "defaultFS": "", 
                            "fieldDelimiter": "", 
                            "fileName": "", 
                            "fileType": "",                                          # Currently only text and orc are supported; for orc, compress must be set to SNAPPY
                            "path": "",                                                # This path must already exist
                            "writeMode": ""                                       # append: create a new file and write into it; nonConflict: fail if any file prefixed with fileName already exists
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": ""
                }
            }
        }
    }
    [root@bigdata001 datax]#
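As a concrete illustration of the querySql note in the template above: when querySql is present, column/table/where are ignored. A minimal sketch of a reader parameter in querySql mode (the host, credentials, and SQL are placeholders, and note that the official mysqlreader doc places querySql inside the connection element):

```python
import json

# Minimal mysqlreader parameter in querySql mode. When querySql is present,
# column/table/where are ignored, so only the connection details are needed.
# Host, credentials, and SQL here are placeholders, not a real setup.
reader_parameter = {
    "username": "root",
    "password": "******",
    "connection": [{
        "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],
        "querySql": ["select id, name from person where id < 10;"],
    }],
}

print(json.dumps(reader_parameter, indent=4))
```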
    

    2. HDFS configuration for high availability (HA)

    A reference configuration is shown below:

                            "defaultFS": "hdfs://192.168.8.111:9000", 
                            "hadoopConfig": {
                                "dfs.nameservices": "nnha",
                                "dfs.ha.namenodes.nnha": "nn1,nn2,nn3",
                                "dfs.namenode.rpc-address.nnha.nn1": "192.168.8.111:9870",
                                "dfs.namenode.rpc-address.nnha.nn2": "192.168.8.112:9870",
                                "dfs.namenode.rpc-address.nnha.nn3": "192.168.8.113:9870",
                                "dfs.client.failover.proxy.provider.nnha": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                            },
                            "fieldDelimiter": "|"
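One thing worth flagging: with `dfs.nameservices` configured, the `defaultFS` would normally reference the nameservice URI rather than a single NameNode address, so the client can actually fail over. A sketch of that variant (this is my reading of the hdfswriter HA docs, so treat it as an assumption to verify against your cluster):

```json
"defaultFS": "hdfs://nnha",
"hadoopConfig": {
    "dfs.nameservices": "nnha",
    ...
}
```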
    

    3. MysqlReader type conversion for MySQL types

    DataX internal type    MySQL data types
    Long                   int, tinyint, smallint, mediumint, bigint
    Double                 float, double, decimal
    String                 varchar, char, tinytext, text, mediumtext, longtext, year
    Date                   date, datetime, timestamp, time
    Boolean                bit, bool
    Bytes                  tinyblob, mediumblob, blob, longblob, varbinary

    4. HdfsWriter supports most Hive types

    DataX internal type    Hive data types
    Long                   TINYINT, SMALLINT, INT, BIGINT
    Double                 FLOAT, DOUBLE
    String                 STRING, VARCHAR, CHAR
    Boolean                BOOLEAN
    Date                   DATE, TIMESTAMP
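The two mapping tables above can be expressed as a simple lookup: a MySQL column type maps to a DataX internal type, which in turn constrains the Hive types it can land on. An illustrative helper (not part of DataX itself):

```python
# Illustrative lookup for the two type-mapping tables above (not DataX code).
MYSQL_TO_DATAX = {
    **{t: "Long" for t in ("int", "tinyint", "smallint", "mediumint", "bigint")},
    **{t: "Double" for t in ("float", "double", "decimal")},
    **{t: "String" for t in ("varchar", "char", "tinytext", "text",
                             "mediumtext", "longtext", "year")},
    **{t: "Date" for t in ("date", "datetime", "timestamp", "time")},
    **{t: "Boolean" for t in ("bit", "bool")},
    **{t: "Bytes" for t in ("tinyblob", "mediumblob", "blob",
                            "longblob", "varbinary")},
}

DATAX_TO_HIVE = {
    "Long": ("TINYINT", "SMALLINT", "INT", "BIGINT"),
    "Double": ("FLOAT", "DOUBLE"),
    "String": ("STRING", "VARCHAR", "CHAR"),
    "Boolean": ("BOOLEAN",),
    "Date": ("DATE", "TIMESTAMP"),
}

def hive_candidates(mysql_type: str):
    """Return the Hive types a MySQL column type can map to via DataX."""
    internal = MYSQL_TO_DATAX[mysql_type.lower()]
    # Bytes has no Hive target in the HdfsWriter table, hence the .get default
    return DATAX_TO_HIVE.get(internal, ())

print(hive_candidates("bigint"))  # → ('TINYINT', 'SMALLINT', 'INT', 'BIGINT')
```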

    5. Prepare the data in MySQL as follows

    mysql> create table person(
        -> id bigint,
        -> name varchar(64)
        -> );
    Query OK, 0 rows affected (0.06 sec)
    
    mysql> 
    mysql> insert into person(id, name) values(1, 'yi'), (2, 'er');
    Query OK, 2 rows affected (0.02 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> 
    mysql> select * from person;
    +------+------+
    | id   | name |
    +------+------+
    |    1 | yi   |
    |    2 | er   |
    +------+------+
    2 rows in set (0.00 sec)
    
    mysql> 
    

    6. Create job/mysql2hdfs.json

    Its content is as follows:

    [root@bigdata001 datax]# cat job/mysql2hdfs.json 
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", 
                        "parameter": {
                            "column": ["id", "name"], 
                            "connection": [
                                {
                                    "jdbcUrl": ["jdbc:mysql://192.168.8.115:3306/test"], 
                                    "table": ["person"]
                                }
                            ], 
                            "password": "Root_123", 
                            "username": "root", 
                            "where": ""
                        }
                    }, 
                    "writer": {
                        "name": "hdfswriter", 
                        "parameter": {
                            "column": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}], 
                            "compress": "", 
                            "defaultFS": "hdfs://192.168.8.111:9000", 
                            "fieldDelimiter": "|", 
                            "fileName": "person.txt", 
                            "fileType": "text", 
                            "path": "/", 
                            "writeMode": "append"
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }
    [root@bigdata001 datax]#
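Since hdfswriter's column list must match the reader's column count, a quick sanity check before submitting the job catches mismatches early. A minimal sketch (the job path is an assumption; `["*"]` readers are skipped because counting them needs the table schema):

```python
import json

def check_column_counts(job_path: str) -> None:
    """Fail fast if reader and writer column counts differ in a DataX job file."""
    with open(job_path) as f:
        job = json.load(f)
    for content in job["job"]["content"]:
        reader_cols = content["reader"]["parameter"]["column"]
        writer_cols = content["writer"]["parameter"]["column"]
        # ["*"] means "all columns", which cannot be counted without the schema
        if reader_cols == ["*"]:
            continue
        if len(reader_cols) != len(writer_cols):
            raise ValueError(
                f"reader has {len(reader_cols)} columns "
                f"but writer has {len(writer_cols)}"
            )

# check_column_counts("job/mysql2hdfs.json")  # raises ValueError on mismatch
```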
    

    7. Run the job

    DataX first writes to a temporary file. On success it renames the temporary file to the final name and then deletes the temporary directory; on failure it deletes the temporary file directly.

    [root@bigdata001 datax]# bin/datax.py job/mysql2hdfs.json 
    
    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    ...... (partial output omitted) ......
    2022-06-14 10:16:33.551 [0-0-0-writer] INFO  HdfsWriter$Task - begin do write...
    2022-06-14 10:16:33.551 [0-0-0-writer] INFO  HdfsWriter$Task - write to file : [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d]
    ...... (partial output omitted) ......
    2022-06-14 10:16:43.512 [job-0] INFO  HdfsWriter$Job - start rename file [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d] to file [hdfs://192.168.8.111:9000/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d].
    ...... (partial output omitted) ......
    2022-06-14 10:16:43.915 [job-0] INFO  HdfsWriter$Job - start delete tmp dir [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f] .
    ...... (partial output omitted) ......
    2022-06-14 10:16:44.034 [job-0] INFO  JobContainer - 
    Job start time                  : 2022-06-14 10:16:31
    Job end time                    : 2022-06-14 10:16:44
    Total elapsed time              :                 12s
    Average traffic                 :                0B/s
    Record write speed              :              0rec/s
    Records read in total           :                   2
    Read/write failures in total    :                   0
    
    [root@bigdata001 datax]#
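The write-to-temp-then-rename commit pattern visible in the log can be sketched in a few lines. This is a generic illustration of the idea on a local filesystem, not DataX's actual code:

```python
import os
import tempfile

def atomic_write(final_path: str, data: str) -> None:
    """Write to a temporary file first, then rename it into place: the same
    commit pattern HdfsWriter's log shows. On failure the temp file is removed."""
    tmp_dir = os.path.dirname(final_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=tmp_dir, prefix="__tmp_")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
        os.replace(tmp_path, final_path)   # commit: rename tmp -> final
    except BaseException:
        os.unlink(tmp_path)                # rollback: drop the tmp file
        raise

# atomic_write("person.txt", "1|yi\n2|er\n")
```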
    

    8. Inspect the file on HDFS

    DataX appends a random suffix to the configured file name; the suffixed name is the actual file written by each writer thread.

    [root@bigdata001 ~]# hadoop fs -cat /person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d
    1|yi
    2|er
    [root@bigdata001 ~]#
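Reading the output back, each line splits on the configured fieldDelimiter `|`. A tiny parser sketch for the text-format output above:

```python
def parse_datax_text(lines, delimiter="|"):
    """Split each text-format output line on the configured fieldDelimiter."""
    return [line.rstrip("\n").split(delimiter) for line in lines]

rows = parse_datax_text(["1|yi\n", "2|er\n"])
print(rows)  # → [['1', 'yi'], ['2', 'er']]
```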
    
  • Original article: https://blog.csdn.net/yy8623977/article/details/125272844