Hive 中数据倾斜的基本表现
如何产生
第一种:可以直接不让null值参与join操作,即不让null值有shuffle阶段
- create table res_tbl as
- select n.* from
- (select * from res where id is not null ) n
- left join org_tbl o on n.id = o.id;
第二种:因为null值参与shuffle时的hash结果是一样的,那么我们可以给null值随机赋值,这样它们的hash结果就不一样,就会进到不同的reduce中
- SELECT *
- FROM log a
- LEFT JOIN users b ON CASE
- WHEN a.user_id IS NULL THEN concat('hive_', rand())
- ELSE a.user_id
- END = b.user_id;
对于两个表join,表a中需要join的字段key为int,表b中key字段既有string类型也有int类型。当按照key进行两个表的join操作时,默认的Hash操作会按int型的id来进行分配,这样所有的string类型都被分配成同一个id,结果就是所有的string类型的字段进入到一个reduce中,引发数据倾斜。
解决方案:
如果key字段既有string类型也有int类型,我们直接把int类型都转为string就好了,这样key字段都为string,hash时就按照string类型分配了
- SELECT *
- FROM users a
- LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);
我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,在数据压缩的时候可以采用bzip2和Zip等支持文件分割的压缩算法。
2.4.1 build table(小表)前置
Hive在解析带join的SQL语句时,会默认将最后一个表作为probe table(探查表),将前面的表作为build table并试图将它们读进内存。如果表顺序写反,probe table在前面,引发OOM的风险就高了。
在维度建模数据仓库中,事实表就是probe table,维度表就是build table。假设现在要将日历记录事实表和记录项编码维度表来join 维度表在前,事实表在后
2.4.2 Reduce join 改为Map join
在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map 端进行join,避免reducer处理
默认 true,并不是所有的聚合操作都需要在reduce部分进行,很多聚合操作都可以先在Map端进行部分聚合,然后reduce端得出最终结果。 对应的优化器为GroupByOptimizer |
hive.groupby.mapaggr.checkinterval | 设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000 |
set hive.groupby.skewindata = true
hive.groupby.skewindata | false, 解释:在group by时启动两个MR job。第一个job会将map端数据随机输入reducer,每个reducer做部分聚合,相同的key就会分布在不同的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。 |
阶段拆分-两阶段聚合 需要聚合的key前加一个随机数的前后缀,这样就均匀了,之后再按照原始的key聚合一次
生成的查询计划有两 个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。相同的 GroupB Key 有可能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。
- 假设 key = 水果
- select count(substr(a.tmp,1,2)) as key from(
- select concat(key,'_',cast(round(10*rand())+1 as string)) tmp
- from table group by tmp )a group by key
动态分区插入数据,产生大量的小文件,从而导致map数量剧增;
倒入数据时产生,每执行一次 insert 时hive中至少产生一个文件,文件数量=MapTask数量*分区数,insert 导入时至少会有一个MapTask
输入阶段合并
需要更改Hive的输入文件格式,即参数hive.input.format,默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat,我们改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。 这样比起上面调整mapper数时,又会多出两个参数,分别是mapred.min.split.size.per.node和mapred.min.split.size.per.rack,含义是单节点和单机架上的最小split大小。如果发现有split大小小于这两个值(默认都是100MB),则会进行合并
输出阶段合并
直接将hive.merge.mapfiles和hive.merge.mapredfiles都设为true即可,前者表示将map-only任务的输出合并,后者表示将map-reduce任务的输出合并。 另外,hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值,hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,默认值都是1GB。如果平均大小不足的话,就会另外启动一个任务来进行合并。
HiveSQL中的order by与其他SQL方言中的功能一样,就是将结果按某字段全局排序,这会导致所有map端数据都进入一个reducer中,在数据量大时可能会长时间计算不完。
如果使用sort by,那么还是会视情况启动多个reducer进行排序,并且保证每个reducer内局部有序。为了控制map端数据分配到reducer的key,往往还要配合distribute by一同使用。如果不加distribute by的话,map端数据就会随机分配到reducer。
举个例子,假如要以UID为key,以上传时间倒序、记录类型倒序输出记录数据。
- select uid,upload_time,event_type,record_data
- from calendar_record_log
- where pt_date >= 20190201 and pt_date <= 20190224
- distribute by uid
- sort by upload_time desc,event_type desc;
当要统计某一列的去重数时,如果数据量很大,count(distinct)就会非常慢,原因与order by类似,count(distinct)逻辑只会有很少的reducer来处理。这时可以用group by来改写
- select count(1) from (
- select uid from calendar_record_log
- where pt_date >= 20190101
- group by uid
- ) t;