• Hive调优及优化(包含数据倾斜和小文件问题)


    一、数据倾斜

    Hive 中数据倾斜的基本表现

    • 一般都发生在 Sql 中 group by 和 join on 上,而且和数据逻辑绑定比较深。
    • 任务进度长时间维持在99%(或100%),查看任务监控页面**,发现只有少量(1个或几个)reduce子任务未完成**。因为其处理的数据量和其他reduce差异过大

    如何产生

    • key的分布不均匀或者说某些key太集中
    • 业务数据自身的特性,例如不同数据类型关联产生数据倾斜
    • SQL语句导致的数据倾斜

    二、优化SQL语句导致的数据倾斜

     2.1 、空值引发的数据倾斜

    第一种:可以直接不让null值参与join操作,即不让null值有shuffle阶段

    1. create table res_tbl as
    2. select n.* from
    3. (select * from res where id is not null ) n
    4. left join org_tbl o on n.id = o.id;


    第二种:因为null值参与shuffle时的hash结果是一样的,那么我们可以给null值随机赋值,这样它们的hash结果就不一样,就会进到不同的reduce中

    1. SELECT *
    2. FROM log a
    3. LEFT JOIN users b ON CASE
    4. WHEN a.user_id IS NULL THEN concat('hive_', rand())
    5. ELSE a.user_id
    6. END = b.user_id;
    • join内连接,是返回两个表中都有的符合条件的行。
    • left   join左连接,是返回左表中所有的行及右表中符合条件的行。
    • right   join右连接,是返回右表中所有的行及左表中符合条件的行。
    • full   join全连接,是返回左表中所有的行及右表中所有的行,并按条件连接。

    2.2 不同数据类型引发的数据倾斜

    对于两个表join,表a中需要join的字段key为int,表b中key字段既有string类型也有int类型。当按照key进行两个表的join操作时,默认的Hash操作会按int型的id来进行分配,这样所有的string类型都被分配成同一个id,结果就是所有的string类型的字段进入到一个reduce中,引发数据倾斜。

    解决方案

    如果key字段既有string类型也有int类型,我们直接把int类型都转为string就好了,这样key字段都为string,hash时就按照string类型分配了

    1. SELECT *
    2. FROM users a
    3. LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);

    2.3、 不可拆分大文件引发的数据倾斜

    我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,在数据压缩的时候可以采用bzip2和Zip等支持文件分割的压缩算法。

    2.4、 表连接时引发的数据倾斜

    2.4.1 build table(小表)前置

    Hive在解析带join的SQL语句时,会默认将最后一个表作为probe table(探查表),将前面的表作为build table并试图将它们读进内存。如果表顺序写反,probe table在前面,引发OOM的风险就高了。

    在维度建模数据仓库中,事实表就是probe table,维度表就是build table。假设现在要将日历记录事实表和记录项编码维度表来join 维度表在前,事实表在后

    2.4.2 Reduce join 改为Map join

    在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map            端进行join,避免reducer处理

    •  MapJoin set hive.auto.convert.join = true;默认为true 把小表自动加载到内存中
    •  hive.mapjoin.smalltable.filesize   默认值25000000(25MB) 即当小于25MB的时候默认是小表

    三、聚合类group by操作,发生数据倾斜

    3.1 map段部分聚合

    • 开启Map端聚合参数设置set hive.map.aggr=true
      默认 true,并不是所有的聚合操作都需要在reduce部分进行,很多聚合操作都可以先在Map端进行部分聚合,然后reduce端得出最终结果。
      对应的优化器为GroupByOptimizer
    • 在Map端进行聚合操作的条目数目set hive.grouby.mapaggr.checkinterval=100000
      hive.groupby.mapaggr.checkinterval设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000

    3.2 、有数据倾斜的时候进行负载均衡(默认是false)

    • set hive.groupby.skewindata = true 

      hive.groupby.skewindatafalse, 解释:在group by时启动两个MR job。第一个job会将map端数据随机输入reducer,每个reducer做部分聚合,相同的key就会分布在不同的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。
    • 阶段拆分-两阶段聚合 需要聚合的key前加一个随机数的前后缀,这样就均匀了,之后再按照原始的key聚合一次

    • 生成的查询计划有两 个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。相同的 GroupB Key 有可能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。

      1. 假设 key = 水果
      2. select count(substr(a.tmp,1,2)) as key from(
      3. select concat(key,'_',cast(round(10*rand())+1 as string)) tmp
      4. from table group by tmp )a group by key

      四、Hive 小文件问题及解决

    产生原因:

    • 动态分区插入数据,产生大量的小文件,从而导致map数量剧增;

    • 倒入数据时产生,每执行一次 insert 时hive中至少产生一个文件,文件数量=MapTask数量*分区数,insert 导入时至少会有一个MapTask

    解决方案:

    输入阶段合并

    需要更改Hive的输入文件格式,即参数hive.input.format,默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat,我们改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。 这样比起上面调整mapper数时,又会多出两个参数,分别是mapred.min.split.size.per.nodemapred.min.split.size.per.rack,含义是单节点和单机架上的最小split大小。如果发现有split大小小于这两个值(默认都是100MB),则会进行合并


    输出阶段合并

    直接将hive.merge.mapfiles和hive.merge.mapredfiles都设为true即可,前者表示将map-only任务的输出合并,后者表示将map-reduce任务的输出合并。 另外,hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值,hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,默认值都是1GB。如果平均大小不足的话,就会另外启动一个任务来进行合并。

    五、其他HiveSQL优化

    5.1、 sort by代替order by

    HiveSQL中的order by与其他SQL方言中的功能一样,就是将结果按某字段全局排序,这会导致所有map端数据都进入一个reducer中,在数据量大时可能会长时间计算不完。

    如果使用sort by,那么还是会视情况启动多个reducer进行排序,并且保证每个reducer内局部有序。为了控制map端数据分配到reducer的key,往往还要配合distribute by一同使用。如果不加distribute by的话,map端数据就会随机分配到reducer。
    举个例子,假如要以UID为key,以上传时间倒序、记录类型倒序输出记录数据。
     

    1. select uid,upload_time,event_type,record_data
    2. from calendar_record_log
    3. where pt_date >= 20190201 and pt_date <= 20190224
    4. distribute by uid
    5. sort by upload_time desc,event_type desc;

    5.2、 group by代替distinct 

    当要统计某一列的去重数时,如果数据量很大,count(distinct)就会非常慢,原因与order by类似,count(distinct)逻辑只会有很少的reducer来处理。这时可以用group by来改写

    1. select count(1) from (
    2. select uid from calendar_record_log
    3. where pt_date >= 20190101
    4. group by uid
    5. ) t;

  • 相关阅读:
    CiteSpace for Mac 最新保姆级教程
    Java网络编程之阻塞式IO与非阻塞IO
    贪心算法-寻找硬币
    【中秋国庆不断更】OpenHarmony定义可动画属性:@AnimatableExtend装饰器
    聊聊SQL语句中 DDL 、DML 、DQL 、DCL 分别是什么
    代码随想录Leetcode 343. 整数拆分
    并发编程十 定时任务&定时线程池
    nginx版本平滑升级(超详细)
    c++结构体
    计网总结☞应用层
  • 原文地址:https://blog.csdn.net/libaowen609/article/details/126670716