小文件会造成 nn 处理压力变大,大大降低了读取性能,整个 HDFS 文件系统访问缓慢,大量的小文件还会导致 nn 内存溢出,无法正常使用。
# 缓存某个表
spark.catalog.cacheTable("tableName")
# 释放缓存的表
spark.catalog.uncacheTable("tableName")
# 指定在进行 shuffle 操作时的分区数量,默认:200
spark.sql.shuffle.partitions
在 Spark SQL 客户端合并小文件主要有两种方式,一种是 repartition
,还有一种是 coalesce
方法。
两者区别:
repartition
在进行分区时会进行 shuffle 操作,通过哈希值来进行数据的重新分区。
coalesce
默认相当于 repartition
不走 shuffle 操作,假设当前合并为一个分区:coalesce(1)
,那么只会运行 1 个 task 任务,多余的资源会被浪费。
还需注意 coalesce
的分区缩减全在内存里进行处理,如果当前处理的数据量过大,会导致程序的内存溢出。
如果在进行 coalesce
操作前的分区数小于得到的分区数,coalesce
就不会起作用,也不会进行 shuffle 操作。
在 Spark SQL 中使用如下:
名称不区分大小写。
INSERT ... SELECT /*+REPARTITION(n)*/ ...
INSERT ... SELECT /*+COALESCE(n)*/ ...
使用场景:
假设我们当前有 P 个分区,现在需要调整为 N 个分区。
如果 P < N,那么就只能使用 repartition
。
如果 P > N,既可以使用 repartition
,也可以使用 coalesce
;但是如果大于很多的话建议用 repartition
,运行效率会更高,资源不会浪费。
方法一:
-- 是否开启分区调整功能,默认:false
set spark.sql.adaptive.enabled=true;
-- 开启分区调整后,在 reduce 阶段每个 task 最少处理的数据量,默认:64M,单位:B
-- 一般改成和集群块一样的大小,Hadoop2.7.2版本及之前默认64MB,Hadoop2.7.3版本及之后默认128M
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128000000;
-- 开启分区调整后,最小的 reducer 个数,默认:1
set spark.sql.adaptive.minNumPostShufflePartitions=1;
-- 开启分区调整后,最大的 reducer 个数,默认:500
set spark.sql.adaptive.maxNumPostShufflePartitions=500;
-- 开启分区调整后,在 reduce 阶段每个 task 最少处理的条数,默认:20000000
set spark.sql.adaptive.shuffle.targetPostShuffleRowCount=20000000;
一般情况数据量小的话,只需要设置前面两个参数就可以了。
参数调整前:
参数调整后:
新建了一个和其结构一样 test
表,然后导入数据,成功的合并了小文件:
方法二:
-- 执行 Map 前进行小文件合并,默认开启
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- 合并为一个 split 的最大值(单位byte),当超过该值时新建一个split,256MB
set mapreduce.input.fileinputformat.split.maxsize=268435456;
-- 在第一步合并后每个节点剩余的文件,如果大于该值(单位byte),单独创建一个split,256MB
set mapreduce.input.fileinputformat.split.minsize.per.node=268435456;
-- 在第二步处理完,每个机架上剩余的文件,如果大于该值(单位byte),单独创建一个split,256MB
set mapreduce.input.fileinputformat.split.minsize.per.rack=268435456;
参数调整前:
参数调整后:
新建了一个和其结构一样 test
表,然后导入数据,成功的合并了小文件:
以上参数调优都是临时调优,仅限于本次会话,如果想要永久设置的话只需要将参数配置到 hive-site.xml
文件中即可。
当我们在使用动态分区的时候,为了避免产生过多的小文件。我们可以使用 distribute by
的方式来对数据进行分区。
具体操作: 将动态分区的键设置为 distribute by
的 key 值。
假设有共有10个动态分区,那么使用调优之后只会产生10个 reducer,这样就大大减少了小文件的输出。
但是这样就会出现一个问题,就是每个分区下面只会有一个文件,而该文件存储的数据量会很大,我们后期访问起来还是会很慢。
解决方法:
我们可以通过 distribute by
+ 随机数 rand()
的方式来解决,以此来控制 map 端给 reduce 的数据量,达到数据量的均衡。
还需要结合如下参数:
-- 执行 Map 前进行小文件合并,默认开启
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- 在 map 的任务结束时合并小文件
set hive.merge.mapfiles = true;
-- 在 MapReduce 的任务结束时合并小文件
set hive.merge.mapredfiles = true;
-- 合并文件的大小
set hive.merge.size.per.task = 256000000;
-- 每个 Map 最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;
-- 每个reducer的大小, 默认是1G,输入文件如果是10G,那么就会起10个reducer,单位:B
set hive.exec.reducers.bytes.per.reducer=1073741824;
设置每个输出文件的大小控制在 256MB
左右。
我们还可以通过控制随机数的值来改变生成的文件个数,如:distribute by cast(rand()*3 as int)
,表示在同一分区中生成三个大小一致的文件。
针对 Hive 表中已经产生小文件的场景,对其进行归档。
将 $HADOOP_HOME/share/hadoop/tools/lib/
目录下的 hadoop-archives-xxx.jar
包,复制一份到 $HIVE_HOME/lib
目录下。
在 Hive 中开启如下参数:
-- 开启归档功能
set hive.archive.enabled=true;
-- Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;
-- 归档大小,单位:B
set har.partfile.size=256000000;
使用示例:
开启以上参数后,直接运行如下的 SQL 语句即可设置小文件归档操作。
-- 进行归档
alter table db_name.tb_name archive partition(dy='...');
-- 取消归档
alter table db_name.tb_name unarchive partition(dy='...');
归档的分区可以查看,但是不能 insert overwrite
,必须先取消归档。