• MapRecuce框架原理


    InputFormat数据输入

    切片与MapTask并行度决定机制

    MapTask的并行度决定Map阶段的任务处理并发度,进而影响整个Job的处理速度

    MapTask并行度决定机制

    数据块: Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。

    数据切片: 数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。

    一个Job的Map阶段并行度由客户端在提交Job时的切片数决定。每一个Split切片分配一个MapTask并行实时处理。在默认情况下,切片大小等于BlockSize。切片时不考虑数据集整体,而是逐个针对每一个文件单独切片。

    源码解读

    1. 程序先找到数据存储的目录

    2. 开始遍历目录下的每一个文件

    3. 遍历第一个文件

      1. 获得文件大小fs.sizeOf()

      2. 计算切片大小

        computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize

      3. 默认情况下,切片大小=blocksize

      4. 开始切片,形成第一个切片:0128M,第二个切片128256M …(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)

      5. 将切片信息写到一个切片规划文件中

      6. 整个切片的核心过程在getSplit()方法中完成

      7. InputSplit只记录了切片的元数据信息,比如起始位置,长度以及所在节点列表等。

    4. 提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据规划文件计算开启MapTask个数

    FileInputFormat切片机制

    计算切片大小的公式

    Math.max(minSize,Math.min(maxSize,blockSize))
    mapreduce.input.fileinputformat.split.minsize=1 默认值为 1
    mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值Long.MAXValue
    因此,默认情况下,切片大小=blockSize
    
    • 1
    • 2
    • 3
    • 4

    切片大小设置

    maxSize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数值。

    minSize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大

    获得切片信息API

    //获得切片的文件名称
    String name=inputSplit.getPath().getName();
    //根据文件类型获取切片信息
    FileSplit inputSplit=(FileSplit)context.getInputSplit();
        
    
    • 1
    • 2
    • 3
    • 4
    • 5

    CombineTextInputFormat

    框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。

    1)应用场景:

    CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

    2)虚拟存储切片最大值设置

    CombineTextInputFormat.setMaxInputSplitSize(job, maxSize);

    注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

    3)切片机制

    生成切片过程包括:虚拟存储过程和切片过程二部分。

    (1)虚拟存储过程:

    将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。

    例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。

    (2)切片过程:

    ​ (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。

    ​ (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

    有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:

    1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)

    最终会形成3个切片,大小分别为:

    (1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

    MapReduce工作流程

    在这里插入图片描述

    在这里插入图片描述

    Shuffle机制

    Shuffle机制

    Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

    在这里插入图片描述

    Partition分区

    要求将统计结果按照条件输出到不同文件中。

    默认分区

    public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
      public int getPartition(K2 key, V2 value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

    自定义分区

    1. 继承Partitioner类
    2. 重写getPartition()方法
    3. 在Job驱动中,设置自定义Partitioner
    4. 自定义Partition后,要根据自定义分区的逻辑设置相应的ReduceTask

    分区总结

    (1)如果RedceTask的数量>getPatition的结果数,则会多产生几个空的输出文件part-r-000xx;

    (2)如果1

    (3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReciceTask,最终也就只会产生一个结果文件part–00000 ;

    (4)分区号必须从零开始,逐一累加。

    Comparable排序

    排序是MapReduce框架中最重要的操作之一

    MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序。

    默认排序是按照字典顺序排序,且使用快速排序

    在这里插入图片描述

    自定义排序WritableComparable原理分析

    bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

    Combiner合并

    combiner的意义就是对每一个MapTask的数据进行局部汇总,以减少网络传输量。减轻ReduceTask的工作量。

    Combiner能够使用的前提是不能影响最终的业务逻辑。

    自定义Combiner

    1. 继承Reducer,重写reduce方法
    2. 驱动类中设置:job.setCombinerClass()
  • 相关阅读:
    MySQL之DQL
    Docker容器化技术(使用Dockerfile制作Nginx镜像)
    Parsing error: The keyword ‘interface‘ is reserved配置优化
    转载: 又拍云【PrismCDN 】低延时的P2P HLS直播技术实践
    基于springboot爱心捐赠小程序毕业设计-附源码211711
    最刁钻的阿里面试官总结的面试者常用面试题,看看你会哪些?
    什么是 PKI? 公钥基础设施的定义和指南
    ansible copy 模块
    【想法】取代NI的 PCIe-8371
    fdbus之CBaseMessage
  • 原文地址:https://blog.csdn.net/weixin_57025392/article/details/127716644