• HIVE优化和数据倾斜、合并小文件


    执行计划(explain)

    EXPLAIN [EXTENDED(详细) | DEPENDENCY | AUTHORIZATION] query

    没有生成MR任务的explain

    hive (default)> explain select * from emp;
    Explain
    STAGE DEPENDENCIES:
      Stage-0 is a root stage
    
    STAGE PLANS:
      Stage: Stage-0
        Fetch Operator
          limit: -1
          Processor Tree:
            TableScan
              alias: emp
              Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: empno (type: int), ename (type: string), job (type: string), mgr (type: int), hiredate (type: string), sal (type: double), comm (type: double), deptno (type: int)
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
                Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
                ListSink
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    生成MR任务的explain

    hive (default)> explain select deptno, avg(sal) avg_sal from emp group by deptno;
    Explain
    STAGE DEPENDENCIES:
      Stage-1 is a root stage
      Stage-0 depends on stages: Stage-1
    STAGE PLANS:
      Stage: Stage-1
        Map Reduce
          Map Operator Tree:
              TableScan
                alias: emp
                Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: sal (type: double), deptno (type: int)
                  outputColumnNames: sal, deptno
                  Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
                  Group By Operator
                    aggregations: sum(sal), count(sal)
                    keys: deptno (type: int)
                    mode: hash
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: _col0 (type: int)
                      sort order: +
                      Map-reduce partition columns: _col0 (type: int)
                      Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
                      value expressions: _col1 (type: double), _col2 (type: bigint)
          Execution mode: vectorized
          Reduce Operator Tree:
            Group By Operator
              aggregations: sum(VALUE._col0), count(VALUE._col1)
              keys: KEY._col0 (type: int)
              mode: mergepartial
              outputColumnNames: _col0, _col1, _col2
              Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: _col0 (type: int), (_col1 / _col2) (type: double)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 7020 Basic stats: COMPLETE Column stats: NONE
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
    
      Stage: Stage-0
        Fetch Operator
          limit: -1
          Processor Tree:
            ListSink
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    Fetch 抓取

    fetch抓取是指hive中对某些情况的查询可以不必使用mapreduce计算,例如select * from emp; 这种情况下hive可以简单读取emp对应的存储目录下的文件,然后输出结果到控制台。

    在hive-default.xml.template文件中hive.fetch.task.conversion默认为more,老版本为minimal,修改为more后,在全局查找、字段查找、limit查找都不走mapreduce。

    本地模式

    hive的执行通常会触发整个hadoop集群的MR启动。有时候hive的输入量很小,这种情况下为了查询触发MR任务消耗的时候会比实际job的执行时间会多的多。
    对大多数情况下,hive通过本地模式单机上处理所有的任务,对于小数据集,执行时间可以被明显缩短。

    可以设置hive.exec.model.local.auto为true来启动这个优化。 同时设置set
    hive.exec.mode.local.auto.inputbytes.max=50000000; 当输入量小于这个值的时候,启动单机。
    或者设置set
    hive.exec.mode.local.auto.input.files.max=10;当输入最大文件数小于这个数的时候,启动单机。

    小表join大表(MapJoin)

    旧版本:将key相对分散、且数据量小的表放在join左边,可以有效减少内存溢出错误的发生。然后再进一步使用map join 让小的维度表(小于1000条数据)先在map端完成join操作。
    新版本:对小表join大表和大表join小表做了优化,小表放在左右两边都行。

    开启mapjoin:set hive.auto.convert.join=true;默认是true。

    mapjoin工作机制:
    1.local task a 扫描小表,将数据转换为hash table写入到本地哈希文件中,然后等到下一阶段mr任务启动的时候,加入到distribute cache中缓存。
    2.task b 是一个没有reduce的MR,启动MR,在map阶段 task扫描大表b,然后用大表b的每一条数据去跟distribute cache进行join,直接输出结果。
    3.因为mapjoin没有reduce,由map端直接输出结果文件,所以由多少个task就有多少结果文件。

    大表join大表

    大表join大表 有时候join超时是因为大表中某些key数据太多了,相同的key发送到相同的reducer中,导致内存不足。
    很多情况下,这些key对应的都是异常数据,我们需要在sql中进行过滤或者转换。

    空值异常key需要过滤

    如果空值是异常数据的话,需要过滤
    join前加where xx is not null

    空值非异常需要转换

    有时候某个key空值很多,但是相对应的数据不是异常数据,需要要包含在join中,这时可以将表a的key为空的字段赋予一个随机的值,使得数据随即均匀的落到不同reducer中。
    select a.* from tablea a full join bigtableb b on nvl(n.id,rand()) = b.id;

    Group by 优化

    默认情况下,map阶段的同一key值数据发送到同一个reducer中,当一个key数据过大时就出现数据倾斜问题。
    但是并不是所有的聚合操作都必须在reducer端处理完成,很多聚合操作可以在map端进行部份聚合,然后在reduecr得出最后结果。
    设置:

    set hive.map.aggr = true; 是否在map端聚合
    set hive.groupby.mappper.在map端聚合的数据条数
    set hive.groupby.skewindata =true;有数据倾斜的时候开启负载均衡。

    负载均衡group by 流程:
    当选项为true的时候,生成的查询计划会有两个MR job。第一个job中,Map的输出结果会随机分配到reduce中,每个reduce做部分聚合,并输出结果。
    这样的好处是相同的group by key 极有可能分发到不同的reduce中。达到负载均衡的目的。
    第二个job再根据预处理的结果按照group by key 分布到reduce中,这个过程保证相同的group by key可以分布到同一个reduce,完成最终的聚合操作。
    代码:

    set hive.groupby.skewindata = true;
    select deptno from emp group by deptno;
    
    • 1
    • 2

    笛卡尔积

    尽量避免笛卡尔积,join的时候不加on条件或者无效的on条件,hive只能用1个reducer处理笛卡尔积。

    行列过滤

    列处理:只拿需要的列,尽量分区使用,少用select *
    行处理:先子查询,再关联表。避免全表关联。。

     select b.id from bigtable b
    join (select id from bigtable where id <= 10 ) o on b.id = o.id;
    
    • 1
    • 2

    合理设置map和reduce数

    hive的作业通常会通过input的目录产生一个或者多个map任务
    所以主要的决定因素有:input的文件总个数,input的文件大小,集群设置块block的大小。

    是不是map数越多就越好

    答案是否定的,一个任务如果有很多小文件(远远小于block大小128M),则每个小文件都会被当成一个block,用一个mapper进行处理,而一个map的启动和初始化的时间远远大于逻辑处理时间,就会造成很大的资源浪费,而且执行的mapper数是受限的。

    是不是每个mapper 处理接近128M的文件块,就可以高枕无忧了。

    不一定,如果一个文件大小接近128M,但是这个文件只有一两个字段,有几千万行的数据。正常用一个map去完成,如果map处理的逻辑复杂,用一个map处理也会耗时很多、

    合理设置mapper数

    复杂文件增加mapper数

    input的文件很大时,任务逻辑复杂,map执行效率缓慢时候,可以考虑增加mapper数,使得每个mapper处理的数据量减少,提高任务效率。
    增加mapper量方法:

    > 根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,调整maxSize最大值就行,让maxsize最大值小于blocksize就能增加mapper数
    > 例如: 
    > set mapreduce.input.fileinputformat.split.maxsize=100; select count(*) from emp; 
    Hadoop job information for Stage-1: number of mappers: 6; number of reducers:
    
    • 1
    • 2
    • 3
    • 4

    减少mapper数(小文件合并)

    1、在map执行之前将小文件合并,减少mapper数。combineHiceInputFormat具有对小文件合并的功能,hiveinputformat没有该功能。
    例如

    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    
    • 1

    2、在mapReduce任务结束后合并小文件的设置:

    在map-only任务结束时合并小文件,默认是true
    SET hive.merge.mapfiles = true;
    在map-reduce任务结束时合并小文件,默认是false
    SET hive.merge.mapredfiles = true;
    合并文件大小,默认256M
    SET hive.merge.size.per.task = 268435456;
    当输出文件平均值小于该大小的时候,启动一个独立的map-reduce任务进行合并
    SET hive.merge.smallfiles.avgsize = 16777216;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    合理设置reducer数

    过多的启动和初始化reducer也会耗费很多时间和资源。
    且过多的reduce生产,输出的文件也多很多,如果这些输出文件作为下一个任务的输入,也会出现小文件过多的问题。
    但是如果reducer数很少,任务逻辑复杂的话,会很耗费时间,所以合理的设置reducer数也很重要。在设置reducer数的时候,要考虑两个原则:处理大数据量利用合适的reducer数 和使得单个reduce任务处理的数据量大小也要合适。
    设置reducer量方法:

    1、在hadoop的mapred-default.xml文件修改
    set mapreduce.job.reduces = 15;
    2、
    修改每个reducer处理的数据量的大小 ,默认是256M
    hive.exec.reducers.bytes.per.reducer=256000000
    每个任务最大的reduce数,默认是1009
    hive.exec.reducers.max=1009
    计算reducer数的公式,这个条件使用的前提是 mapreduce.job.reduces=-1 就是我们没有人为设置reducer数量,让hive自己推断
    N=min(参数2,总输入数据量/参数1)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    并行执行

    hive在一个查询的时候会转化成一个或者多个阶段,可能是map-reduce阶段,抽样阶段,合并阶段,limit阶段或者其他阶段。默认情况下hive一次只会执行一个阶段。
    可能存在某个job包含多个阶段,这些阶段并非相互依赖,就是说有些阶段是可以并行执行的。使得整个job的执行时间缩短。
    设置:

    hive.exec.parallel为true就可以并行执行,但是job并行执行阶段比较多的情况下,集群利用率就会增加,如果没资源,并行也不会并起来。
    set hive.exec.parallel=true;              //打开任务并行执行
    set hive.exec.parallel.thread.number=16;  //同一个sql允许最大并行度,默认为8。
    
    • 1
    • 2
    • 3

    严格模式

    hive可以设置严格模式防止一些危险操作。
    1、分区表不使用分区过滤。
    hive.strict.checks.no.partition.filter设置为true时,对分区表,如果where语句没有限制分区条件限制范围,是不允许执行语句的。
    开启这个通常是因为分区表有很大的数据集,且数据增加迅速,如果不限制那么查询消耗的资源非常多。
    2、使用order by没有limit过滤
    将hive.strict.checks.orderby.no.limit设置为true时,对使用了order by 语句的查询,要求必须使用limit。
    order by操作为了执行排序会将所有结果发送到同一个reducer中处理,要求增加limit反正reducer执行过程消耗时间过长。
    3、笛卡尔积
    将hive.strict.checks.cartesian.product设置为true时,会限制笛卡尔积的查询。
    笛卡尔积出现的时候,如果表大,这个查询会出现不可控的情况。

    影响效率的语句(SQL优化)

    union

    尽量不要使用union(union会去重复),先union all 再group by 去重

    Count(distinct) 去重统计

    如果是大数据量的话,count(distinct) 会触发一个reduce task完成count(distinct),这一个reduce处理的数据量太大了,导致整个job很难完成。
    一般count(distinct)使用group by 再count()的方式替换。但是要注意group by 的数据倾斜。(会多起一个job,但是在大数据量的时候,是比原来快)
    select count(id) from (select id from bigtable group by id) a;

    用in 代替join

    如果要用另一张表的字段限制某张表的字段,用in比用join 快

    select id,name
    from table1 a 
    join table2 b 
    on a.id = b.id ;
    
    • 1
    • 2
    • 3
    • 4

    用in 代替

    select id, name
    from table1 a 
    where id in (select id from table2 b)
    
    • 1
    • 2
    • 3

    left semi join 用来代替in和exists更高效的方法

    select a.id,a.name
    from table1 a
    left semi join table2 b 
    on a.id = b.id 
    
    • 1
    • 2
    • 3
    • 4

    限制:
    1、left semi join 右边的表只能在on处设置过滤条件,在where 和select 都不能过滤
    2、left semi join 只是传递表的join key给map阶段,所以left semi join 结果只能出现左表。
    3、left semi join 右表遇到重复记录时候,会选择跳过,性能会更高。

    jvm重用

    让jvm在一个map中重新使用多次

    数据倾斜

    数据倾斜原因:1、task中需要处理大量相同key 的数据,大量相同key分发给1个reduce,出现倾斜。2、任务读取不可分割的大文件,只能被1以map读取,出现map阶段的倾斜。

    key类型

    空值引发数据倾斜

    实际业务中有大量的空值或无意义的数据参与计算中,表中大量的null值,如果表join操作,必定会引发shuffle产生,使得大量空值在分配给1个reduce中。
    注意:虽然一个空值一个不为空,join不上,但是shuffle阶段的hash操作时,key的hash结果都是一样的,都会被分发1个reduce中。
    解决方案:
    1、不让nul值参与计算,然后再把空值union回

    SELECT *
    FROM log a
     JOIN users b
     ON a.user_id IS NOT NULL
      AND a.user_id = b.user_id
    UNION ALL
    SELECT *
    FROM log a
    WHERE a.user_id IS NULL;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2、给空值null赋值随机值,这样hash结果不一样,就会进入到不同的reduce中。

    SELECT *
    FROM log a
     LEFT JOIN users b ON CASE 
       WHEN a.user_id IS NULL THEN concat('hive_', rand())
       ELSE a.user_id
      END = b.user_id;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    不同数据类型引发数据倾斜

    两个表join,表a的key为int,表b的key是string,当按key去join的时候,默认的hash操作会按int类型的id进行分配,然后string类型的id都会被分配到同一个id,从而进入到同一个redce数据倾斜、
    解决方案:将string类型转换为int类型

    join 业务key倾斜

    如果业务数据本身存在热点key,即高频访问key这样的特性,key本身就分布不均匀,那么mr join操作的时候必然会引起数据倾斜。

    解决方案:
    默认情况下,map阶段的同一key值数据发送到同一个reducer中,当一个key数据过大时就出现数据倾斜问题。
    但是并不是所有的聚合操作都必须在reducer端处理完成,很多聚合操作可以在map端进行部份聚合,然后在reduecr得出最后结果。

    设置:
    set hive.map.aggr = true; 是否在map端聚合
    set hive.groupby.mappper.在map端聚合的数据条数
    set hive.groupby.skewindata =true;有数据倾斜的时候开启负载均衡。
    
    • 1
    • 2
    • 3
    • 4

    负载均衡group by 流程:
    当选项为true的时候,生成的查询计划会有两个MR job。第一个job中,Map的输出结果会随机分配到reduce中,每个reduce做部分聚合,并输出结果。
    这样的好处是相同的group by key 极有可能分发到不同的reduce中。达到负载均衡的目的。
    第二个job再根据预处理的结果按照group by key 分布到reduce中,这个过程保证相同的group by key可以分布到同一个reduce,完成最终的聚合操作。
    代码:

    set hive.groupby.skewindata = true;
    select deptno from emp group by deptno;
    
    • 1
    • 2

    不可切分文件引发数据倾斜

    像GZIP压缩的文件是不可被切分的,只会被一个map读取
    解决方案:将GZIP等不可切分的压缩文件转换为可切分的bzip2或者zip等可切分算法。

    小文件合并

    产生小文件方式:

    1、直接插入,insert into table value ,一次插入一个文件
    2、load插入数据,load一个文件,hive就一个文件,load一个文件夹,hive文件就是文件夹里面的文件
    3、基于查询加载数据insert overwrite table xx select xx :导入数据的时候会启动MR程序,有多少个reduce就有多少个文件
    文件数计算公式:
    非分区表:文件数=reduce数
    分区表:文件数=reduce数*分区数

    小文件过多影响

    1、小文件过多会导致 存储小文件的hdfs中的namanode元数据特别大,占用太多内存,影响hdfs性能。
    2、对于hive说,查询会被转换为MR程序计算,一个小文件都会被当初一个块然后启动一个MR,每启动一个MR任务的启动时间和初始化时间都远远大于逻辑处理时间,会造成大量资源浪费。

    解决小文件过多

    1、使用hive自带的concate命令,合并小文件

    对于非分区表
    alter table A concatenate;
    对于分区表
    alter table B partition(day=20201224) concatenate;
    注意:concatenate 命令只支持 RCFILE 和 ORC 文件类型。 
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、调整参数 ,参考合理设置map和reduce数
    3、使用hadoop的archive将小文件归档
    Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能将小文件打包成一个HAR文件,减少namenode的内存占用的同时,仍然允许文件的访问。

    用来控制归档是否可用
    set hive.archive.enabled=true;
    通知Hive在创建归档时是否可以设置父目录
    set hive.archive.har.parentdir.settable=true;
    控制需要归档文件的大小
    set har.partfile.size=1099511627776;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    使用以下命令进行归档
    ALTER TABLE A ARCHIVE PARTITION(dt='2020-12-24', hr='12');
    对已归档的分区恢复为原文件
    ALTER TABLE A UNARCHIVE PARTITION(dt='2020-12-24', hr='12');
    
    • 1
    • 2
    • 3
    • 4

    如果是新集群

    如果是新集群,没有遗留文件,建议hive用orc文件格式,启用izo压缩,这样小文件过多可以使用hive自带命令concatenate快速合并。

    问做过hive优化

    答:主要是一些数据倾斜优化、小文件处理优化和语句优化以及一些设置
    数据倾斜的优化:包括key倾斜和读取不可切分文件倾斜,其中key倾斜包括空值倾斜和key数据类型倾斜和业务key倾斜,
    小文件合并优化
    sql语句优化像union all+group by 代替union,group by +count 代替count(distinct),in 代替join,left semi join 代替 in existi,以及行列过滤避免使用select *,
    以及一些参数设置,像开启严格模式(禁止分区表全表查询,order by 不加limit,笛卡尔积),或者并行执行(非依赖阶段可以并行执行),本地模式(hive通过本地模式单机上处理所有的任务),fetch抓取(在全局查找、字段查找、limit查找都不走mapreduce)等

  • 相关阅读:
    AlexNet——训练花数据集
    react+taro的复制功能
    NBA体育决策和数据挖掘分析
    基于springboot的房产销售系统
    上海交航带动内河运输发展
    产品思维训练 | 如何有效提高问答网站中的问题回复率?
    重建恐龙化石,摄影测量在古生物学中有怎样的意义?
    sqlserver 解析导入xml
    纷享销客罗旭对话旷视唐文斌:数字化的AI革命之路
    JESD204B时钟网络
  • 原文地址:https://blog.csdn.net/weixin_43859562/article/details/126232066