• Spark SQL 与 Hive 的小文件调优


    小文件危害

    小文件会造成 nn 处理压力变大,大大降低了读取性能,整个 HDFS 文件系统访问缓慢,大量的小文件还会导致 nn 内存溢出,无法正常使用。

    表的缓存

    # 缓存某个表
    spark.catalog.cacheTable("tableName")
    
    # 释放缓存的表
    spark.catalog.uncacheTable("tableName")
    
    • 1
    • 2
    • 3
    • 4
    • 5

    shuffle 分区数调整

    # 指定在进行 shuffle 操作时的分区数量,默认:200
    spark.sql.shuffle.partitions
    
    • 1
    • 2

    Spark SQL 客户端设置合并

    在 Spark SQL 客户端合并小文件主要有两种方式,一种是 repartition,还有一种是 coalesce 方法。

    两者区别:

    repartition 在进行分区时会进行 shuffle 操作,通过哈希值来进行数据的重新分区。

    coalesce 默认相当于 repartition 不走 shuffle 操作,假设当前合并为一个分区:coalesce(1),那么只会运行 1 个 task 任务,多余的资源会被浪费。

    还需注意 coalesce 的分区缩减全在内存里进行处理,如果当前处理的数据量过大,会导致程序的内存溢出

    如果在进行 coalesce 操作前的分区数小于得到的分区数,coalesce 就不会起作用,也不会进行 shuffle 操作。

    在 Spark SQL 中使用如下:

    名称不区分大小写。

    INSERT ... SELECT /*+REPARTITION(n)*/ ...
    
    INSERT ... SELECT /*+COALESCE(n)*/ ...
    
    • 1
    • 2
    • 3

    使用场景:

    假设我们当前有 P 个分区,现在需要调整为 N 个分区。

    如果 P < N,那么就只能使用 repartition

    如果 P > N,既可以使用 repartition,也可以使用 coalesce;但是如果大于很多的话建议用 repartition,运行效率会更高,资源不会浪费。

    Hive 客户端处理小文件合并

    方法一:

    -- 是否开启分区调整功能,默认:false
    set spark.sql.adaptive.enabled=true;
    
    -- 开启分区调整后,在 reduce 阶段每个 task 最少处理的数据量,默认:64M,单位:B
    -- 一般改成和集群块一样的大小,Hadoop2.7.2版本及之前默认64MB,Hadoop2.7.3版本及之后默认128M
    set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128000000;
    
    -- 开启分区调整后,最小的 reducer 个数,默认:1
    set spark.sql.adaptive.minNumPostShufflePartitions=1;
    
    -- 开启分区调整后,最大的 reducer 个数,默认:500
    set spark.sql.adaptive.maxNumPostShufflePartitions=500;
    
    -- 开启分区调整后,在 reduce 阶段每个 task 最少处理的条数,默认:20000000
    set spark.sql.adaptive.shuffle.targetPostShuffleRowCount=20000000;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    一般情况数据量小的话,只需要设置前面两个参数就可以了。

    参数调整前:

    在这里插入图片描述

    参数调整后:

    新建了一个和其结构一样 test 表,然后导入数据,成功的合并了小文件:

    在这里插入图片描述


    方法二:

    -- 执行 Map 前进行小文件合并,默认开启
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    
    -- 合并为一个 split 的最大值(单位byte),当超过该值时新建一个split,256MB
    set mapreduce.input.fileinputformat.split.maxsize=268435456;
    
    -- 在第一步合并后每个节点剩余的文件,如果大于该值(单位byte),单独创建一个split,256MB
    set mapreduce.input.fileinputformat.split.minsize.per.node=268435456;
    
    -- 在第二步处理完,每个机架上剩余的文件,如果大于该值(单位byte),单独创建一个split,256MB
    set mapreduce.input.fileinputformat.split.minsize.per.rack=268435456;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    参数调整前:

    在这里插入图片描述

    参数调整后:

    新建了一个和其结构一样 test 表,然后导入数据,成功的合并了小文件:

    在这里插入图片描述

    以上参数调优都是临时调优,仅限于本次会话,如果想要永久设置的话只需要将参数配置到 hive-site.xml 文件中即可。

    动态分区调优

    当我们在使用动态分区的时候,为了避免产生过多的小文件。我们可以使用 distribute by 的方式来对数据进行分区。

    具体操作: 将动态分区的键设置为 distribute by 的 key 值。

    假设有共有10个动态分区,那么使用调优之后只会产生10个 reducer,这样就大大减少了小文件的输出。

    但是这样就会出现一个问题,就是每个分区下面只会有一个文件,而该文件存储的数据量会很大,我们后期访问起来还是会很慢。

    解决方法:

    我们可以通过 distribute by + 随机数 rand() 的方式来解决,以此来控制 map 端给 reduce 的数据量,达到数据量的均衡。

    还需要结合如下参数:

    -- 执行 Map 前进行小文件合并,默认开启
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    
    -- 在 map 的任务结束时合并小文件
    set hive.merge.mapfiles = true;
    
    -- 在 MapReduce 的任务结束时合并小文件
    set hive.merge.mapredfiles = true;
    
    -- 合并文件的大小 
    set hive.merge.size.per.task = 256000000;
    
    -- 每个 Map 最大输入大小(这个值决定了合并后文件的数量) 
    set mapred.max.split.size=256000000;   
    
    -- 每个reducer的大小, 默认是1G,输入文件如果是10G,那么就会起10个reducer,单位:B
    set hive.exec.reducers.bytes.per.reducer=1073741824;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    设置每个输出文件的大小控制在 256MB 左右。

    我们还可以通过控制随机数的值来改变生成的文件个数,如:distribute by cast(rand()*3 as int),表示在同一分区中生成三个大小一致的文件。

    小文件归档

    针对 Hive 表中已经产生小文件的场景,对其进行归档。

    $HADOOP_HOME/share/hadoop/tools/lib/ 目录下的 hadoop-archives-xxx.jar 包,复制一份到 $HIVE_HOME/lib 目录下。

    在 Hive 中开启如下参数:

    -- 开启归档功能
    set hive.archive.enabled=true;
    
    -- Hive在创建归档时是否可以设置父目录
    set hive.archive.har.parentdir.settable=true;
    
    -- 归档大小,单位:B
    set  har.partfile.size=256000000;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    使用示例:

    开启以上参数后,直接运行如下的 SQL 语句即可设置小文件归档操作。

    -- 进行归档
    alter table db_name.tb_name archive partition(dy='...');
    
    -- 取消归档
    alter table db_name.tb_name unarchive partition(dy='...');
    
    • 1
    • 2
    • 3
    • 4
    • 5

    归档的分区可以查看,但是不能 insert overwrite,必须先取消归档。

  • 相关阅读:
    2021-07-09 springboot 整合shiro(前后端分离解决方案)
    XTuner InternLM-Chat 个人小助手认知微调实践
    JWFD开源工作流矩阵引擎测试版本BUG20231022修正代码
    DeepinV20/Ubuntu安装postgresql方法
    Golang 单元测试合集整理,(我最常用 gomonkey)欢迎收藏
    蓝桥杯官网填空题(方格计数)
    爬虫抓取网站数据
    2022-33周(8.08-8.14) 项目问题整理
    Go Atomic
    Python读取复杂电子表格(CSV)数据小技巧一则
  • 原文地址:https://blog.csdn.net/weixin_46389691/article/details/128150211