HQL语法和运行参数层面,主要跟大家讲讲如果写出高效的HQL,以及如果利用一些控制参数来调优HQL的执行。这是HQL调优的一个大头。
Hive 的 SQL 语句在执行之前需要将 SQL 语句转换成 MapReduce 任务,因此需要了解具体的转换过程,可以在 SQL 语句中输入如下命令查看具体的执行计划。
查看执行计划,添加extended关键字可以查看更加详细的执行计划
explain [extended] query
列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。
Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列。这样做可以节省读取开销:中间表存储开销和数据整合开销
set hive.optimize.cp = true; ## 列裁剪,取数只取查询中需要用到的列,默认是true
将 SQL 语句中的 where 谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻辑优化器是PredicatePushDown。
set hive.optimize.ppd=true; ## 默认是true
示例程序:
select a.*, b.* from a join b on a.id = b.id where b.age > 20;
select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;
列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。
在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量。
Hive 中与分区裁剪优化相关的则是:
set hive.optimize.pruner=true; ## 默认是true
在 HiveQL 解析阶段对应的则是 ColumnPruner 逻辑优化器。
select * from student where department = "AAAA";
如果一个mapreduce job碰到一对小文件作为输入,一个小文件启动一个Task
Map 输入合并
在执行 MapReduce 程序的时候,一般情况是一个文件的一个数据分块需要一个 mapTask 来处理。但是如果数据源是大量的小文件,这样就会启动大量的 mapTask 任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少 mapTask 任务数量。
## Map端输入、合并文件之后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
## Map端输入,不合并
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
Map/Reduce输出合并
大量的小文件会给 HDFS 带来压力,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响。
## 是否合并Map输出文件, 默认值为true
set hive.merge.mapfiles=true;
## 是否合并Reduce端输出文件,默认值为false
set hive.merge.mapredfiles=true;
## 合并文件的大小,默认值为256000000
set hive.merge.size.per.task=256000000;
## 每个Map 最大分割大小
set mapred.max.split.size=256000000;
## 一个节点上split的最少值
set mapred.min.split.size.per.node=1; // 服务器节点
## 一个机架上split的最少值
set mapred.min.split.size.per.rack=1; // 服务器机架
hive.merge.size.per.task 和 mapred.min.split.size.per.node 联合起来:
1、默认情况先把这个节点上的所有数据进行合并,如果合并的那个文件的大小超过了256M就开启另外一个文
件继续合并
2、如果当前这个节点上的数据不足256M,那么就都合并成一个逻辑切片。
现在有100个Task,总共有10000M的数据, 平均一下,每个Task执行100M的数据的计算。
假设只启动10个Task,每个Task就要执行1000M的数据。 如果只有2个Task, 5000M。
第一:MapReduce中的MapTask的并行度机制
Map数过大:当输入文件特别大,MapTask 特别多,每个计算节点分配执行的 MapTask 都很多,这时候可以考虑减少 MapTask 的数量。增大每个 MapTask 处理的数据量。而且 MapTask 过多,最终生成的结果文件数也太多。
1、Map阶段输出文件太小,产生大量小文件
2、初始化和创建Map的开销很大
Map数太小:当输入文件都很大,任务逻辑复杂,MapTask 执行非常慢的时候,可以考虑增加
MapTask 数,来使得每个 MapTask 处理的数据量减少,从而提高任务的执行效率。
1、文件处理或查询并发度小,Job执行时间过长
2、大量作业时,容易堵塞集群
在 MapReduce 的编程案例中,我们得知,一个MapReduce Job 的 MapTask 数量是由输入分片
InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit() 决定的。一个输入分片对应一个
MapTask,而输入分片是由三个参数决定的:
参数 | 默认值 | 意义 |
---|---|---|
dfs.blocksize | 128M | HDFS默认数据块大小 |
mapreduce.input.fileinputformat.split.minsize | 1 | 最小分片大小(MR) |
mapreduce.input.fileinputformat.split.maxsize | 256M | 最大分片大小(MR) |
输入分片大小的计算是这么计算出来的:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启用一个
MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处理效率
两种经典的控制MapTask的个数方案:减少MapTask数 或者 增加MapTask数
1、减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
2、增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数
重点注意:不推荐把这个值进行随意设置!
推荐的方式:使用默认的切块大小即可。如果非要调整,最好是切块的N倍数
NodeManager节点个数:N ===》 Task = ( N * 0.95) * M
第二:合理控制 MapTask 数量
1、减少 MapTask 数可以通过合并小文件来实现
2、增加 MapTask 数可以通过控制上一个 ReduceTask 默认的 MapTask 个数
计算方式
输入文件总大小:total_size
HDFS 设置的数据块大小:dfs_block_size
default_mapper_num = total_size / dfs_block_size
MapReduce 中提供了如下参数来控制 map 任务个数,从字面上看,貌似是可以直接设置 MapTask 个数的样子,但是很遗憾不行,这个参数设置只有在大于 default_mapper_num 的时候,才会生效。
set mapred.map.tasks=10; ## 默认值是2
那如果我们需要减少 MapTask 数量,但是文件大小是固定的,那该怎么办呢?
可以通过 mapred.min.split.size 设置每个任务处理的文件的大小,这个大小只有在大于
dfs_block_size 的时候才会生效
split_size = max(mapred.min.split.size, dfs_block_size)
split_num = total_size / split_size
compute_map_num = Math.min(split_num, Math.max(default_mapper_num,
mapred.map.tasks))
这样就可以减少 MapTask 数量了。
总结一下控制 mapper 个数的方法:
1、如果想增加 MapTask 个数,可以设置 mapred.map.tasks 为一个较大的值
2、如果想减少 MapTask 个数,可以设置 maperd.min.split.size 为一个较大的值
3、如果输入是大量小文件,想减少 mapper 个数,可以通过设置 hive.input.format 合并小文件
如果想要调整 mapper 个数,在调整之前,需要确定处理的文件大概大小以及文件的存在形式(是大量小文件,还是单个大文件),然后再设置合适的参数。不能盲目进行暴力设置,不然适得其反。
MapTask 数量与输入文件的 split 数息息相关,在 Hadoop 源码
org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中可以看到 split 划分的具体逻
辑。可以直接通过参数 mapred.map.tasks (默认值2)来设定 MapTask 数的期望值,但它不一定会
生效。
如果 ReduceTask 数量过多,一个 ReduceTask 会产生一个结果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个 Job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化ReduceTask 需要耗费资源。
如果 ReduceTask 数量过少,这样一个 ReduceTask 就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。 默认情况下,Hive 分配的 reducer 个数由下列参数决定:
Hadoop MapReduce 程序中,ReducerTask 个数的设定极大影响执行效率,ReducerTask 数量与输出文件的数量相关。如果 ReducerTask 数太多,会产生大量小文件,对HDFS造成压力。如果
ReducerTask 数太少,每个ReducerTask 要处理很多数据,容易拖慢运行时间或者造成 OOM。这使得Hive 怎样决定 ReducerTask 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定
ReducerTask 个数的情况下,Hive 会猜测确定一个ReducerTask 个数,基于以下两个设定:
参数1:hive.exec.reducers.bytes.per.reducer (默认256M)
参数2:hive.exec.reducers.max (默认为1009)
参数3:mapreduce.job.reduces (默认值为-1,表示没有设置,那么就按照以上两个参数
进行设置)
ReduceTask 的计算公式为:
N = Math.min(参数2,总输入数据大小 / 参数1)
可以通过改变上述两个参数的值来控制 ReduceTask 的数量。 也可以通过
set mapred.map.tasks=10;
set mapreduce.job.reduces=10;
通常情况下,有必要手动指定 ReduceTask 个数。考虑到 Mapper 阶段的输出数据量通常会比输入有大幅减少,因此即使不设定 ReduceTask 个数,重设 参数2 还是必要的。
依据经验,可以将 参数2 设定为 M * (0.95 * N) (N为集群中 NodeManager 个数)。一般来说,
NodeManage 和 DataNode 的个数是一样的。
Join优化整体原则:
1、优先过滤后再进行Join操作,最大限度的减少参与join的数据量
2、小表join大表,最好启动mapjoin,hive自动启用mapjoin, 小表不能超过25M,可以更改
3、Join on的条件相同的话,最好放入同一个job,并且join表的排列顺序从小到大:select a.*,
b.*, c.* from a join b on a.id = b.id join c on a.id = c.i
4、如果多张表做join, 如果多个链接条件都相同,会转换成一个JOb
优先过滤数据
尽量减少每个阶段的数据量,对于分区表能用上分区字段的尽量使用,同时只选择后面需要使用到的列,最大
限度的减少参与 Join 的数据量
小表 join 大表原则
小表 join 大表的时应遵守小表 join 大表原则,原因是 join 操作的 reduce 阶段,位于 join 左边
的表内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出的几率。join 中执行顺序是
从左到右生成 Job,应该保证连续查询中的表的大小从左到右是依次增加的。
使用相同的连接键
在 hive 中,当对 3 个或更多张表进行 join 时,如果 on 条件使用相同字段,那么它们会合并为一个
MapReduce Job,利用这种特性,可以将相同的 join on 放入一个 job 来节省执行时间。
尽量原子操作
尽量避免一个SQL包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。
大表Join大表
1、空key过滤:有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的
reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据
是异常数据,我们需要在SQL语句中进行过滤。
2、空key转换:有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join
的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer
上
这个优化措施,但凡能用就用! 大表 join 小表 小表满足需求: 小表数据小于控制条件时
MapJoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中,在 map 进程中进行 join 操
作,这样就不用进行 reduce 步骤,从而提高了速度。只有 join 操作才能启用 MapJoin。
## 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。
## 对应逻辑优化器是MapJoinProcessor
set hive.auto.convert.join = true;
## 刷入内存表的大小(字节)
set hive.mapjoin.smalltable.filesize = 25000000;
## hive会基于表的size自动的将普通join转换成mapjoin
set hive.auto.convert.join.noconditionaltask=true;
## 多大的表可以自动触发放到内层LocalTask中,默认大小10M
set hive.auto.convert.join.noconditionaltask.size=10000000;
Hive 可以进行多表 Join。Join 操作尤其是 Join 大表的时候代价是非常大的。MapJoin 特别适合大小表join的情况。在Hive join场景中,一般总有一张相对小的表和一张相对大的表,小表叫 build table,大表叫 probe table。Hive 在解析带 join 的 SQL 语句时,会默认将最后一个表作为 probe table,将前面的表作为 build table 并试图将它们读进内存。如果表顺序写反,probe table 在前面,引发 OOM 的风险就高了。在维度建模数据仓库中,事实表就是 probe table,维度表就是 build table。这种 Join 方式在 map 端直接完成 join 过程,消灭了 reduce,效率很高。而且 MapJoin 还支持非等值连接。
当 Hive 执行 Join 时,需要选择哪个表被流式传输(stream),哪个表被缓存(cache)。Hive 将
JOIN 语句中的最后一个表用于流式传输,因此我们需要确保这个流表在两者之间是最大的。如果要在不同的 key 上 join 更多的表,那么对于每个 join 集,只需在 ON 条件右侧指定较大的表。
也可以手动开启mapjoin:
--SQL方式,在SQL语句中添加MapJoin标记(mapjoin hint)
--将小表放到内存中,省去shffle操作
// 在没有开启mapjoin的情况下,执行的是reduceJoin
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM
smallTable JOIN bigTable ON smallTable.key = bigTable.key;
/*+mapjoin(smalltable)*/
Sort-Merge-Bucket(SMB) Map Join
它是另一种Hive Join优化技术,使用这个技术的前提是所有的表都必须是分桶表(bucket)和分桶排序的(sort)。分桶表的优化!
具体实现:
1、针对参与join的这两张做相同的hash散列,每个桶里面的数据还要排序
2、这两张表的分桶个数要成倍数。
3、开启 SMB join 的开关!
一些常见参数设置:
## 当用户执行bucket map join的时候,发现不能执行时,禁止查询
set hive.enforce.sortmergebucketmapjoin=false;
## 如果join的表通过sort merge join的条件,join是否会自动转换为sort merge join
set hive.auto.convert.sortmerge.join=true;
## 当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以启用
mapjoin 来提高效率。
# bucket map join优化,默认值是 false
set hive.optimize.bucketmapjoin=false;
## bucket map join 优化,默认值是 false
set hive.optimize.bucketmapjoin.sortedmerge=false;
在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:
# join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000;
# 如果是join过程出现倾斜应该设置为true
set hive.optimize.skewjoin=false;
如果开启了,在 Join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key(默认100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果.
通过 hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个 job 的 mapper 数量,默认
10000。
set hive.skewjoin.mapjoin.map.tasks=10000;
join的时候表的顺序的关系:前面的表都会被加载到内存中。后面的表进行磁盘扫描
select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;
Hive 自 0.14.0 开始,加入了一项 “Cost based Optimizer” 来对 HQL 执行计划进行优化,这个功能通
过 “hive.cbo.enable” 来开启。在 Hive 1.1.0 之后,这个 feature 是默认开启的,它可以 自动优化 HQL中多个 Join 的顺序,并选择合适的 Join 算法。
CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Hive 的成本优化器也一样。
Hive 在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。
要使用基于成本的优化(也称为CBO),请在查询开始设置以下参数:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不允许在 HQL 语句中出现笛卡尔积,这实
际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。
当然也可以使用 limit 的办法来减少某个表参与 join 的数据量,但对于需要笛卡尔积语义的需求来说,经常是一个大表和一个小表的 Join 操作,结果仍然很大(以至于无法用单机处理),这时 MapJoin 才是最好的解决办法。MapJoin,顾名思义,会在 Map 端完成 Join 操作。这需要将 Join 操作的一个或多个表完全读入内存。
PS:MapJoin 在子查询中可能出现未知 BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的方法是,给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条目复制数倍,join key 各不相同;将大表扩充一列 join key 为随机数。
精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值范围里面随机出来的,所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的,大表的数据就被随机的分为了 n 份。并且最后处理所用的 reduce 数量也是 n,而且也不会出现数据倾斜。
默认情况下,Map 阶段同一个 Key 的数据会分发到一个 Reduce 上,当一个 Key 的数据过大时会产生数据倾斜。进行 group by 操作时可以从以下两个方面进行优化:
1. Map端部分聚合
事实上并不是所有的聚合操作都需要在 Reduce 部分进行,很多聚合操作都可以先在 Map 端进行部分聚合,然后在 Reduce 端的得出最终结果。
## 开启Map端聚合参数设置
set hive.map.aggr=true;
# 设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000
set hive.groupby.mapaggr.checkinterval=100000
2. 有数据倾斜时进行负载均衡
当 HQL 语句使用 group by 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行负载均衡。策略就是把 MapReduce 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总。
# 自动优化,有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata=false;
当选项设定为 true 时,生成的查询计划有两个 MapReduce 任务.
1、在第一个 MapReduce 任务中,map 的输出结果会随机分布到 reduce 中,每个 reduce 做部分聚合
操作,并输出结果,这样处理的结果是相同的`group by key`有可能分发到不同的 reduce 中,从而达到
负载均衡的目的;
2、第二个 MapReduce 任务再根据预处理的数据结果按照 group by key 分布到各个 reduce 中,最
后完成最终的聚合操作。
Map 端部分聚合:并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果,对应的优化器为 GroupByOptimizer。
那么如何用 group by 方式同时统计多个列?
select t.a, sum(t.b), count(t.c), count(t.d) from some_table t group by t.a;
下面是解决方法:
select t.a, sum(t.b), count(t.c), count(t.d) from (
select a,b,null c,null d from some_table
union all
select a,0 b,c,null d from some_table group by a,c
union all
select a,0 b,null c,d from some_table group by a,d
) t;
order by 只能是在一个 reduce 进程中进行,所以如果对一个大数据集进行 order by ,会导致一个
reduce 进程中处理的数据相当大,造成查询执行缓慢。
1、在最终结果上进行order by,不要在中间的大数据集上进行排序。如果最终结果较少,可以在一个
reduce上进行排序时,那么就在最后的结果集上进行order by。
2、如果是取排序后的前N条数据,可以使用distribute by和sort by在各个reduce上进行排序后前N
条,然后再对各个reduce的结果集合合并后在一个reduce中全局排序,再取前N条,因为参与全局排序的
order by的数据量最多是reduce个数 * N,所以执行效率会有很大提升。
在Hive中,关于数据排序,提供了四种语法,一定要区分这四种排序的使用方式和适用场景。
1、order by:全局排序,缺陷是只能使用一个reduce
2、sort by:单机排序,单个reduce结果有序
3、cluster by:对同一字段分桶并排序,不能和sort by连用
4、distribute by + sort by:分桶,保证同一字段值只存在一个结果文件当中,结合sort by保证每
个reduceTask结果有序
Hive HQL 中的 order by 与其他 SQL 方言中的功能一样,就是将结果按某字段全局排序,这会导致所有 map 端数据都进入一个 reducer 中,在数据量大时可能会长时间计算不完。
如果使用 sort by,那么还是会视情况启动多个 reducer 进行排序,并且保证每个 reducer 内局部有
序。为了控制map 端数据分配到 reducer 的 key,往往还要配合 distribute by 一同使用。如果不加
distribute by 的话,map 端数据就会随机分配到 reducer。
提供一种方式实现全局排序:两种方式:
1、建表导入数据准备
create table if not exists student(id int, name string, sex string, age int,
department string) row format delimited fields terminated by ",";
load data local inpath "/home/bigdata/students.txt" into table student;
2、第一种方式
-- 直接使用order by来做。如果结果数据量很大,这个任务的执行效率会非常低
select id,name,age from student order by age desc limit 3;
3、第二种方式
-- 使用distribute by + sort by 多个reduceTask,每个reduceTask分别有序
set mapreduce.job.reduces=3;
drop table student_orderby_result;
-- 范围分桶 0 < 18 < 1 < 20 < 2
create table student_orderby_result as select * from student distribute by (case
when age > 20 then 0 when age < 18 then 2 else 1 end) sort by (age desc);
关于分界值的确定,使用采样的方式,来估计数据分布规律。
当要统计某一列去重数时,如果数据量很大,count(distinct) 就会非常慢,原因与 order by 类似,
count(distinct) 逻辑只会有很少的 reducer 来处理。这时可以用 group by 来改写:
-- 先 group by 在 count
select count(1) from (
select age from student
where department >= "MA"
group by age
) t;
再来一个例子:
优化前 ,一个普通的只使用一个reduceTask来进行count(distinct) 操作
-- 优化前(只有一个reduce,先去重再count负担比较大):
select count(distinct id) from tablename;
优化后 ,但是这样写会启动两个MR job(单纯 distinct 只会启动一个),所以要确保数据量大到启动
job 的 overhead 远小于计算耗时,才考虑这种方法。当数据集很小或者 key 的倾斜比较明显时,
group by 还可能会比 distinct 慢。
优化后 ,但是这样写会启动两个MR job(单纯 distinct 只会启动一个),所以要确保数据量大到启动
job 的 overhead 远小于计算耗时,才考虑这种方法。当数据集很小或者 key 的倾斜比较明显时,
group by 还可能会比 distinct 慢。
select t.a, count(t.b) , sum(t.c) from t group by t.a;
select t.a, count(distinct t.b, t.c) from t group by t.a;
在Hive的早期版本中,in/exists语法是不被支持的,但是从 hive-0.8x 以后就开始支持这个语法。但是
不推荐使用这个语法。虽然经过测验,Hive-2.3.6 也支持 in/exists 操作,但还是推荐使用 Hive 的一个高效替代方案:left semi join
比如说:
-- in / exists 实现
select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id = b.id);
可以使用join来改写:
select a.id, a.namr from a join b on a.id = b.id;
应该转换成:
-- left semi join 实现
select a.id, a.name from a left semi join b on a.id = b.id;
在计算类似 scan, filter, aggregation 的时候, vectorization 技术以设置批处理的增量大小为 1024 行
单次来达到比单条记录单次获得更高的效率
set hive.vectorized.execution.enabled=true ;
set hive.vectorized.execution.reduce.enabled=true;
如果你碰到一堆SQL,并且这一堆SQL的模式还一样。都是从同一个表进行扫描,做不同的逻辑。
有可优化的地方:如果有n条SQL,每个SQL执行都会扫描一次这张表。
如果一个 HQL 底层要执行 10 个 Job,那么能优化成 8 个一般来说,肯定能有所提高,多重插入就是一个非常实用的技能。一次读取,多次插入,有些场景是从一张表读取数据后,要多次利用,这时可以使用 multi insert 语法:
from sale_detail
insert overwrite table sale_detail_multi partition (sale_date='2021',
region='china' )
select shop_name, customer_id, total_price where .....
insert overwrite table sale_detail_multi partition (sale_date='2022',
region='china' )
select shop_name, customer_id, total_price where .....;
说明:multi insert语法有一些限制。
1、一般情况下,单个SQL中最多可以写128路输出,超过128路,则报语法错误。
2、在一个multi insert中:
对于分区表,同一个目标分区不允许出现多次。
对于未分区表,该表不能出现多次。
3、对于同一张分区表的不同分区,不能同时有insert overwrite和insert into操作,否则报错返回。
Multi-Group by 是 Hive 的一个非常好的特性,它使得 Hive 中利用中间结果变得非常方便。例如:
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b
ON (a.userid = b.userid and a.ds='2019-03-20' )) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2019-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2019-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school;
上述查询语句使用了 Multi-Group by 特性连续 group by 了 2 次数据,使用不同的 Multi-Group by。
这一特性可以减少一次 MapReduce 操作。
map 输出压缩
set mapreduce.map.output.compress=true;
set
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
中间数据压缩
中间数据压缩就是对 hive 查询的多个 Job 之间的数据进行压缩。最好是选择一个节省CPU耗时的压缩方式。可以采用 snappy 压缩算法,该算法的压缩和解压效率都非常高。
set hive.exec.compress.intermediate=true;
set
hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
结果数据压缩
最终的结果数据(Reducer输出数据)也是可以进行压缩的,可以选择一个压缩效果比较好的,可以减少数据的大小和数据的磁盘读写时间; 注:常用的 gzip,snappy 压缩算法是不支持并行处理的,如果数据源是 gzip/snappy压缩文件大文件,这样只会有有个 mapper 来处理这个文件,会严重影响查询效率。 所以如果结果数据需要作为其他查询任务的数据源,可以选择支持 splitable 的 LZO 算法,这样既能对结果文件进行压缩,还可以并行的处理,这样就可以大大的提高 job 执行的速度了。
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.G
zipCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
Hadoop集群支持的压缩算法:
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.SnappyCodec
org.apache.hadoop.io.compress.Lz4Codec
com.hadoop.compression.lzo.LzoCodec
com.hadoop.compression.lzo.LzopCodec