• Hadoop3教程(二十):MapReduce的工作机制总结


    (109)MapTask工作机制

    MapTask的完整工作流程如图:

    在这里插入图片描述

    依图可见,MapTask一共分为5个阶段:

    • Read阶段
    • Map阶段
    • Collect阶段
    • 溢写阶段
    • Merge阶段

    1) Read阶段:首先是job的提交流程,客户端会读取待处理文件的信息,然后做切片规划,接着把信息和切片规划都提交到YARN,YARN会计算出所需要的MapTask的数量,之后才会正式进入MapTask;

    MapTask会调用InputFormat读取数据,K是偏移量,v是这一行的文本,这就是Read阶段;

    2) Map阶段,会进入自定义Mapper,执行我们在map()里定义的业务逻辑;

    3) Collect阶段。把Map阶段处理的数据,通过调用OutputCollector.collect()来持续的发送到环形缓冲区,这个过程中会给每个KV对打上分区标记;

    4) Spill阶段,即溢写阶段。当缓冲区写到80%之后,触发溢写,MR会将环形缓冲区的数据排序后写到本地磁盘上,生成一个临时文件。

    4.1) 溢写时,首先使用快速排序算法,对缓冲区内的数据进行排序。排序的方式是,先按照分区编号Partition进行排序,然后再按照key进行排序。经过这样排序之后,数据以分区为单位聚集在一起,同时区内数据是按照key有序;

    4.2) 按照分区编号从小到大依次将每个分区的数据写入临时文件output/spillN.out中,其中N表示当前溢写的次数。如果设置了Combiner,则会先进行分区内Combiner之后,再写入文件;

    4.3) 将分区数据的元信息写到内存索引数据结构spillRecord中,其中每个分区数据的元信息包括:在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。方便后续按照分区提取数据。如果当前内存索引大小占用超过1MB,则会将内存索引写到output/spillN.out.index里。

    5) Merge阶段。当前MapTask的所有数据处理完成后,会对自己溢写输出的所有文件做归并,确保最终只生成一个数据文件。(避免小文件创建和读取的开销)

    最终的大文件合并完成后,MapTask会将其保存到output/file.out里,同时生成相应的索引文件output/file.out.index

    在文件合并过程中,是以分区为单位进行合并。对于某个分区,采用多轮递归合并的方式,每轮合并mapreduce.task.io.sort.factor(默认为10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

    (110)ReduceTask工作机制&并行度

    ReduceTask工作机制

    ReduceTask的完整工作流程如下图:

    在这里插入图片描述

    由图可知,ReduceTask分为3个阶段:

    • Copy阶段
    • Sort阶段(或者是merge + sort阶段)
    • Reduce阶段

    1) Copy阶段,每个ReduceTask负责处理一个分区的数据,首先每个ReduceTask会从各个MapTask归并完的文件里,将对应分区的数据拷贝过来。相当于是从每个MapTask都取一片数据,如果这片数据大小超过阈值,那么就写到磁盘上,否则就直接放进内存里。

    2) 接下来进入Sort阶段,每个ReduceTask会对自己拉取过来的那堆数据,做归并排序。

    ReduceTask在远程拷贝数据的同时,也会启动两个后台线程,对内存和磁盘上的文件进行合并,以防止内存中数据过多,或者磁盘上文件过多。由于每个MapTask对自己的数据已经进行了局部排序,因此ReduceTask只需要对拿到的所有数据进行一次归并排序即可。(子序列有序,合并多个子序列的算法)

    3) 最后进入Reduce阶段,将上一阶段的文件送进Reduce()里,执行自定义的业务逻辑,并调用绑定的OutputFormat,将处理结果以指定形式输出到指定介质。

    MapTask和ReduceTask的并行度决定机制

    MapTask并行度是由切片个数决定的,而切片个数则是由输入文件和切片规则决定的。

    那ReduceTask的并行度是由谁决定的?

    ReduceTask的数量是可以手动设置的,通过在驱动类里编写如下代码:

    job.setNumReduceTasks(4);		// 设置为4个ReduceTask
    
    • 1

    设置ReduceTask数量=0的时候,表示没有reduce阶段,输出文件个数跟map阶段一致。

    如果数据分布不均匀的话,就有可能在Reduce阶段产生数据倾斜(单个分区数据过多,导致与其对应的ReduceTask处理压力过大),这个可以实际观察一下。

    如果分区数不是1,但是设置的ReduceTask数量是1,那么将不会执行分区阶段。因为分区的时候会先判断ReduceTaskNum是否大于1,不大于1就不会执行分区逻辑。这个其实之前的时候也有讲过来着。

    既然ReduceTask的数量是需要人为指定的,那么这个数量设置多少比较合适?

    只能说不一定,需要考虑业务的需求,还需要结合集群的性能,来综合评定。

    在有的情况下,需要计算全局汇总结果,那就只有一个ReduceTask就够了。

    教程里做了一个简单的测试,我直接贴一下:

    实验环境:1个Master节点,16个Slave节点,CPU都是8GHZ,内存都是2G。

    当处理1GB数据时,改变ReduceTask的数量,总处理时长的变化为:

    MapTask =16
    ReduceTask151015162025304560
    总时间8921461109288100128101145104

    所以可以看到,一般来讲,ReduceTask的数量,不是越多越好,同样的,也不是越少越好。还是得视情况具体分析。

    (122)MapReduce开发总结

    接下来,按照执行的顺序,简单过一下MR处理过程中涉及到的一些组件,或者说一些环节:

    1) 输入数据接口:InputFormat

    默认的是TextInputFormat,输入的K是偏移量(功能上可简单理解成行号,但其实不是),V是对应的一行内容。但是一个文件至少占一片,因此处理不了大量小文件的场景。

    处理大量小文件时应该用CombineTextInputFormat,可以把多个文件合并在一起统一切片,其中会涉及虚拟存储的概念;

    2)逻辑处理接口:Mapper

    数据进入Mapper之后,会按顺序执行下面三个方法:

    • setup(),负责初始化
    • map(),用户的业务逻辑
    • cleanup(),最后调用,关闭资源;

    之后,数据进入到shuffle阶段。

    3)分区:Partitioner

    默认的是HashPartitioner,即按照key的hashcode值 % numReducer个数来计算key所属分区。

    可根据业务需要,自定义分区类。

    4)排序:Comparable

    MR中的排序默认是字典排序,即按照字母或者数字,由小到大排序。
    不同场景下的排序,拥有不同的名字:

    • 部分排序:即每个输出的文件,内部有序,文件和文件之间无序;
    • 全排序:相当于一个Reduce,对所有数据排序,慎用,所有数据放在一台节点上,容易OOM;
    • 二次排序:即拥有两个排序条件,它属于自定义排序的范畴,需要实现WritableCompare接口,并重写其中的compareTo()来添加大小比较的逻辑;

    5)合并:Combiner(可选阶段)

    Combiner其实是一类运行在单个MapTask的Reducer。

    通过在shuffle的几个阶段里做预聚合,如可以将(a,1)、(a,1)这样的数据聚合成(a,2),从而减少数据量,避免无谓的IO传输。

    前提是不影响最终的业务逻辑(求和没啥问题),其实就是提前在map阶段聚合,这样算是降低了Reducer端的压力,这也是解决数据倾斜的一个方法。

    同样的,Combiner也可以自定义,可编写自己需要的、同时不影响业务的聚合逻辑。

    6)逻辑处理接口:Reducer

    数据进入Reducer之后,会按顺序执行下面三个方法:

    • setup() 初始化
    • reduce() 用户的业务逻辑;
    • clearup() 关闭清理资源

    7)输出数据接口:OutputFormat

    默认是TextOutputFormat,即将每个KV对,作为一行输出到目标文件;同样支持自定义。

    参考文献

    1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】
  • 相关阅读:
    LDA代码训练报错记录
    hive 自定义函数、GenericUDF、GenericUDTF(自定义函数计算给定字符串的长度、将一个任意分割符的字符串切割成独立的单词)
    Git 分布式版本控制工具03Git常用命令:Git全局设置+本地与远程仓库操作获取Git仓库+标签操作+忽略名单+工作区、暂存区、版本库+分支操作+暂时保存
    接口使用的最佳时机
    LeetCode669. Trim a Binary Search Tree
    playwright录制脚本
    本教程旨在为正在构建,部署和使用CarbonData的最终用户和开发人员提供故障排除。
    CentOS 安装 tomcat 并设置 开机自启动
    注意力机制的qkv
    【跟着大佬学JavaScript】之数组去重(结果对比)
  • 原文地址:https://blog.csdn.net/wlh2220133699/article/details/133894677