是一个异构数据源离线同步工具,致力于实现关系型数据库(mysql、oracle等)hdfs、hive、hbase等各种异构数据源之间的数据同步
https://github.com/alibaba/DataX/blob/master/introduction.mdhttps://github.com/alibaba/DataX/blob/master/userGuid.mdhttps://github.com/alibaba/DataX/blob/master/introduction.md
Datax作为离线数据同步工具,主要的是采用了Framework+plugin架构构成,将数据源的读数据和写数据封装成对应的Reader和Writer插件,纳入到整体的同步框架中。
1、Reader:作为数据的采集模块,负责采集数据源的数据,将数据发送给Framework
2、Writer:作为数据写入模块,负责不断的向Framework取出数据,将数据写入到对应的目的端
3、Framework:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、数据转换等核心技术问题。
Datax3.0 开源版本是支持单机多线程来完成同步作业运行,因为底层是使用java做开发。整体的架构:
模块的核心介绍:
1、Datax完成单个数据同步做作业,被称之为job,Datax接收到一个job时就会启动一个进程来完成数据同步工作,所以Datax job 模块是单个作业的中枢管理中心,主要是承担数据清理,子任务切分、TaskGroup 管理。
2、当Datax启动后,Datax job会根据不同的源数据将job切分成不同的Task,所以Task是Datax的最小作业单位,每一个Task都会负责一部分的数据同步。
3、切分成多个Task后,Datax job 就会调用scheduler模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup,每一个TaskGroup都负责一定的Task任务的执行,默认TaskGroup并发数量数5个。
4、每一个Task都是由TaskGroup所监控执行启动,每一个Task启动后都会按照Reader---Channel---Writer的执行顺序执行。
5、当任务启动后,Datax job就会监控所有的TaskGroup的执行情况,当所有的TaskGroup任务完成后,job就会退出,当出现异常,就会异常退出并且进程退出值非0.
1、可靠的数据质量监控
2、丰富的数据转换功能
3、精准的控制速度
4、容错机制:
1、线程内部重试
DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。
2、线程级别重试
目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。
5、极简的体验
功能 | Datax | sqoop |
运行模式 | 单进程多线程 | MR |
分布式 | 是不支持分布式 | 支持 |
流控 | 有 | 需要定制 |
统计信息 | 支持 | 不支持,分布式的数据收集不方便 |
数据校验 | 只有core部分有校验功能 | 不支持,分布式的数据收集不方便 |
监控 | 需要定制 | 需要定制 |
1、下载jar包:
下载路径:https://github.com/alibaba/DataX
2、解压文件,配置环境变量:
- #解压jar包
- tar -zxvf datax.tar.gz
-
- #配置环境变量:
- vim /etc/profile
-
-
- export DATAX_HOME=/user/loacl/soft/datax
- export PATH=.:$PATH:$DATAX_HOME/bin
-
- #配置好环境变量,让配置文件生效
- source /etc/profile
3、使执行文件拥有执行权:
- 添加执行权:
-
- chmod +x data.py
- 在datax中会自动的生成模板的命令:
-
- datax.py -r streamreader -w streamwriter
参数说明:
- "sliceRecordCount": 100 #指定打印的个数
-
- "channel": 1 #指定并发度
- #创建json文件:
-
- vim streamreadertostreamwriter.json
-
-
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "streamreader",
- "parameter": {
- "column": [
- {
- "type":"string",
- "value":"wyz"
- },
- {
- "type":"int",
- "value":"18"
- }
- ],
- "sliceRecordCount": 100 #指定打印的个数
- }
- },
- "writer": {
- "name": "streamwriter",
- "parameter": {
- "encoding": "",
- "print": true
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": 1 #指定并发度
- }
- }
- }
- }
-
-
-
- #脚本执行命令:
- datax.py streamreadertostreamwriter.json
- 1、可以通过命令获取模板:
- datax.py -r mysqlreader -w mysqlwriter
-
- 2、可以通过github上的模板进行编写:分别是mysqlreader和mysqlwriter,参数会比较详细
-
-
- 3、在插入数据的需要注意是在将数据写入的时候如果出现在数据,那么此时可能是创建的表出了问题
- 例如:表中的某个字段是主键,主键唯一
- vim mysqlreaderTomysqlwriter.json
-
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "column": [
- "id",
- "name",
- "age",
- "clazz",
- "gender"
-
- ],
- "connection": [
- {
- "jdbcUrl": ["jdbc:mysql://192.168.226.1:3306/bigdata25"],
- "table": ["stu"]
- }
- ],
- "password": "123456",
- "username": "root",
- "where": "" #不是必须要写的,作用是可以在读数据时进行一次过滤
- }
- },
- "writer": {
- "name": "mysqlwriter",
- "parameter": {
- "column": [
- "id",
- "name",
- "age",
- "clazz",
- "gender"
- ],
- "connection": [
- {
- "jdbcUrl": "jdbc:mysql://192.168.226.1:3306/bigdata25",
- "table": ["data_test"]
- }
- ],
- "password": "123456",
- "preSql": [], #不是必须写的,作用是再写入数据前可以执行该sql
- "session": [],
- "username": "root",
- "writeMode": "insert" #必选,指定数据写入的模式,分成三种:insert(一般默认)、replace、update
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": 1
- }
- }
- }
- }
-
-
-
-
- #执行脚本:
- datax.py mysqlreaderTomysqlwriter.json
将数据写入到mysql时,写入的表是需要提前创建的。
参数解释:
- 使用datax的时候hdfswriter只支持两种文件形式,分别是text和orc
-
- "fileType": "text", #支持两种方式:text和orc,text表示的是textfile,orc表示的orcfile
-
- "compress": "", #指定文件的压缩形式,不指定代表不用压缩,
- text支持的压缩方式:gzip,bzip2,
- orc支持的压缩方式有NONE和SNAPPY
-
- "writeMode": "append" #表示的是数据在写入的操作,分成三种:
- append,写入前不做任何处理
- nonconflit 如果文件存在,直接报错
- truncate:如果文件存在,那就先删除在写入
-
- "path": "/bigdata25/datax/datax_mysqltohdfs/" #文件不存在是需要提前创建的
-
- vim mysqlTohdfs.json
-
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "column": ["id","name","age","clazz","gender"],
- "connection": [
- {
- "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"],
- "table": ["stu"]
- }
- ],
- "password": "123456",
- "username": "root",
- "where": ""
- }
- },
- "writer": {
- "name": "hdfswriter",
- "parameter": {
- "column": [
- {
- "name":"id",
- "type":"string"
- },
- {
- "name":"stu_name",
- "type":"string"
- },
- {
- "name":"age",
- "type":"string"
- },
- {
-
- "name":"clazz",
- "type":"string"
- },
- {
- "name":"gender",
- "type":"string"
- }
- ],
- "compress": "",
- "defaultFS": "hdfs://master:9000",
- "fieldDelimiter": ",",
- "fileName": "stu_mysqltohdfs",
- "fileType": "text",
- "path": "/bigdata25/datax/datax_mysqltohdfs/",
- "writeMode": "append"
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": 1
- }
- }
- }
- }
-
- #执行脚本:
- datax.py mysqlTohdfs.json
原理思想:
- 实际上还是将数据存入到hdfs上面,hive通过记录元数据信息来获取数据
- 原理:
- 创建好hive表在保存在hdfs上,是有文件路径,然后通过写入到指定的hdfs文件路径就能将数据写入到hive表中
- 当开启hive的时候,在hive中创建的表会默认的存储hdfs的/user/hive/warehouse/目录下
前期准备:
- 前期准备:
- 启动hive(后台启动):nohup hive --service metastore &
- 连接hive:hive
-
- 创建hive表(在没有说明的情况下一般在都是创建一个外部表)
- 创建一个datax数据库:
- create database datax;
- 切换数据库:use datax
- 创建外部表:
- create external table if not exists datax_mysqltohive(
- id string,
- name string,
- age int,
- clazz string,
- gender string
- )
- row format delimited fields terminated by ',' stored as textfile
编写脚本:
- vim mysqlTohive.json
-
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "column": ["id","name","age","clazz","gender"],
- "connection": [
- {
- "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"],
- "table": ["stu"]
- }
- ],
- "password": "123456",
- "username": "root",
- "where": ""
- }
- },
- "writer": {
- "name": "hdfswriter",
- "parameter": {
- "column": [
- {
- "name":"id",
- "type":"string"
- },
- {
- "name":"stu_name",
- "type":"string"
- },
- {
- "name":"age",
- "type":"string"
- },
- {
-
- "name":"clazz",
- "type":"string"
- },
- {
- "name":"gender",
- "type":"string"
- }
- ],
- "compress": "",
- "defaultFS": "hdfs://master:9000",
- "fieldDelimiter": ",",
- "fileName": "datax_mysqltohive1",
- "fileType": "text",
- "path": "/user/hive/warehouse/datax.db/datax_mysqltohive",
- "writeMode": "append"
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": 1
- }
- }
- }
- }
-
-
- #执行脚本命令:
- datax.py mysqlTohive.json
参数解释:
- "mode": "normal" 写hbase的模式,目前只支持normal模式
-
- "hbaseConfig" {"hbase.zookeeper.quorum": "***"} 描述:连接HBase集群需要的配置信息,JSON格式
-
- "table": "writer" 表的名称,大小写比较敏感
-
- "encoding" 编码方式
-
- "rowkeyColumn" 描述:要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量
-
-
- "versionColumn" 表示指定写入hbase的时间戳,支持当前时间、指定时间列、指定时间
前期准备:
- 前期准备:
-
- 启动zookeeper:
- zkServer.sh start(每一个节点上都是需要启动的)
- 查看zk的状态:
- zkServer.sh status
-
- 启动hbase:
- start-hbase.sh
-
- 连接hbase:
- sqlline.py master,node1,node2
-
- 进入hbase的客户端:hbase shell
-
- hbase中查看表的命令:!table
- 退出命令 !quit
- 查看表:list
- 在hbase中创建创建表,指定表名和列簇:create 'student','cf1'
编写脚本:
- vim mysqlTohbase.json
-
-
-
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "column": ["id","name","age","clazz","gender"],
- "connection": [
- {
- "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"],
- "table": ["stu"]
- }
- ],
- "password": "123456",
- "username": "root",
- "where": ""
- }
- },
- "writer": {
- "name": "hbase11xwriter",
- "parameter": {
- "hbaseConfig": {
- "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
- },
- "table": "NEW_STU",
- "mode": "normal",
- "rowkeyColumn": [
- {
- "index":0,
- "type":"string"
- },
- {
- "index":-1,
- "type":"string",
- "value":"_"
- }
- ],
- "column": [
- {
- "index":1,
- "name": "cf1:name",
- "type": "string"
- },
- {
- "index":2,
- "name": "cf1:age",
- "type": "int"
- },
- {
- "index":3,
- "name": "cf1:clazz",
- "type": "string"
- },
- {
- "index":4,
- "name": "cf1:gender",
- "type": "string"
- }
- ],
- "versionColumn":{
- "index": -1,
- "value":"123456789"
- },
- "encoding": "utf-8"
-
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": 1
- }
- }
- }
- }
-
- #执行脚本
- datax.py mysqlTohbase.json
最主要的工作就是在原先的mysql数据导入的hive中的基础上进行where过滤
- vim mysqlTohive.json
-
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "column": ["id","name","age","clazz","gender"],
- "connection": [
- {
- "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"],
- "table": ["stu"]
- }
- ],
- "password": "123456",
- "username": "root",
- "where": "id >20"
- }
- },
- "writer": {
- "name": "hdfswriter",
- "parameter": {
- "column": [
- {
- "name":"id",
- "type":"string"
- },
- {
- "name":"stu_name",
- "type":"string"
- },
- {
- "name":"age",
- "type":"string"
- },
- {
-
- "name":"clazz",
- "type":"string"
- },
- {
- "name":"gender",
- "type":"string"
- }
- ],
- "compress": "",
- "defaultFS": "hdfs://master:9000",
- "fieldDelimiter": ",",
- "fileName": "datax_mysqltohive1",
- "fileType": "text",
- "path": "/user/hive/warehouse/datax.db/datax_mysqltohive",
- "writeMode": "append"
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": 1
- }
- }
- }
- }
-
-
- #执行脚本命令:
- datax.py mysqlTohive.json
1、配置文件出现错误,脚本不完整: