目录
3.全量导入(将数据从mysql导入到hive,hive表不存在,导入时自动创建hive表)
5.增量导入-append模式(将mysql数据增量导入hadoop)
6.增量导入-lastmodified模式(将mysql时间列大于等于阈值的数据增量导入HDFS)
Sqoop是一款开源的etl工具,主要用于在Hadoop(Hive)与传统数据库(mysql、postgresql...)间进行数据的传递,可以将关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导出到关系型数据库中。
Sqoop是专为大数据批量传输而设计,能够分割数据集并创建map task任务来处理每个区块。
- sqoop-list-databases 列出服务器上存在的数据库清单
- sqoop-list-tables 列出服务器上存在的数据表清单
- sqoop-job 用来生成一个sqoop的任务,生成后,该任务并不执行,除非使用命令执行该任务。
- sqoop import 从RDBMS导入到HDFS
- sqoop export 从HDFS导出到RDBMS
- --connect jdbc:mysql://ip:port/db_name 连接mysql数据库
- --username username 数据库用户名
- --password password 数据库密码
- --table table_name 源头数据表
- --target-dir /user/hive/warehouse/... 指定导入的目录,若不指定就会导入默认的HDFS存储路径
- --delete-target-dir HDFS地址如果存在的话删除,一般都是用在全量导入,增量导入的时候加该参数会报错
- --num-mappers 1 maptask数量
- -m 3 (使用3个mapper任务,即进程,并发导入。一般RDBMS的导出速度控制在60~80MB/s,每个 map 任务的处理速度5~10MB/s 估算所需并发map数。)
- --split-by id (根据id字段来切分工作单元实现哈希分片,从而将不同分片的数据分发到不同 map 任务上去跑,避免数据倾斜。)
- --autoreset-to-one-mapper 如果表没有主键,导入时使用一个mapper执行
- --input-null-string '\\N' 空值转换
- --input-null-non-string '\\N' 非空字符串替换
- --fields-terminated-by "\t" 字符串分割
- --query 'select * from test_table where id>10 and $CONDITIONS' ($CONDITIONS必须要加上就相当于一个配置参数,sql语句用单引号,用了SQL查询就不能加参数--table )
- -hive-home <dir> 重写$HIVE_HOME
- -hive-import 插入数据到hive当中,使用hive的默认分隔符
- -hive-overwrite 重写插入
- -create-hive-table 建表,如果表已经存在,该操作会报错
- -hive-table <table-name> 设置到hive当中的表名
- -hive-drop-import-delims 导入到hive时删除 \n, \r, and \0001
- -hive-delims-replacement 导入到hive时用自定义的字符替换掉 \n, \r, and \0001
- -hive-partition-key hive分区的key
- -hive-partition-value <v> hive分区的值
- -map-column-hive <map> 类型匹配,sql类型对应到hive类型
- --direct 是为了利用某些数据库本身提供的快速导入导出数据的工具,比如mysql的mysqldump
- --direct-split-size 在使用上面direct直接导入的基础上,对导入的流按字节数分块,特别是使用直连模式从PostgreSQL导入数据的时候,可以将一个到达设定大小的文件分为几个独立的文件。
- --columns <列名> 指定列
- --z, -–compress 打开压缩功能
- --compression-codec < c > 使用Hadoop的压缩,默认为gzip压缩
- --fetch-size < n > 从数据库一次性读入的记录数
- --append 将数据追加到hdfs中已经存在的dataset中。使用该参数,sqoop将把数据先导入到一个临时目录中,然后重新给文件命名到一个正式的目录中,以避免和该目录中已存在的文件重名。
- --as-avrodatafile 导入数据格式为avro
- --as-sequencefile 导入数据格式为sqeuqncefile
- --as-textfile 导入数据格式为textfile
- --as-parquetfile 导入数据格式为parquet
-
-
- 详细内容可以参考Sqoop用户手册(英文版):
- http://archive.cloudera.com/cdh/3/sqoop/SqoopUserGuide.html
- # 全量导入(将数据从mysql导入到HDFS指定目录)
- sqoop import --connect jdbc:mysql://ip:prot/db \
- --username username --password password \
- --query 'select * from mysql_table_name where $CONDITIONS' \
- --target-dir /user/hive/warehouse/... \
- --delete-target-dir \
- --fields-terminated-by '\t' \
- --hive-drop-import-delims \
- --null-string '\\N' \
- --null-non-string '\\N' \
- --split-by id \
- -m 1
- # 全量导入(将数据从mysql导入到已有的hive表)
- sqoop import --connect jdbc:mysql://ip:prot/db \
- --username username --password password \
- --table mysql_table_name \
- --hive-import \
- --hive-database hive_db_name \
- --hive-table hive_table_name \
- --fields-terminated-by '\t' \
- --hive-overwrite \
- --null-string '\\N' \
- --null-non-string '\\N' \
- --split-by id \
- -m 1
- # 全量导入(将数据从mysql导入到hive,hive表不存在,导入时自动创建hive表)
- sqoop import --connect jdbc:mysql://ip:prot/db \
- --username username --password password \
- --table mysql_table_name \
- --hive-import \
- --hive-database hive_db_name \
- --create-hive-table \
- --hive-table hive_table_name \
- --fields-terminated-by '\t' \
- --null-string '\\N' \
- --null-non-string '\\N' \
- --split-by id \
- -m 1
-
- #--hive-table hive_table_name
- #该参数不添加时默认hive建表表名与mysql表名一致
- # 全库导入(将mysql全库数据批量导入到hive)
- sqoop import-all-tables
- --connect jdbc:mysql://ip:prot/db \
- --username username --password password \
- --hive-database hive_db_name \
- --create-hive-table \
- --hive-import \
- --hive-overwrite \
- --fields-terminated-by '\t' \
- --exclude-tables 'drop_table' \
- --autoreset-to-one-mapper \
- --as-textfile
-
-
- #--exclude-tables 'drop_table' \
- # (此参数可以 exclude掉不需要import的表(多个表逗号分隔))
- #增量导入-append模式(将mysql数据增量导入hive表)
- sqoop import jdbc:mysql://ip:prot/db \
- --username username --password password \
- --table mysql_table_name \
- --hive-import \
- --hive-database hive_db_name \
- --hive-table hive_table_name \
- --hive-drop-import-delims \
- --fields-terminated-by '\t' \
- --null-string '\\N' \
- --null-non-string '\\N' \
- --incremental append \
- --check-column column_name \
- --last-value 10 \
- --split-by column_name \
- -m 1
-
- #增量导入-append模式(将mysql数据增量导入hdfs)
- sqoop import jdbc:mysql://ip:prot/db \
- --username username --password password \
- --table mysql_table_name \
- --target-dir /user/hive/warehouse/hive_db_name.db/hive_table_name \
- --incremental append \
- --check-column column_name \
- --last-value 10 \
- --split-by column_name \
- -m 1
-
- #--incremental append 基于递增列的增量导入(将递增列大于阈值的所有数据导入hadoop)
- #--check-column column_name 递增列
- #--last-value 数字 阈值
- #增量导入-lastmodified模式(将mysql时间列大于等于阈值的数据增量导入HDFS)
- #lastmodified模式不支持直接导入Hive表,但是可以使用导入HDFS的方法,--target-dir设置成Hive table在HDFS中的关联位置即可)
- sqoop import --connect jdbc:mysql://ip:prot/db \
- --username username --password password \
- --table mysql_table_name \
- --target-dir /user/hive/warehouse/hive_db_name.db/hive_table_name \
- --null-string '\\N' \
- --null-non-string '\\N' \
- --split-by id \
- -m 1 \
- --fields-terminated-by '\t' \
- --hive-drop-import-delims \
- --incremental lastmodified \
- --check-column time_column_name \
- --last-value '2022-09-09 10:00:01'
-
- #--incremental lastmodified 基于时间列的增量导入(将时间列大于阈值的所有数据导入hdfs)
- #--check-column time_column_name 时间列
- #--last-value 时间 阈值
- #全量导出(将hdfs全量导出到mysql表)
- sqoop export --jdbc:mysql://ip:prot/db \
- --username username --password password \
- --table mysql_table_name \
- --columns column1,column2,column3
- --export-dir /user/hive/warehouse//hive_db_name.db/hive_table_name \
- --input-fields-terminated-by '\001' \
- --input-null-string '\\N' \
- --input-null-non-string '\\N'