• 大数据开发之小文件合并


    问题背景:
    MR计算引擎一般把一次查询切分成多个stage,每个stage是一次MapReduce计算,而MapReduce计算则是将多个Map Task读取到的数据,划分成不同的分区,汇聚到不同的Reduce上进行计算。
    Spark计算引擎也是先将一次查询划分成多个stage,各自计算,再写入表中。
    文件的平均大小=分区大小/文件数,理想情况下,文件的平均大小在128MB~256MB之间,如果文件的平均大小过于小,则认为数据表里小文件过多,可以进行文件的合并。
    小文件增加会导致Namenode节点的内存暴增,严重影响了集群的整体性能。严重情况下会导致集群无法对外服务。
    小文件的增加增加了文件读写时间,因为客户端需要从NameNode获取元信息,再从DataNode读取对应数据。
    小文件增加还会增加下游任务的map数量,因为 map数量和源数据的文件个数,文件大小等因素相关。
    如果map数量很大,并且分区数也很大的情况下,就会产生大量的小文件,小文件个数 和 Map数 * 分区数正相关。因为每个map在往目标表中的某个分区输出数据时,都会输出一个单独的文件。
    具体而言:
    1、对于MAP only任务,小文件个数 <= Map数 * 分区数,当某些map 没有对应分区字段的数据时,并不会向目标分区输出文件。
    在这里插入图片描述
    2、对于Map-Reduce 任务,小文件个数 <= Reduce数 * 分区数,当某些reduce 没有对应分区字段的数据时,并不会向目标分区输出文件。 在这里插入图片描述
    解决方案:
    1.减小目标表分区数:
    分区表通常存在是为了读操作更加快速,而分区数一般由业务字段决定,所以可以根据业务,以及数据分布,特殊处理,合并表小的的分区。
    2.减小map/reduce 数
    2.1. 减小map数
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
    – 设置hive输入端端进行小文件合并
    set mapred.max.split.size=536870912 --512M
    – 每个Map最大输入大小,调大可以减小mapper数
    以下参数也可以适当调大:
    set mapred.min.split.size.per.node =536870912 – 一个节点上split的至少的大小 ,如果小于这个数会进行合并
    set mapred.min.split.size.per.rack =536870912 – 一个交换机下split的至少的大小,如果小于这个参数,则进行合并
    set mapreduce.job.split.metainfo.maxsize = 10000000; – The maximum permissible size of the split metainfo file. The JobTracker won’t attempt to read split metainfo files bigger than the configured value. No limits if set to -1.
    Hadoop v2.x 后,对比Hadoop v1.x参数命名方式发生了变化:
    mapred.min.split.size => mapreduce.input.fileinputformat.split.minsize。
    mapred.max.split.size => mapreduce.input.fileinputformat.split.maxsize。
    mapred.min.split.size.per.rack => mapreduce.input.fileinputformat.split.minsize.per.rack。
    mapred.min.split.size.per.node => mapreduce.input.fileinputformat.split.minsize.per.node。
    dfs.block.size => dfs.blocksize
    mapred.map.tasks => mapreduce.job.maps
    但是参数设置太大,会导致Java Heap Space 出现oom,导致失败。
    2.2 减小ruduce 数量
    set mapred.reduce.tasks=10; – 除非知道数据量的分布,一般不直接设置
    set hive.exec.reducers.bytes.per.reducer=1073741824 – 每个reduce处理的数据量,实际上可能每个reduce 处理的数据量会超过它,默认1G 增加该参数,可以减小reduce数量
    3.输出端进行小文件合并:
    set hive.merge.mapfiles = true; – map-only任务开启合并,默认是true
    set hive.merge.mapredfiles = true; – map-reduce任务开启合并,默认false
    set hive.merge.smallfiles.avgsize=160000000; – 如果不是partitioned table的话,输出table文件的平均大小小于这个值,启动merge job,如果是partitioned table,则分别计算每个partition下文件平均大小,只merge平均大小小于这个值的partition。
    set hive.merge.size.per.task = 256000000 – merge job后每个文件的目标大小
    4.使用sort by,distrubute by
    sort by 实际上将Map-only 任务变为了Map-reduce 任务,主要由reduce任务数决定,且sort 会对shuffle-read的数据进行排序合并(reducer 内部排序合并),与下述处理方式异曲同工:
    用MapReduce的合并输入,用如下参数Insert Overwrite一次分区:
    set hive.execute.engine=mapreduce;
    set mapred.min.split.size=5368709120;
    set mapred.max.split.size=5368709120;
    set hive.exec.orc.default.compress=ZLIB;
    set hive.exec.orc.encoding.strategy=COMPRESSION;
    set hive.exec.orc.compression.strategy=COMPRESSION;
    INSERT OVERWRITE TABLE tablename partition(par_20220101)
    SELECT * FROM tablename partition(par_20220101) t ;
    若是非分区表,则先将表中的数据排序后,再插入表中;
    若是分区表,则先创建新表,插入旧表全部分区数据,再对新表数据排序后,插入表中;
    SQL聚合操作过程较长,和串行读文件有关,可以适当增加并发,在处理的时候切小一点:
    set hive.execute.engine=mapreduce;
    set mapreduce.input.fileinputformat.split.maxsize = 1048576;

       distribute by key会将同一个key的数据shuffle到同一个reducer(sort by 不会),然后在reducer 内部进行聚合合并,最终写到对应key的分区。
       使用distribute后在数据倾斜的情况,会导致任务运行时间变长(用时间换空间),这时候可以采取数据划分策略 ,将数量特别大的那些key建一个任务执行Map-only insert ,并调整好 map的数量,对于剩下的数据采用 insert...select... distrubute by key 的方法减小小文件。这样能保持整个表的小文件不会太大。
    
    • 1
    • 2
  • 相关阅读:
    cas与volatile
    FCOS论文复现:通用物体检测算法
    两道ospf 网络优化题目解析
    ADBMS1818驱动程序解析
    Android在XML和代码中,同时设置背景,导致背景叠加的问题
    Linux C/C++ 处理命令行参数
    【ATT&CK】MITRE Caldera-emu插件
    解密Prompt系列28. LLM Agent之金融领域摸索:FinMem & FinAgent
    深入了解Redis的TYPE命令
    制造企业如何通过PLM系统实现BOM管理的飞跃
  • 原文地址:https://blog.csdn.net/julyclj55555/article/details/126547879