Hive on Tez - Apache Hive - Apache Software Foundation
In short, Hive is the core; for the execution engine I have used Spark and MR before, and now it is Tez.
Generally speaking, MR is a bit underwhelming because it is mostly disk-based. Spark computes in memory: it splits the job into wide/narrow dependencies, builds a DAG, and then executes it. Tez is actually quite similar to Spark, just built on different underlying ideas, so let's dig into it.
Whenever a query has multiple reduce sinks (that cannot be combined, i.e.: no correlation between the partition keys), Hive will break the plan apart and submit one MR job per sink. All of the MR jobs in this chain need to be scheduled one-by-one and each one has to re-read the output of the previous job from HDFS and shuffle it. In Tez several reduce sinks can be linked directly and data can be pipelined without the need of temporary HDFS files. This pattern is referred to as MRR (Map - reduce - reduce*).
In other words, the old map->reduce->map->reduce chain is collapsed into map-reduce-reduce.
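As a concrete sketch (the employees table below is made up, not from the article): a GROUP BY followed by an ORDER BY needs two reduce phases. On MapReduce that means two jobs with an HDFS write and re-read in between; on Tez the two reduce stages are chained in a single DAG (map - reduce - reduce).
-- hypothetical table, shape only: one reduce for the group by, one for the order by
select dept, sum(salary) as total_salary
from employees
group by dept
order by total_salary desc;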
More than just MRR, Tez allows for sending the entire query plan at once thus enabling the framework to allocate resources more intelligently as well as pipelining data through the various stages. This is a huge improvement for more complicated queries as it eliminates IO/sync barriers and scheduling overhead between individual stages. An example would be a query that aggregates two tables in subqueries in the from clause and joins the resulting relations.
The entire query plan is sent at once, and resources are allocated more intelligently.
It also removes the IO/sync barriers and scheduling overhead between individual stages. The example given is a query that aggregates two tables in subqueries in the FROM clause and then joins the resulting relations. To be honest I don't fully understand this part; it seems to mean the aggregation results of the subqueries are computed first and then joined.
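A minimal sketch of the query shape described above (orders and order_returns are made-up table names): each subquery in the from clause is an aggregation, and the outer query joins the two aggregated relations. On MapReduce the two aggregations and the join become separate jobs; Tez submits the whole thing as one DAG.
-- hypothetical tables: two aggregated subqueries in the from clause, joined afterwards
select o.cust_id, o.order_total, r.return_total
from (select cust_id, sum(amount) as order_total from orders group by cust_id) o
join (select cust_id, sum(amount) as return_total from order_returns group by cust_id) r
  on (o.cust_id = r.cust_id);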
Currently any shuffle is performed the same way regardless of the data size. Sorted partitions are written to disk, pulled by the reducers, merge-sorted and then fed into the reducers. Tez allows for small datasets to be handled entirely in memory, while no such optimization is available in map-reduce. Many warehousing queries sort or aggregate small datasets after the heavy lifting is done. These would benefit from an in memory shuffle.
The shuffle strategy can differ by data size: small datasets are handled entirely in memory.
Distributed join algorithms are difficult to express in map-reduce. A regular shuffle join for instance has to process different inputs in the same map task, use tags to be written to disk for distinguishing tables, use secondary sort to get the rows from each table in a predictable order, etc. Tez is a much more natural platform to implement these algorithms.
For example: It is possible to have one Tez task take multiple bipartite edges as input thus exposing the input relations directly to the join implementation. The case where multiple tasks feed into the same shuffle join task will be referred to as multi-parent shuffle join.
All sorting in map-reduce happens using the same binary sort, regardless of the data type. Hive might for instance choose to use a more effective integer-only sort when possible. Tez makes that available.
Since Hive uses map-reduce to compute aggregations, processing will always boil down to a sort-merge even though we’re not actually interested in the sort order. Tez will allow for more efficient hash-based algorithms to do the same.
Tez allows complete control over the processing, including being able to stop processing when limits are met (without simply skipping records or relying on file formats/input formats.) It’s also possible to define specific edge semantics, which could be used to provide a generic top-k edge to simplify “limit” processing.
Long story short: it is simply better.
Straight to an example.
select
i_item_desc
,i_category
,i_class
,i_current_price
,i_item_id
,itemrevenue
,itemrevenue*100/sum(itemrevenue) over (partition by i_class) as revenueratio
from
(select
i_item_desc
,i_category
,i_class
,i_current_price
,i_item_id
,sum(ws_ext_sales_price) as itemrevenue
from
web_sales
join item on (web_sales.ws_item_sk = item.i_item_sk)
join date_dim on (web_sales.ws_sold_date_sk = date_dim.d_date_sk)
where
i_category in ('1', '2', '3')
and year(d_date) = 2001 and month(d_date) = 10
group by
i_item_id
,i_item_desc
,i_category
,i_class
,i_current_price) tmp
order by
i_category
,i_class
,i_item_id
,i_item_desc
,revenueratio;
This SQL is not complicated: an inner join of three tables, a WHERE filter, a GROUP BY aggregation, and then a SELECT with a window function over the result.
Plan with TEZ
Stage 0:
Local Work: Generate hash table for date dim
Stage 1:
Map: SMB join item + web_sales, mapjoin date_dim + web_sales, map-side group by/aggregate
Reduce 1: Reduce side group by/aggregate, shuffle for windowing
Reduce 2: Compute windowing function, shuffle for order by
Reduce 3: Order by, write to HDFS
Plan without TEZ
Local Work: Generate hash table for date dim
Stage 1:
Map: SMB join item + web_sales, mapjoin date_dim + web_sales, map-side group by/aggregate
Reduce: Reduce side group by/aggregate, write to HDFS
Stage 2:
Map: Read tmp file, shuffle for windowing
Reduce: Compute windowing function, write to HDFS
Stage 3:
Map: Read tmp file, shuffle for order by
Reduce: Order by, write to HDFS
Note that the plan without Tez has two extra writes to HDFS: one for the result of the subquery's join/aggregation and one for the windowing result.
——————————————————————————————————————————
Now for hands-on practice, which mostly comes down to parameter tuning.
What parameters does Hive on Tez have?
Configuration Properties - Apache Hive - Apache Software Foundation
Apache Tez was added in Hive 0.13.0 (HIVE-4660 and HIVE-6098). For information see the design document Hive on Tez, especially the Installation and Configuration section.
Besides the configuration properties listed in this section, some properties in other sections are also related to Tez. The following parameters also affect Hive on Tez:
hive.execution.engine
hive.server2.tez.default.queues
hive.server2.tez.sessions.per.default.queue
hive.server2.tez.initialize.default.sessions
hive.stats.max.variable.length
hive.stats.list.num.entries
hive.stats.map.num.entries
hive.stats.map.parallelism (Hive 0.13 only; removed in Hive 0.14)
hive.stats.join.factor
hive.stats.deserialization.factor
hive.tez.dynamic.semijoin.reduction
hive.tez.min.bloom.filter.entries
hive.tez.max.bloom.filter.entries
hive.tez.bloom.filter.factor
hive.tez.bigtable.minsize.semijoin.reduction
hive.explain.user
hive.jar.directory
Default Value: null
Added In: Hive 0.13.0 with HIVE-5003 and HIVE-6098, default changed in HIVE-6636
This is the location that Hive in Tez mode will look for to find a site-wide installed Hive instance. See hive.user.install.directory for the default behavior.
hive.user.install.directory
Default Value: hdfs:///user/
Added In: Hive 0.13.0 with HIVE-5003 and HIVE-6098
If Hive (in Tez mode only) cannot find a usable Hive jar in hive.jar.directory, it will upload the Hive jar to <hive.user.install.directory>/<user_name> and use it to run queries.
hive.compute.splits.in.am
Default Value: true
Added In: Hive 0.13.0 with HIVE-5522 and HIVE-6098
Whether to generate the splits locally or in the ApplicationMaster (Tez only).
hive.rpc.query.plan
Default Value: false
Added In: Hive 0.13.0 with HIVE-5522 and HIVE-6098
Whether to send the query plan via local resource or RPC.
hive.prewarm.enabled -- I didn't quite get this at first, but it is about containers (pre-starting them).
Default Value: false
Added In: Hive 0.13.0 with HIVE-6391 and HIVE-6360
Enables container prewarm for Tez (0.13.0 to 1.2.x) or Tez/Spark (1.3.0+). This is for Hadoop 2 only.
hive.prewarm.numcontainers
Default Value: 10
Added In: Hive 0.13.0 with HIVE-6391 and HIVE-6360
Controls the number of containers to prewarm for Tez (0.13.0 to 1.2.x) or Tez/Spark (1.3.0+). This is for Hadoop 2 only.
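A minimal sketch of turning prewarm on in a session (values are illustrative):
-- pre-start a pool of Tez containers so the first queries do not pay allocation latency
set hive.prewarm.enabled=true;
set hive.prewarm.numcontainers=10;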
hive.merge.tezfiles -- this merges small files; needless to say, it should be enabled.
Default Value: false
Added In: Hive 0.13.0 with HIVE-6498 and HIVE-6360
Merge small files at the end of a Tez DAG.
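A minimal sketch of enabling the merge in a session. The two size thresholds are separate Hive properties (hive.merge.smallfiles.avgsize and hive.merge.size.per.task) that also apply to Tez output; the values shown are illustrative.
-- merge small output files at the end of the Tez DAG
set hive.merge.tezfiles=true;
set hive.merge.smallfiles.avgsize=16000000;    -- merge if the average output file is below ~16 MB
set hive.merge.size.per.task=256000000;        -- target size of the merged files, ~256 MB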
hive.tez.input.format -- the InputFormat Hive uses to read files
Default Value: org.apache.hadoop.hive.ql.io.HiveInputFormat
Added In: Hive 0.13.0 with HIVE-6498 and HIVE-6360
The default input format for Tez. Tez groups splits in the AM (ApplicationMaster).
hive.tez.input.generate.consistent.splits
Default Value: true
Added In: Hive 2.1.0 with HIVE-9850, HIVE-10104 and HIVE-12078
Whether to generate consistent split locations when generating splits in the AM.
Setting to false randomizes the location and order of splits depending on how threads generate.
Relates to LLAP.
hive.tez.container.size -- anything with "size" in it deserves attention; this overrides the default of using the mapper's container size
Default Value: -1
Added In: Hive 0.13.0 with HIVE-6498 and HIVE-6360
By default Tez will spawn containers of the size of a mapper. This can be used to overwrite the default.
hive.tez.java.opts
Default Value: (empty)
Added In: Hive 0.13.0 with HIVE-6498 and HIVE-6360
By default Tez will use the Java options from map tasks. This can be used to overwrite the default.
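A minimal sketch of sizing the two together (numbers are illustrative): a common rule of thumb is to keep the JVM heap around 80% of the container size so there is headroom for off-heap memory.
-- a 4 GB Tez container with the heap capped below it
set hive.tez.container.size=4096;       -- MB per container
set hive.tez.java.opts=-Xmx3276m;       -- roughly 80% of the container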
hive.convert.join.bucket.mapjoin.tez -- bucket map join
Default Value: false
Added In: Hive 0.13.0 with HIVE-6447
Whether joins can be automatically converted to bucket map joins in Hive when Tez is used as the execution engine (hive.execution.engine is set to "tez").
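A minimal sketch of enabling the conversion in a session; the rewrite only applies when both tables are bucketed on the join key with compatible bucket counts.
set hive.execution.engine=tez;
set hive.convert.join.bucket.mapjoin.tez=true;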
hive.tez.log.level
Default Value: INFO
Added In: Hive 0.13.0 with HIVE-6743
The log level to use for tasks executing as part of the DAG. Used only if hive.tez.java.opts is used to configure Java options.
hive.localize.resource.wait.interval
Default Value: 5000
Added In: Hive 0.13.0 with HIVE-6782
Time in milliseconds to wait for another thread to localize the same resource for Hive-Tez.
hive.localize.resource.num.wait.attempts
Default Value: 5
Added In: Hive 0.13.0 with HIVE-6782
The number of attempts waiting for localizing a resource in Hive-Tez.
hive.tez.smb.number.waves
Default Value: 0.5
Added In: Hive 0.14.0 with HIVE-8409
The number of waves in which to run the SMB (sort-merge-bucket) join. Account for cluster being occupied. Ideally should be 1 wave.
hive.tez.cpu.vcores -- worth noting: this is basically the same idea as a Spark executor's cores
Default Value: -1
Added In: Hive 0.14.0 with HIVE-8452
By default Tez will ask for however many CPUs MapReduce is configured to use per container. This can be used to overwrite the default.
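Illustrative only: ask for 2 vcores per container instead of inheriting the MapReduce setting.
set hive.tez.cpu.vcores=2;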
hive.tez.auto.reducer.parallelism -- whether auto reducer parallelism is enabled
Default Value: false
Added In: Hive 0.14.0 with HIVE-7158
Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes and set parallelism estimates. Tez will sample source vertices' output sizes and adjust the estimates at runtime as necessary.
hive.tez.max.partition.factor -- how far to over-partition the reducers when auto parallelism is on.
Default Value: 2
Added In: Hive 0.14.0 with HIVE-7158
When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle edges.
hive.tez.min.partition.factor -- presumably a lower-bound ratio; e.g. with an estimate of 100 reducers the floor would be 25. Just a guess.
Default Value: 0.25
Added In: Hive 0.14.0 with HIVE-7158
When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number of reducers that Tez specifies.
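A minimal sketch of the three settings together (values are illustrative): roughly, Hive over-partitions the shuffle by the max factor, and Tez can shrink the reducer count at runtime based on sampled output sizes, but not below the min factor of the original estimate.
-- let Tez adjust the reducer count at runtime
set hive.tez.auto.reducer.parallelism=true;
set hive.tez.max.partition.factor=2.0;
set hive.tez.min.partition.factor=0.25;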
hive.tez.exec.print.summary -- prints an execution summary; the output is a bit cluttered
Default Value: false
Added In: Hive 0.14.0 with HIVE-8495
If true, displays breakdown of execution steps for every query executed on Hive CLI or Beeline client.
hive.tez.exec.inplace.progress
Default Value: true
Added In: Hive 0.14.0 with HIVE-8495
Updates Tez job execution progress in-place in the terminal when Hive CLI is used.
——————————————————————————————————————————
All of the above are Hive parameters. So when we run Hive on Tez, do Tez's own parameters need to be set as well?
The two parameters below were found in CDP's Tez configuration, but I could not find them on the official site...
tez.am.resource.memory.mb=2048 --The amount of memory to be used by the Application Master. Used only if the value is not specified explicitly by the DAG definition.
tez.task.resource.memory.mb=1536 --The amount of memory to be used by launched tasks. Used only if the value is not specified explicitly by the DAG definition.
There are more parameters; for details see the Tez source code: org.apache.tez.mapreduce.hadoop.MRJobConfig / org.apache.tez.mapreduce.grouper.TezSplitGrouper
tez.grouping.max-size=1073741824 -- upper bound, in bytes, on the size of a grouped split (1073741824 = 1 GB)
tez.grouping.split-waves=1.7 -- the wave factor used when computing the number of grouped splits (see the comment below)
From the split-waves javadoc in the Tez source: "The multiplier for available queue capacity when determining number of tasks for a Vertex. 1.7 with 100% queue available implies generating a number of tasks roughly equal to 170% of the available containers on the queue. This enables multiple waves of mappers where the final wave is slightly smaller than the remaining waves. The gap helps overlap the final wave with any slower tasks from previous waves and tries to hide the delays from the slower tasks. Good values for this are 1.7, 2.7, 3.7 etc. Increase the number of waves to make the tasks smaller or shorter." Roughly: run the tasks in several waves so the cluster stays fully utilized and stragglers from earlier waves are hidden behind the last one.
mapreduce.map.cpu.vcores=1
mapreduce.reduce.cpu.vcores=1
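A minimal sketch of overriding these per session from Hive (cluster-wide defaults normally live in tez-site.xml; the numbers simply repeat the ones above):
set tez.am.resource.memory.mb=2048;     -- memory for the Tez ApplicationMaster
set tez.task.resource.memory.mb=1536;   -- memory per launched task
set tez.grouping.split-waves=1.7;       -- wave factor for split grouping
set tez.grouping.max-size=1073741824;   -- 1 GB upper bound per grouped split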
———————————————————————————————————————————
Apps using Tez have the ability to determine the number of tasks reading the initial external data for a job (the number of mappers in MapReduce parlance). Here is a short description of how that works.
First, Tez tries to find out the resource availability in the cluster for these tasks. For that, YARN provides a headroom value (and in future other attributes may be used). Let's say this value is T.
Next, Tez works out how many tasks can run in parallel at once given the per-task resource requirement (say M); this one-wave parallelism is W = T / M.
Next, W is multiplied by a wave factor (from configuration - tez.grouping.split-waves) to determine the number of tasks to be used. Let's say this value is N.
Tez then looks at the total data to be read (from the input splits) and divides it by N to get the desired data size per task.
If this value is between tez.grouping.max-size & tez.grouping.min-size then N is accepted as the number of tasks. If not, then N is adjusted to bring the data per task in line with the max/min depending on which threshold was crossed.
For experimental purposes tez.grouping.split-count can be set in configuration to specify the desired number of groups. If this config is specified then the above logic is ignored and Tez tries to group splits into the specified number of groups. This is best effort.
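A made-up worked example to tie the knobs together (all numbers are invented):
T = 100 GB   (headroom reported by YARN)
M = 2 GB     (resource per task)
W = T / M = 50 tasks in one wave
N = W * 1.7 = 85 tasks            (tez.grouping.split-waves)
1 TB of input / 85 tasks ≈ 12 GB per task, which exceeds the tez.grouping.max-size of 1073741824 bytes listed above
adjusted N ≈ 1 TB / 1 GB ≈ 1024 tasks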