• Hive on Tez: learning from the official docs and source code


    Hive on Tez - Apache Hive - Apache Software Foundation

    In short, Hive is the foundation; for the execution engine I have used Spark and MR before, and now it is Tez.

    Generally speaking, MR is a bit weak because it is mostly disk-based, while Spark computes in memory: it splits the job into wide/narrow dependencies, builds a DAG, and then executes it. Tez is actually quite similar to Spark, just built on different underlying ideas, so let's study it.

    Multiple reduce stages

    Whenever a query has multiple reduce sinks (that cannot be combined, i.e.: no correlation between the partition keys), Hive will break the plan apart and submit one MR job per sink. All of the MR jobs in this chain need to be scheduled one-by-one and each one has to re-read the output of the previous job from HDFS and shuffle it. In Tez several reduce sinks can be linked directly and data can be pipelined without the need of temporary HDFS files. This pattern is referred to as MRR (Map - reduce - reduce*).

    In other words, the old map -> reduce -> map -> reduce chain is simplified to map -> reduce -> reduce.
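    For example (a hypothetical table, just to show the shape of such a query): an aggregation followed by a global sort produces two reduce sinks, which MR runs as two chained jobs but Tez can run as a single map -> reduce -> reduce DAG.

    -- hypothetical table, for illustration only
    select dept, sum(salary) as total_salary
    from employee
    group by dept           -- first reduce stage: aggregation
    order by total_salary;  -- second reduce stage: global sort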

    Pipelining

    More than just MRR, Tez allows for sending the entire query plan at once thus enabling the framework to allocate resources more intelligently as well as pipelining data through the various stages. This is a huge improvement for more complicated queries as it eliminates IO/sync barriers and scheduling overhead between individual stages. An example would be a query that aggregates two tables in subqueries in the from clause and joins the resulting relations.

    The entire query plan is sent at once, so the framework can allocate resources more intelligently and pipeline data through the stages.

    This reduces the IO/sync barriers and scheduling overhead between individual stages. For example, a query that aggregates two tables in subqueries in the FROM clause and then joins the resulting relations. To be honest I don't fully understand this part; it seems to mean the subquery aggregations can feed straight into the join without being materialized first.
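    A sketch of the query shape being described (hypothetical tables a and b): both subqueries are aggregations, and their outputs feed directly into the join.

    -- hypothetical tables a and b, for illustration only
    select t1.k, t1.sx, t2.sy
    from
      (select k, sum(x) as sx from a group by k) t1
      join
      (select k, sum(y) as sy from b group by k) t2
      on (t1.k = t2.k);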

    In memory versus disk writes

    Currently any shuffle is performed the same way regardless of the data size. Sorted partitions are written to disk, pulled by the reducers, merge-sorted and then fed into the reducers. Tez allows for small datasets to be handled entirely in memory, while no such optimization is available in map-reduce. Many warehousing queries sort or aggregate small datasets after the heavy lifting is done. These would benefit from an in memory shuffle.

    Different strategies are used depending on data size: small datasets can be shuffled entirely in memory.

    Joins

    Distributed join algorithms are difficult to express in map-reduce. A regular shuffle join for instance has to process different inputs in the same map task, use tags to be written to disk for distinguishing tables, use secondary sort to get the rows from each table in a predictable order, etc. Tez is a much more natural platform to implement these algorithms.

    For example: It is possible to have one Tez task take multiple bipartite edges as input thus exposing the input relations directly to the join implementation. The case where multiple tasks feed into the same shuffle join task will be referred to as multi-parent shuffle join.

    Fine-tuned algorithms

    All sorting in map-reduce happens using the same binary sort, regardless of the data type. Hive might for instance choose to use a more effective integer-only sort when possible. Tez makes that available.

    Since Hive uses map-reduce to compute aggregations, processing will always boil down to a sort-merge even though we’re not actually interested in the sort order. Tez will allow for more efficient hash-based algorithms to do the same.

    Limit processing

    Tez allows complete control over the processing, including being able to stop processing when limits are met (without simply skipping records or relying on file formats/input formats.) It’s also possible to define specific edge semantics, which could be used to provide a generic top-k edge to simplify “limit” processing.

    After all of that, the takeaway is simply that Tez is better.

    Let's go straight to an example.

    select
      i_item_desc
      ,i_category
      ,i_class
      ,i_current_price
      ,i_item_id
      ,itemrevenue
      ,itemrevenue*100/sum(itemrevenue) over (partition by i_class) as revenueratio
    from
      (select
         i_item_desc
         ,i_category
         ,i_class
         ,i_current_price
         ,i_item_id
         ,sum(ws_ext_sales_price) as itemrevenue
       from
         web_sales
         join item on (web_sales.ws_item_sk = item.i_item_sk)
         join date_dim on (web_sales.ws_sold_date_sk = date_dim.d_date_sk)
       where
         i_category in ('1', '2', '3')
         and year(d_date) = 2001 and month(d_date) = 10
       group by
         i_item_id
         ,i_item_desc
         ,i_category
         ,i_class
         ,i_current_price) tmp
    order by
      i_category
      ,i_class
      ,i_item_id
      ,i_item_desc
      ,revenueratio;

    This SQL is not complicated: an inner join of three tables, filtered by the WHERE clause, then GROUP BY, and finally a SELECT with a window function over the grouped result.

    Plan with TEZ

    Stage 0:

    Local Work: Generate hash table for date dim

    Stage 1:

    Map: SMB join item + web_sales, mapjoin date_dim + web_sales, map-side group by/aggregate

    Reduce 1: Reduce side group by/aggregate, shuffle for windowing

    Reduce 2: Compute windowing function, shuffle for order by

    Reduce 3: Order by, write to HDFS

    Plan without TEZ

    Local Work: Generate hash table for date dim

    Stage 1:

    Map: SMB join item + web_sales, mapjoin date_dim + web_sales, map-side group by/aggregate

    Reduce: Reduce side group by/aggregate, write to HDFS

    Stage 2:

    Map: Read tmp file, shuffle for windowing

    Reduce: Compute windowing function, write to HDFS

    Stage 3:

    Map: Read tmp file, shuffle for order by

    Reduce: Order by, write to HDFS

    Note that the plan without Tez has two extra "write to HDFS" steps: one for the result of the subquery join/aggregation and one for the windowing result.
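    To see the two plans yourself, you can switch the engine per session and run EXPLAIN; a sketch on the same tables (the exact EXPLAIN layout varies by Hive version):

    set hive.execution.engine=mr;
    explain
    select i_class, sum(ws_ext_sales_price) as rev
    from web_sales join item on (web_sales.ws_item_sk = item.i_item_sk)
    group by i_class
    order by rev;          -- group by + order by = two reduce sinks, two chained MR stages

    set hive.execution.engine=tez;
    -- re-running the same EXPLAIN now shows a single Tez DAG with no intermediate HDFS write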

    ——————————————————————————————————————————

    Now for hands-on practice, which mostly means parameter tuning.

    What parameters does Hive on Tez have?

    Configuration Properties - Apache Hive - Apache Software Foundation

    Tez

    Apache Tez was added in Hive 0.13.0 (HIVE-4660 and HIVE-6098).  For information see the design document Hive on Tez, especially the Installation and Configuration section.

    Besides the configuration properties listed in this section, some properties in other sections are also related to Tez. The parameters below also affect Hive on Tez:

    hive.execution.engine
    hive.server2.tez.default.queues
    hive.server2.tez.sessions.per.default.queue
    hive.server2.tez.initialize.default.sessions
    hive.stats.max.variable.length
    hive.stats.list.num.entries
    hive.stats.map.num.entries
    hive.stats.map.parallelism (Hive 0.13 only; removed in Hive 0.14)
    hive.stats.join.factor
    hive.stats.deserialization.factor
    hive.tez.dynamic.semijoin.reduction
    hive.tez.min.bloom.filter.entries
    hive.tez.max.bloom.filter.entries
    hive.tez.bloom.filter.factor
    hive.tez.bigtable.minsize.semijoin.reduction
    hive.explain.user


    hive.jar.directory

    Default Value: null
    Added In: Hive 0.13.0 with HIVE-5003 and HIVE-6098, default changed in HIVE-6636
    This is the location that Hive in Tez mode will look for to find a site-wide installed Hive instance.  See hive.user.install.directory for the default behavior.

    hive.user.install.directory

    Default Value: hdfs:///user/
    Added In: Hive 0.13.0 with HIVE-5003 and HIVE-6098
    If Hive (in Tez mode only) cannot find a usable Hive jar in hive.jar.directory, it will upload the Hive jar to hive.user.install.directory/<user name> and use it to run queries.

    hive.compute.splits.in.am

    Default Value: true
    Added In: Hive 0.13.0 with HIVE-5522 and HIVE-6098
    Whether to generate the splits locally or in the ApplicationMaster (Tez only).

    hive.rpc.query.plan

    Default Value: false
    Added In: Hive 0.13.0 with HIVE-5522 and HIVE-6098
    Whether to send the query plan via local resource or RPC.

    hive.prewarm.enabled        --Not entirely clear on this one, but it seems to be about pre-warming containers.

    Default Value: false
    Added In: Hive 0.13.0 with HIVE-6391 and HIVE-6360
    Enables container prewarm for Tez (0.13.0 to 1.2.x) or Tez/Spark (1.3.0+). This is for Hadoop 2 only.

    hive.prewarm.numcontainers

    Default Value: 10
    Added In: Hive 0.13.0 with HIVE-6391 and HIVE-6360
    Controls the number of containers to prewarm for Tez (0.13.0 to 1.2.x) or Tez/Spark (1.3.0+). This is for Hadoop 2 only.

    hive.merge.tezfiles        --This merges small files; it should obviously be enabled.

    Default Value: false
    Added In: Hive 0.13.0 with HIVE-6498 and HIVE-6360
    Merge small files at the end of a Tez DAG.

    hive.tez.input.format        --The InputFormat Hive uses to read files.

    Default Value: org.apache.hadoop.hive.ql.io.HiveInputFormat
    Added In: Hive 0.13.0 with HIVE-6498 and HIVE-6360
    The default input format for Tez. Tez groups splits in the AM (ApplicationMaster).

    hive.tez.input.generate.consistent.splits

    Default Value: true
    Added In: Hive 2.1.0 with HIVE-9850,  HIVE-10104 and HIVE-12078
    Whether to generate consistent split locations when generating splits in the AM. 

    Setting to false randomizes the location and order of splits depending on how threads generate.

    Relates to LLAP.

    hive.tez.container.size        --Anything with "size" in the name deserves attention: this is the container size, which defaults to the mapper size.

    Default Value: -1
    Added In: Hive 0.13.0 with HIVE-6498 and HIVE-6360
    By default Tez will spawn containers of the size of a mapper. This can be used to overwrite the default.

    hive.tez.java.opts

    Default Value: (empty)
    Added In: Hive 0.13.0 with HIVE-6498 and HIVE-6360
    By default Tez will use the Java options from map tasks. This can be used to overwrite the default.

    hive.convert.join.bucket.mapjoin.tez        --Bucket map join conversion.

    Default Value: false
    Added In: Hive 0.13.0 with HIVE-6447
    Whether joins can be automatically converted to bucket map joins in Hive when Tez is used as the execution engine (hive.execution.engine is set to "tez").

    hive.tez.log.level

    Default Value: INFO
    Added In: Hive 0.13.0 with HIVE-6743
    The log level to use for tasks executing as part of the DAG. Used only if hive.tez.java.opts is used to configure Java options.

    hive.localize.resource.wait.interval

    Default Value: 5000
    Added In: Hive 0.13.0 with HIVE-6782
    Time in milliseconds to wait for another thread to localize the same resource for Hive-Tez.

    hive.localize.resource.num.wait.attempts

    Default Value: 5
    Added In: Hive 0.13.0 with HIVE-6782
    The number of attempts waiting for localizing a resource in Hive-Tez.

    hive.tez.smb.number.waves

    Default Value: 0.5
    Added In: Hive 0.14.0 with HIVE-8409
    The number of waves in which to run the SMB (sort-merge-bucket) join. Account for cluster being occupied. Ideally should be 1 wave.

    hive.tez.cpu.vcores        --Worth noting: this is essentially the equivalent of Spark executor cores.

    Default Value: -1
    Added In: Hive 0.14.0 with HIVE-8452
    By default Tez will ask for however many CPUs MapReduce is configured to use per container. This can be used to overwrite the default.

    hive.tez.auto.reducer.parallelism        --Whether auto reducer parallelism is enabled.

    Default Value: false
    Added In: Hive 0.14.0 with HIVE-7158
    Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes and set parallelism estimates. Tez will sample source vertices' output sizes and adjust the estimates at runtime as necessary.

    hive.tez.max.partition.factor        --Upper factor for how far reducer parallelism is over-partitioned.

    Default Value: 2
    Added In: Hive 0.14.0 with HIVE-7158
    When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle edges.

    hive.tez.min.partition.factor        --Probably a lower-bound ratio for parallelism, e.g. an estimate of 100 reducers could drop to 25 at most. (My guess.)

    Default Value: 0.25
    Added In: Hive 0.14.0 with HIVE-7158
    When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number of reducers that Tez specifies.

    hive.tez.exec.print.summary        --Prints an execution summary; the output is a bit cluttered.

    Default Value: false
    Added In: Hive 0.14.0 with HIVE-8495
    If true, displays breakdown of execution steps for every query executed on Hive CLI or Beeline client.

    hive.tez.exec.inplace.progress

    Default Value: true
    Added In: Hive 0.14.0 with HIVE-8495
    Updates Tez job execution progress in-place in the terminal when Hive CLI is used.
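    Putting the knobs above together, a sketch of session-level settings (the numbers are illustrative starting points, not values recommended by the docs):

    set hive.execution.engine=tez;
    set hive.tez.container.size=4096;           -- MB; overrides the default mapper-sized container
    set hive.tez.java.opts=-Xmx3276m;           -- task JVM heap, usually a bit below the container size
    set hive.merge.tezfiles=true;               -- merge small files at the end of the DAG
    set hive.tez.auto.reducer.parallelism=true; -- let Tez adjust reducer counts at runtime
    set hive.tez.max.partition.factor=2;
    set hive.tez.min.partition.factor=0.25;
    set hive.tez.exec.print.summary=true;       -- per-query execution breakdown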

    ——————————————————————————————————————————

    All of the above are Hive parameters. So when we run Hive on Tez, do we also need to set Tez's own parameters?

    These parameters were found in CDP's Tez configuration, but I could not find them on the official site...

    tez.am.resource.memory.mb=2048        --The amount of memory to be used by the Application Master. Used only if the value is not specified explicitly by the DAG definition.

    tez.task.resource.memory.mb=1536  --The amount of memory to be used by launched tasks. Used only if the value is not specified explicitly by the DAG definition.

    There are some more parameters; for details see the Tez source: org.apache.tez.mapreduce.hadoop.MRJobConfig / org.apache.tez.mapreduce.grouper.TezSplitGrouper.

    tez.grouping.max-size=1073741824      --Upper bound, in bytes, on the size of a grouped split (1 GB here).

    tez.grouping.split-waves=1.7        --Wave factor used when sizing the initial vertex; see the Javadoc below.

    /**
     * The multiplier for available queue capacity when determining number of
     * tasks for a Vertex. 1.7 with 100% queue available implies generating a
     * number of tasks roughly equal to 170% of the available containers on the
     * queue. This enables multiple waves of mappers where the final wave is slightly smaller
     * than the remaining waves. The gap helps overlap the final wave with any slower tasks
     * from previous waves and tries to hide the delays from the slower tasks. Good values for
     * this are 1.7, 2.7, 3.7 etc. Increase the number of waves to make the tasks smaller or
     * shorter.
     Roughly, this means scheduling a few extra waves of tasks so the queue stays fully utilized.
     */
    mapreduce.map.cpu.vcores=1
    mapreduce.reduce.cpu.vcores=1
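    These Tez-side properties can also be set per session in Hive (or in tez-site.xml); a sketch using the values mentioned above:

    set tez.am.resource.memory.mb=2048;     -- Application Master memory
    set tez.task.resource.memory.mb=1536;   -- memory per launched task
    set tez.grouping.max-size=1073741824;   -- upper bound, in bytes, on a grouped split (1 GB)
    set tez.grouping.split-waves=1.7;       -- wave factor used when sizing the initial vertex
    set mapreduce.map.cpu.vcores=1;
    set mapreduce.reduce.cpu.vcores=1;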

    ​———————————————————————————————————————————

    How initial task parallelism works

      • Apps using Tez have the ability to determine the number of tasks reading the initial external data for a job (the number of mappers in MapReduce parlance). Here is a short description of how that works.

      • There is an InputInitializer specified for the initial vertex reading the external input. During vertex initialization, the InputInitializer is invoked to determine the number of shards of the external data distributed across the cluster. In MapReduce parlance, these would be called input splits and would be determined by the InputFormat for that external input.
      • If Tez grouping is enabled for the splits, then a generic grouping logic is run on these splits to group them into larger splits. The idea is to strike a balance between how parallel the processing is and how much work is being done in each parallel process.
      • First, Tez tries to find out the resource availability in the cluster for these tasks. For that, YARN provides a headroom value (and in future other attributes may be used). Let's say this value is T.

        int totalResource = getContext().getTotalAvailableResource().getMemory();

        Compute the total available resource; assume it equals T.

      • Next, Tez divides T by the resource per task (say M) to find out how many tasks can run in parallel at once (i.e. in a single wave). W = T/M.
      • Compute: tasks per wave W = total resource T / resource per task M. (A worked numeric example follows after this list.)
      • Next W is multiplied by a wave factor (from configuration - tez.grouping.split-waves) to determine the number of tasks to be used. Let's say this value is N.

        int taskResource = getContext().getVertexTaskResource().getMemory();
        float waves = conf.getFloat(
                TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES,
                TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT); // default = 1.7
        int numTasks = (int)((totalResource * waves)/taskResource);

      • If there are a total of X splits (input shards) and N tasks then this would group X/N splits per task. Tez then estimates the size of data per task based on the number of splits per task.
      • If this value is between tez.grouping.max-size & tez.grouping.min-size then N is accepted as the number of tasks. If not, then N is adjusted to bring the data per task in line with the max/min depending on which threshold was crossed.

        if (lengthPerGroup > maxLengthPerGroup) {
          // splits too big to work. Need to override with max size.
          int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
          ...
        } else if (lengthPerGroup < minLengthPerGroup) {
          // splits too small to work. Need to override with size.
          int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
          ...
        }

      • For experimental purposes tez.grouping.split-count can be set in configuration to specify the desired number of groups. If this config is specified then the above logic is ignored and Tez tries to group splits into the specified number of groups. This is best effort.

        int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0);
        if (configNumSplits > 0) {
          // always use config override if specified
          desiredNumSplits = configNumSplits;
        }

      • After this, the grouping algorithm is executed. It groups splits by node locality, then rack locality, while respecting the group size limits.
         
    • If the final number of splits is X then X tasks are created in the initial vertex and they are executed on the cluster to read the external data.
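    A worked example of the steps above, with hypothetical numbers:

        headroom T            = 102400 MB (100 GB reported by YARN)
        per-task resource M   = 2048 MB
        tasks per wave W      = T / M = 50
        desired tasks N       = W * 1.7 (tez.grouping.split-waves) = 85
        input                 = 500 GB across X = 4000 original splits
                                -> 4000 / 85 ≈ 47 splits per group, ≈ 5.9 GB per task
        5.9 GB > tez.grouping.max-size (1 GB), so N is overridden:
        newDesiredNumSplits   = totalLength / maxLengthPerGroup + 1 = 500 + 1 = 501 groups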

  • Original article: https://blog.csdn.net/cclovezbf/article/details/126835916