• Hive调优总结(三)-语法和运行参数层面


    HQL语法和运行参数层面,主要跟大家讲讲如果写出高效的HQL,以及如果利用一些控制参数来调优HQL的执行。这是HQL调优的一个大头。

    1. 查看Hive执行计划

    Hive 的 SQL 语句在执行之前需要将 SQL 语句转换成 MapReduce 任务,因此需要了解具体的转换过程,可以在 SQL 语句中输入如下命令查看具体的执行计划。

    查看执行计划,添加extended关键字可以查看更加详细的执行计划
    explain [extended] query
    
    • 1
    • 2

    2. 列裁剪

    列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。
    Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他的列。这样做可以节省读取开销:中间表存储开销和数据整合开销

    set hive.optimize.cp = true; ## 列裁剪,取数只取查询中需要用到的列,默认是true
    
    • 1

    3. 谓词下推

    将 SQL 语句中的 where 谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻辑优化器是PredicatePushDown。

    set hive.optimize.ppd=true; ## 默认是true
    
    • 1

    示例程序:

    select a.*, b.* from a join b on a.id = b.id where b.age > 20;
    
    • 1
    select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;
    
    • 1

    4.分区裁剪

    列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。

    在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量。

    Hive 中与分区裁剪优化相关的则是:

    set hive.optimize.pruner=true; ## 默认是true
    
    • 1

    在 HiveQL 解析阶段对应的则是 ColumnPruner 逻辑优化器。

    select * from student where department = "AAAA";
    
    • 1

    5.合并小文件

    如果一个mapreduce job碰到一对小文件作为输入,一个小文件启动一个Task
    
    • 1

    Map 输入合并
    在执行 MapReduce 程序的时候,一般情况是一个文件的一个数据分块需要一个 mapTask 来处理。但是如果数据源是大量的小文件,这样就会启动大量的 mapTask 任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少 mapTask 任务数量。

    ## Map端输入、合并文件之后按照block的大小分割(默认)
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    ## Map端输入,不合并
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    
    • 1
    • 2
    • 3
    • 4

    Map/Reduce输出合并
    大量的小文件会给 HDFS 带来压力,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响。

    ## 是否合并Map输出文件, 默认值为true
    set hive.merge.mapfiles=true;
    ## 是否合并Reduce端输出文件,默认值为false
    set hive.merge.mapredfiles=true;
    ## 合并文件的大小,默认值为256000000
    set hive.merge.size.per.task=256000000;
    ## 每个Map 最大分割大小
    set mapred.max.split.size=256000000;
    ## 一个节点上split的最少值
    set mapred.min.split.size.per.node=1; // 服务器节点
    ## 一个机架上split的最少值
    set mapred.min.split.size.per.rack=1; // 服务器机架
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    hive.merge.size.per.task 和 mapred.min.split.size.per.node 联合起来:

    1、默认情况先把这个节点上的所有数据进行合并,如果合并的那个文件的大小超过了256M就开启另外一个文
    件继续合并
    2、如果当前这个节点上的数据不足256M,那么就都合并成一个逻辑切片。
    
    • 1
    • 2
    • 3

    现在有100个Task,总共有10000M的数据, 平均一下,每个Task执行100M的数据的计算。

    假设只启动10个Task,每个Task就要执行1000M的数据。 如果只有2个Task, 5000M。

    6.合理设置MapTask并行度

    第一:MapReduce中的MapTask的并行度机制

    Map数过大:当输入文件特别大,MapTask 特别多,每个计算节点分配执行的 MapTask 都很多,这时候可以考虑减少 MapTask 的数量。增大每个 MapTask 处理的数据量。而且 MapTask 过多,最终生成的结果文件数也太多。

    1、Map阶段输出文件太小,产生大量小文件
    2、初始化和创建Map的开销很大
    
    • 1
    • 2

    Map数太小:当输入文件都很大,任务逻辑复杂,MapTask 执行非常慢的时候,可以考虑增加
    MapTask 数,来使得每个 MapTask 处理的数据量减少,从而提高任务的执行效率。

    1、文件处理或查询并发度小,Job执行时间过长
    2、大量作业时,容易堵塞集群
    
    • 1
    • 2

    在 MapReduce 的编程案例中,我们得知,一个MapReduce Job 的 MapTask 数量是由输入分片
    InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit() 决定的。一个输入分片对应一个
    MapTask,而输入分片是由三个参数决定的:

    参数默认值意义
    dfs.blocksize128MHDFS默认数据块大小
    mapreduce.input.fileinputformat.split.minsize1最小分片大小(MR)
    mapreduce.input.fileinputformat.split.maxsize256M最大分片大小(MR)

    输入分片大小的计算是这么计算出来的:

    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
    
    • 1

    默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启用一个
    MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处理效率
    两种经典的控制MapTask的个数方案:减少MapTask数 或者 增加MapTask数

    1、减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
    2、增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数
    
    重点注意:不推荐把这个值进行随意设置!
    推荐的方式:使用默认的切块大小即可。如果非要调整,最好是切块的N倍数
    
    • 1
    • 2
    • 3
    • 4
    • 5

    NodeManager节点个数:N ===》 Task = ( N * 0.95) * M

    第二:合理控制 MapTask 数量
    1、减少 MapTask 数可以通过合并小文件来实现
    2、增加 MapTask 数可以通过控制上一个 ReduceTask 默认的 MapTask 个数

    计算方式

    输入文件总大小:total_size
    HDFS 设置的数据块大小:dfs_block_size
    default_mapper_num = total_size / dfs_block_size
    
    • 1
    • 2
    • 3

    MapReduce 中提供了如下参数来控制 map 任务个数,从字面上看,貌似是可以直接设置 MapTask 个数的样子,但是很遗憾不行,这个参数设置只有在大于 default_mapper_num 的时候,才会生效。

    set mapred.map.tasks=10; ## 默认值是2
    
    • 1

    那如果我们需要减少 MapTask 数量,但是文件大小是固定的,那该怎么办呢?
    可以通过 mapred.min.split.size 设置每个任务处理的文件的大小,这个大小只有在大于
    dfs_block_size 的时候才会生效

    split_size = max(mapred.min.split.size, dfs_block_size)
    split_num = total_size / split_size
    compute_map_num = Math.min(split_num, Math.max(default_mapper_num,
    mapred.map.tasks))
    
    • 1
    • 2
    • 3
    • 4

    这样就可以减少 MapTask 数量了。
    总结一下控制 mapper 个数的方法:

    1、如果想增加 MapTask 个数,可以设置 mapred.map.tasks 为一个较大的值
    2、如果想减少 MapTask 个数,可以设置 maperd.min.split.size 为一个较大的值
    3、如果输入是大量小文件,想减少 mapper 个数,可以通过设置 hive.input.format 合并小文件
    
    • 1
    • 2
    • 3

    如果想要调整 mapper 个数,在调整之前,需要确定处理的文件大概大小以及文件的存在形式(是大量小文件,还是单个大文件),然后再设置合适的参数。不能盲目进行暴力设置,不然适得其反。
    MapTask 数量与输入文件的 split 数息息相关,在 Hadoop 源码
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中可以看到 split 划分的具体逻
    辑。可以直接通过参数 mapred.map.tasks (默认值2)来设定 MapTask 数的期望值,但它不一定会
    生效。

    7.合理设置ReduceTask并行度

    如果 ReduceTask 数量过多,一个 ReduceTask 会产生一个结果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个 Job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化ReduceTask 需要耗费资源。

    如果 ReduceTask 数量过少,这样一个 ReduceTask 就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。 默认情况下,Hive 分配的 reducer 个数由下列参数决定:

    Hadoop MapReduce 程序中,ReducerTask 个数的设定极大影响执行效率,ReducerTask 数量与输出文件的数量相关。如果 ReducerTask 数太多,会产生大量小文件,对HDFS造成压力。如果
    ReducerTask 数太少,每个ReducerTask 要处理很多数据,容易拖慢运行时间或者造成 OOM。这使得Hive 怎样决定 ReducerTask 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定
    ReducerTask 个数的情况下,Hive 会猜测确定一个ReducerTask 个数,基于以下两个设定:

    参数1:hive.exec.reducers.bytes.per.reducer (默认256M)
    参数2:hive.exec.reducers.max (默认为1009)
    参数3:mapreduce.job.reduces (默认值为-1,表示没有设置,那么就按照以上两个参数
    进行设置)
    
    • 1
    • 2
    • 3
    • 4

    ReduceTask 的计算公式为:

    N = Math.min(参数2,总输入数据大小 / 参数1)
    
    • 1

    可以通过改变上述两个参数的值来控制 ReduceTask 的数量。 也可以通过

    set mapred.map.tasks=10;
    set mapreduce.job.reduces=10;
    
    • 1
    • 2

    通常情况下,有必要手动指定 ReduceTask 个数。考虑到 Mapper 阶段的输出数据量通常会比输入有大幅减少,因此即使不设定 ReduceTask 个数,重设 参数2 还是必要的。
    依据经验,可以将 参数2 设定为 M * (0.95 * N) (N为集群中 NodeManager 个数)。一般来说,
    NodeManage 和 DataNode 的个数是一样的。

    8.Join优化

    Join优化整体原则:

    1、优先过滤后再进行Join操作,最大限度的减少参与join的数据量
    2、小表join大表,最好启动mapjoin,hive自动启用mapjoin, 小表不能超过25M,可以更改
    3、Join on的条件相同的话,最好放入同一个job,并且join表的排列顺序从小到大:select a.*,
    b.*, c.* from a join b on a.id = b.id join c on a.id = c.i
    4、如果多张表做join, 如果多个链接条件都相同,会转换成一个JOb
    
    • 1
    • 2
    • 3
    • 4
    • 5

    优先过滤数据

    尽量减少每个阶段的数据量,对于分区表能用上分区字段的尽量使用,同时只选择后面需要使用到的列,最大
    限度的减少参与 Join 的数据量
    
    • 1
    • 2

    小表 join 大表原则

    小表 join 大表的时应遵守小表 join 大表原则,原因是 join 操作的 reduce 阶段,位于 join 左边
    的表内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出的几率。join 中执行顺序是
    从左到右生成 Job,应该保证连续查询中的表的大小从左到右是依次增加的。
    
    • 1
    • 2
    • 3

    使用相同的连接键

    在 hive 中,当对 3 个或更多张表进行 join 时,如果 on 条件使用相同字段,那么它们会合并为一个
    MapReduce Job,利用这种特性,可以将相同的 join on 放入一个 job 来节省执行时间。
    
    • 1
    • 2

    尽量原子操作

    尽量避免一个SQL包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。
    
    • 1

    大表Join大表

    1、空key过滤:有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的
    reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据
    是异常数据,我们需要在SQL语句中进行过滤。
    2、空key转换:有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join
    的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer
    上
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    9. 启用 MapJoin

    这个优化措施,但凡能用就用! 大表 join 小表 小表满足需求: 小表数据小于控制条件时
    
    • 1

    MapJoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中,在 map 进程中进行 join 操
    作,这样就不用进行 reduce 步骤,从而提高了速度。只有 join 操作才能启用 MapJoin。

    ## 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。
    ## 对应逻辑优化器是MapJoinProcessor
    set hive.auto.convert.join = true;
    ## 刷入内存表的大小(字节)
    set hive.mapjoin.smalltable.filesize = 25000000;
    ## hive会基于表的size自动的将普通join转换成mapjoin
    set hive.auto.convert.join.noconditionaltask=true;
    ## 多大的表可以自动触发放到内层LocalTask中,默认大小10M
    set hive.auto.convert.join.noconditionaltask.size=10000000;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Hive 可以进行多表 Join。Join 操作尤其是 Join 大表的时候代价是非常大的。MapJoin 特别适合大小表join的情况。在Hive join场景中,一般总有一张相对小的表和一张相对大的表,小表叫 build table,大表叫 probe table。Hive 在解析带 join 的 SQL 语句时,会默认将最后一个表作为 probe table,将前面的表作为 build table 并试图将它们读进内存。如果表顺序写反,probe table 在前面,引发 OOM 的风险就高了。在维度建模数据仓库中,事实表就是 probe table,维度表就是 build table。这种 Join 方式在 map 端直接完成 join 过程,消灭了 reduce,效率很高。而且 MapJoin 还支持非等值连接。

    当 Hive 执行 Join 时,需要选择哪个表被流式传输(stream),哪个表被缓存(cache)。Hive 将
    JOIN 语句中的最后一个表用于流式传输,因此我们需要确保这个流表在两者之间是最大的。如果要在不同的 key 上 join 更多的表,那么对于每个 join 集,只需在 ON 条件右侧指定较大的表。
    也可以手动开启mapjoin:

    --SQL方式,在SQL语句中添加MapJoin标记(mapjoin hint)
    --将小表放到内存中,省去shffle操作
    // 在没有开启mapjoin的情况下,执行的是reduceJoin
    SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM
    smallTable JOIN bigTable ON smallTable.key = bigTable.key;
    
    /*+mapjoin(smalltable)*/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Sort-Merge-Bucket(SMB) Map Join

    它是另一种Hive Join优化技术,使用这个技术的前提是所有的表都必须是分桶表(bucket)和分桶排序的(sort)。分桶表的优化!
    具体实现:

    1、针对参与join的这两张做相同的hash散列,每个桶里面的数据还要排序
    2、这两张表的分桶个数要成倍数。
    3、开启 SMB join 的开关!
    
    • 1
    • 2
    • 3

    一些常见参数设置:

    ## 当用户执行bucket map join的时候,发现不能执行时,禁止查询
    set hive.enforce.sortmergebucketmapjoin=false;
    ## 如果join的表通过sort merge join的条件,join是否会自动转换为sort merge join
    set hive.auto.convert.sortmerge.join=true;
    ## 当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以启用
    mapjoin 来提高效率。
    # bucket map join优化,默认值是 false
    set hive.optimize.bucketmapjoin=false;
    ## bucket map join 优化,默认值是 false
    set hive.optimize.bucketmapjoin.sortedmerge=false;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    10. Join数据倾斜优化

    在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:

    # join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
    set hive.skewjoin.key=100000;
    # 如果是join过程出现倾斜应该设置为true
    set hive.optimize.skewjoin=false;
    
    • 1
    • 2
    • 3
    • 4

    如果开启了,在 Join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key(默认100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果.

    通过 hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个 job 的 mapper 数量,默认
    10000。

    set hive.skewjoin.mapjoin.map.tasks=10000;
    
    • 1

    11. CBO优化

    join的时候表的顺序的关系:前面的表都会被加载到内存中。后面的表进行磁盘扫描

    select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;
    
    • 1

    Hive 自 0.14.0 开始,加入了一项 “Cost based Optimizer” 来对 HQL 执行计划进行优化,这个功能通
    过 “hive.cbo.enable” 来开启。在 Hive 1.1.0 之后,这个 feature 是默认开启的,它可以 自动优化 HQL中多个 Join 的顺序,并选择合适的 Join 算法

    CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Hive 的成本优化器也一样。
    Hive 在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。
    要使用基于成本的优化(也称为CBO),请在查询开始设置以下参数:

    set hive.cbo.enable=true;
    set hive.compute.query.using.stats=true;
    set hive.stats.fetch.column.stats=true;
    set hive.stats.fetch.partition.stats=true;
    
    • 1
    • 2
    • 3
    • 4

    12. 怎样做笛卡尔积

    当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不允许在 HQL 语句中出现笛卡尔积,这实
    际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。

    当然也可以使用 limit 的办法来减少某个表参与 join 的数据量,但对于需要笛卡尔积语义的需求来说,经常是一个大表和一个小表的 Join 操作,结果仍然很大(以至于无法用单机处理),这时 MapJoin 才是最好的解决办法。MapJoin,顾名思义,会在 Map 端完成 Join 操作。这需要将 Join 操作的一个或多个表完全读入内存。

    PS:MapJoin 在子查询中可能出现未知 BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的方法是,给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条目复制数倍,join key 各不相同;将大表扩充一列 join key 为随机数。

    精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值范围里面随机出来的,所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的,大表的数据就被随机的分为了 n 份。并且最后处理所用的 reduce 数量也是 n,而且也不会出现数据倾斜。

    13. Group By优化

    默认情况下,Map 阶段同一个 Key 的数据会分发到一个 Reduce 上,当一个 Key 的数据过大时会产生数据倾斜。进行 group by 操作时可以从以下两个方面进行优化:
    1. Map端部分聚合
    事实上并不是所有的聚合操作都需要在 Reduce 部分进行,很多聚合操作都可以先在 Map 端进行部分聚合,然后在 Reduce 端的得出最终结果。

    ## 开启Map端聚合参数设置
    set hive.map.aggr=true;
    # 设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000
    set hive.groupby.mapaggr.checkinterval=100000
    
    • 1
    • 2
    • 3
    • 4

    2. 有数据倾斜时进行负载均衡
    当 HQL 语句使用 group by 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行负载均衡。策略就是把 MapReduce 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总。

    # 自动优化,有数据倾斜的时候进行负载均衡(默认是false)
    set hive.groupby.skewindata=false;
    
    • 1
    • 2

    当选项设定为 true 时,生成的查询计划有两个 MapReduce 任务.

    1、在第一个 MapReduce 任务中,map 的输出结果会随机分布到 reduce 中,每个 reduce 做部分聚合
    操作,并输出结果,这样处理的结果是相同的`group by key`有可能分发到不同的 reduce 中,从而达到
    负载均衡的目的;
    2、第二个 MapReduce 任务再根据预处理的数据结果按照 group by key 分布到各个 reduce 中,最
    后完成最终的聚合操作。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Map 端部分聚合:并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果,对应的优化器为 GroupByOptimizer。
    那么如何用 group by 方式同时统计多个列?

    select t.a, sum(t.b), count(t.c), count(t.d) from some_table t group by t.a;
    
    • 1

    下面是解决方法:

    select t.a, sum(t.b), count(t.c), count(t.d) from (
    select a,b,null c,null d from some_table
    union all
    select a,0 b,c,null d from some_table group by a,c
    union all
    select a,0 b,null c,d from some_table group by a,d
    ) t;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    14. Order By优化

    order by 只能是在一个 reduce 进程中进行,所以如果对一个大数据集进行 order by ,会导致一个
    reduce 进程中处理的数据相当大,造成查询执行缓慢。

    1、在最终结果上进行order by,不要在中间的大数据集上进行排序。如果最终结果较少,可以在一个
    reduce上进行排序时,那么就在最后的结果集上进行order by。
    2、如果是取排序后的前N条数据,可以使用distribute by和sort by在各个reduce上进行排序后前N
    条,然后再对各个reduce的结果集合合并后在一个reduce中全局排序,再取前N条,因为参与全局排序的
    order by的数据量最多是reduce个数 * N,所以执行效率会有很大提升。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在Hive中,关于数据排序,提供了四种语法,一定要区分这四种排序的使用方式和适用场景。

    1、order by:全局排序,缺陷是只能使用一个reduce
    2、sort by:单机排序,单个reduce结果有序
    3、cluster by:对同一字段分桶并排序,不能和sort by连用
    4、distribute by + sort by:分桶,保证同一字段值只存在一个结果文件当中,结合sort by保证每
    个reduceTask结果有序
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Hive HQL 中的 order by 与其他 SQL 方言中的功能一样,就是将结果按某字段全局排序,这会导致所有 map 端数据都进入一个 reducer 中,在数据量大时可能会长时间计算不完。

    如果使用 sort by,那么还是会视情况启动多个 reducer 进行排序,并且保证每个 reducer 内局部有
    序。为了控制map 端数据分配到 reducer 的 key,往往还要配合 distribute by 一同使用。如果不加
    distribute by 的话,map 端数据就会随机分配到 reducer。

    提供一种方式实现全局排序:两种方式:
    1、建表导入数据准备

    create table if not exists student(id int, name string, sex string, age int,
    department string) row format delimited fields terminated by ",";
    
    load data local inpath "/home/bigdata/students.txt" into table student;
    
    • 1
    • 2
    • 3
    • 4

    2、第一种方式

    -- 直接使用order by来做。如果结果数据量很大,这个任务的执行效率会非常低
    select id,name,age from student order by age desc limit 3;
    
    • 1
    • 2

    3、第二种方式

    -- 使用distribute by + sort by 多个reduceTask,每个reduceTask分别有序
    set mapreduce.job.reduces=3;
    drop table student_orderby_result;
    
    -- 范围分桶 0 < 18 < 1 < 20 < 2
    create table student_orderby_result as select * from student distribute by (case
    when age > 20 then 0 when age < 18 then 2 else 1 end) sort by (age desc);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    关于分界值的确定,使用采样的方式,来估计数据分布规律。

    15. Count Distinct优化

    当要统计某一列去重数时,如果数据量很大,count(distinct) 就会非常慢,原因与 order by 类似,
    count(distinct) 逻辑只会有很少的 reducer 来处理。这时可以用 group by 来改写:

    -- 先 group by 在 count
    select count(1) from (
    select age from student
    where department >= "MA"
    group by age
    ) t;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    再来一个例子:
    优化前 ,一个普通的只使用一个reduceTask来进行count(distinct) 操作

    -- 优化前(只有一个reduce,先去重再count负担比较大):
    select count(distinct id) from tablename;
    
    • 1
    • 2

    优化后 ,但是这样写会启动两个MR job(单纯 distinct 只会启动一个),所以要确保数据量大到启动
    job 的 overhead 远小于计算耗时,才考虑这种方法。当数据集很小或者 key 的倾斜比较明显时,
    group by 还可能会比 distinct 慢。

    优化后 ,但是这样写会启动两个MR job(单纯 distinct 只会启动一个),所以要确保数据量大到启动
    job 的 overhead 远小于计算耗时,才考虑这种方法。当数据集很小或者 key 的倾斜比较明显时,
    group by 还可能会比 distinct 慢。
    
    • 1
    • 2
    • 3

    select t.a, count(t.b) , sum(t.c) from t group by t.a;
    select t.a, count(distinct t.b, t.c) from t group by t.a;

    16. 怎样写in/exists语句

    在Hive的早期版本中,in/exists语法是不被支持的,但是从 hive-0.8x 以后就开始支持这个语法。但是
    不推荐使用这个语法。虽然经过测验,Hive-2.3.6 也支持 in/exists 操作,但还是推荐使用 Hive 的一个高效替代方案:left semi join
    比如说:

    -- in / exists 实现
    select a.id, a.name from a where a.id in (select b.id from b);
    select a.id, a.name from a where exists (select id from b where a.id = b.id);
    
    • 1
    • 2
    • 3

    可以使用join来改写:

    select a.id, a.namr from a join b on a.id = b.id;
    
    • 1

    应该转换成:

    -- left semi join 实现
    select a.id, a.name from a left semi join b on a.id = b.id;
    
    • 1
    • 2

    17. 使用 vectorization 技术

    在计算类似 scan, filter, aggregation 的时候, vectorization 技术以设置批处理的增量大小为 1024 行
    单次来达到比单条记录单次获得更高的效率

    set hive.vectorized.execution.enabled=true ;
    set hive.vectorized.execution.reduce.enabled=true;
    
    • 1
    • 2

    18. 多重模式

    如果你碰到一堆SQL,并且这一堆SQL的模式还一样。都是从同一个表进行扫描,做不同的逻辑。
    有可优化的地方:如果有n条SQL,每个SQL执行都会扫描一次这张表。
    
    • 1
    • 2

    如果一个 HQL 底层要执行 10 个 Job,那么能优化成 8 个一般来说,肯定能有所提高,多重插入就是一个非常实用的技能。一次读取,多次插入,有些场景是从一张表读取数据后,要多次利用,这时可以使用 multi insert 语法:

    from sale_detail
    insert overwrite table sale_detail_multi partition (sale_date='2021',
    region='china' )
    select shop_name, customer_id, total_price where .....
    insert overwrite table sale_detail_multi partition (sale_date='2022',
    region='china' )
    select shop_name, customer_id, total_price where .....;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    说明:multi insert语法有一些限制。

    1、一般情况下,单个SQL中最多可以写128路输出,超过128路,则报语法错误。
    2、在一个multi insert中:
    对于分区表,同一个目标分区不允许出现多次。
    对于未分区表,该表不能出现多次。
    3、对于同一张分区表的不同分区,不能同时有insert overwrite和insert into操作,否则报错返回。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Multi-Group by 是 Hive 的一个非常好的特性,它使得 Hive 中利用中间结果变得非常方便。例如:

    FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b
    ON (a.userid = b.userid and a.ds='2019-03-20' )) subq1
    INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2019-03-20')
    SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
    INSERT OVERWRITE TABLE school_summary PARTITION(ds='2019-03-20')
    SELECT subq1.school, COUNT(1) GROUP BY subq1.school;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    上述查询语句使用了 Multi-Group by 特性连续 group by 了 2 次数据,使用不同的 Multi-Group by。
    这一特性可以减少一次 MapReduce 操作。

    19. 启动中间结果压缩

    map 输出压缩

    set mapreduce.map.output.compress=true;
    set
    mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
    
    • 1
    • 2
    • 3

    中间数据压缩

    中间数据压缩就是对 hive 查询的多个 Job 之间的数据进行压缩。最好是选择一个节省CPU耗时的压缩方式。可以采用 snappy 压缩算法,该算法的压缩和解压效率都非常高。

    set hive.exec.compress.intermediate=true;
    set
    hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
    set hive.intermediate.compression.type=BLOCK;
    
    • 1
    • 2
    • 3
    • 4

    结果数据压缩

    最终的结果数据(Reducer输出数据)也是可以进行压缩的,可以选择一个压缩效果比较好的,可以减少数据的大小和数据的磁盘读写时间; 注:常用的 gzip,snappy 压缩算法是不支持并行处理的,如果数据源是 gzip/snappy压缩文件大文件,这样只会有有个 mapper 来处理这个文件,会严重影响查询效率。 所以如果结果数据需要作为其他查询任务的数据源,可以选择支持 splitable 的 LZO 算法,这样既能对结果文件进行压缩,还可以并行的处理,这样就可以大大的提高 job 执行的速度了。

    set hive.exec.compress.output=true;
    set mapreduce.output.fileoutputformat.compress=true;
    set
    mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.G
    zipCodec;
    set mapreduce.output.fileoutputformat.compress.type=BLOCK;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Hadoop集群支持的压缩算法:

    org.apache.hadoop.io.compress.DefaultCodec
    org.apache.hadoop.io.compress.GzipCodec
    org.apache.hadoop.io.compress.BZip2Codec
    org.apache.hadoop.io.compress.DeflateCodec
    org.apache.hadoop.io.compress.SnappyCodec
    org.apache.hadoop.io.compress.Lz4Codec
    com.hadoop.compression.lzo.LzoCodec
    com.hadoop.compression.lzo.LzopCodec
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 相关阅读:
    (附源码)springboot苔藓植物科普网站 毕业设计 345641
    使用bedtools进行gwas基因注释
    Git和Gitee的区别
    【JavaWeb】Servlet属性设置
    消息队列 RabbitMQ入门:Linux(Docker)中安装和卸载RabbitMQ服务
    ISP 基础知识储备
    mysql和redis的同步
    javascript回调函数有什么用
    python3读取yaml文件
    【计算机组成原理】怎样判断补码乘法是否溢出
  • 原文地址:https://blog.csdn.net/qq_31557939/article/details/126114139