• Hadoop3教程(十一):MapReduce的详细工作流程


    (94)MR工作流程

    本小节将展示一下整个MapReduce的全工作流程。

    Map阶段

    首先是Map阶段:

    在这里插入图片描述

    1. 首先,我们有一个待处理文本文件的集合;

    2. 客户端开始切片规划;

    3. 客户端提交各种信息(如切片规划文件、代码文件及其他配置数据)到yarn;

    4. yarn接收信息,计算所需的MapTask数量(按照切片数);

    5. MapTask启动,读取输入文件,默认使用的是TextInputFormat。输出KV对,以TextInputFormat为例,K是偏移量(行在整个文件的字节数),V是这一行的内容;

    6. TextInputFormat读取完毕后,将得到的KV对都输入Mapper(),做自定义业务逻辑处理(核心处理部分);

    7. Mapper()处理完的数据,放入outputCollector,也被叫做环形缓冲区;环形缓冲区是位于内存中的,其实就是个缓冲数组,里面每行数据是分左右两部分,右边一部分是KV数据位,存放的是输入进来的K值和V值,左边一部分是对应的索引数据,存放的信息有:本行KV对的索引、本行KV对的分区、keystart以及valuestart;这里的keystart和valuestart都是指数据在内存中的存储位置,(keystart~valuestart)表示本行key值的存储起止位置,而(valuestart~下一行数据的keystart)表示本行value值的存储起止位置,其他行以此类推。

      环形缓冲区默认大小是100M,它有个有趣的机制用来协调写 + 磁盘持久化。当写满到80%的时候,环形缓冲区会开始进行反向逆写操作

      什么是反向逆写呢?

      可以结合数组做简单理解,就是假设数组有100个位置,即索引位0~99,当写到80%位置,即从索引0开始,到索引79写完了之后,就开始反向逆写,从索引99开始往前写,依次是98/97这样子。

      为什么要这么设置?

      很简单,当写满到80%的时候,系统会开启一个线程,将这80%的数据持久化到磁盘,但持久化的同时,一般希望不会影响正常的写,于是留了20%的空位置,供正常的写操作。因此是持久化 + 写,并行运行。

      想象一下,如果规定只有写满到100%之后才能持久化到磁盘,或者说溢出到磁盘,那么在它持久化的过程中,整个写流程就必须暂停,直到持久化完成后,环形缓冲区清空后才能继续写,这个时间消耗未免太长,效率太低。这么看的话,它这个80%后开始逆写的设置,还挺棒的。

      这里有个潜在的问题,就是如果系统写的很快,在没有持久化完那80%之前,那20%的空位置就写满了,这时候会发生什么情况?

      这时候,写流程就不得不暂停,直到持久化完成之后再恢复写。

    8. 注意,上一步中持久化,或者说溢写数据之前,会先将数据分区(不同分区的数据在Reduce阶段将会被送进不同的ReduceTask)。然后分区内做排序,一般使用快排。

      那排序是针对什么来排呢?

      不是数据的KV,而是数据的那几个索引。

    9. 将数据溢出至文件。注意,单次溢写的数据虽然是写在一个文件里,但是是分区且分区内有序的。

    10. 在数据溢出数次后,我们就有了好几个文件,接下来我们将这些文件merge,做归并排序,相当于是合并成一个文件,然后将结果存储在磁盘。

    11. 做预聚合。比如说如果有两个,那可以直接合并成。当然,这一步并不是必要的,可以结合实际场景具体看是否需要。

    到这里,一个MapTask的工作就正式结束了,其他的MapTask就是重复以上过程。

    Reduce阶段

    Reduce阶段:

    在这里插入图片描述

    1. 一般情况下,等所有MapTask任务都完成后,就会启动响应数据的ReduceTask,并告知每个ReduceTask它需要处理的数据范围。

      这里说的是一般情况下,实际上我们也可以设置,等到一部分MapTask完成之后就先启动几个ReduceTask做处理,相当于Map阶段和Reduce阶段同时进行。这个比较适合MapTask很多的情况,比如说有100个MapTask,等到100个都执行完,才进入Reduce阶段,未免太慢了,所以可以这样并行走。

    2. ReduceTask 主动 从MapTask的结果数据中去拉取需要的数据,然后做合并文件 + 归并排序

      举个例子,ReduceTask_1可能会从MapTask_1拉取指定分区数据,也会从MapTask_2中拉取该分区的数据,这样的话就会有多个文件,而且虽然每个文件内部是有序的(MapTask处理过),但是不同文件之间可能是无序的,因此合并文件 + 归并排序,是很有必要的。

    3. 对上一步产生的结果,一次读取一组,送进Reducer()去做业务逻辑处理。这里的一组是KEY值相同作为一组,因为上一步中已经排序过了,所以KEY值相同的会被放在一起,直接取这一组就可以了。

    4. 分组,暂且不表;

    5. Reducer()处理完了之后,由OutputFormat往外输出,默认是TextOutputFormat,即输出成文本文件。

    这就是整个MR处理的流程。

    参考文献

    1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】
  • 相关阅读:
    JMeter 扩展开发:自定义 Java Sampler
    机器学习分类模型评价指标之ROC 曲线、 ROC 的 AUC 、 ROI 和 KS
    Android - 进程
    Servlet的基础详细笔记
    消息队列一文全解!!!
    一台服务器上部署 Redis 伪集群
    vscode一键生成佛祖保佑永无bug
    日渐流行的零代码,究竟适用哪些场景?
    深度网络架构的设计技巧(三)之ConvNeXt:打破Transformer垄断的纯CNN架构
    tp5自定义命令行
  • 原文地址:https://blog.csdn.net/wlh2220133699/article/details/133850432