• MapReduce理论


    MapReduce

    • 可以分解成两个过程

    1.Map

    • 映射过程
    • 将一组数据按某种Map函数映射成新数据
    • 总之Map主要是映射、变换、过滤的过程

    2.Reduce

    • 归纳过程
    • 将若干组映射结果进行汇总并输出
    • 总之Reducce主要是分解、缩小、归纳的过程

    3.MapReduce过程

    在这里插入图片描述

    4.优点

    • 方便分步式计算编程
    • 动态增加资源,解决计算资源不足问题
    • 高容错
    • 适合海量数据计算,几千台服务器共同计算

    5.缺点

    • 不适合实时计算
    • 不擅长流式计算
    • 不擅长DAG有向无关图计算

    MR工作流程

    1.整体工作流程图解

    在这里插入图片描述

    2.详细讲解工作流程(结合源码)

    • 提交作业
      在这里插入图片描述
    • 初始化作业
      在这里插入图片描述
    • 任务分配
      在这里插入图片描述
    • 任务执行
      在这里插入图片描述
    • 进度和状态更新
    作业与其每个任务都有一个状态信息(作业或任务的运行状态,MapReduce的进度,计数器值,状态消息或描述),这些状态信息在作业期间不断改变
    	1.Map进度标准是处理输入所占比例
    	2.Reduce进度标准是copy/merge/reduce(与shuffle的3阶段相对应)对整个进度的比例
    Task有独立的进程,进程每个3秒检查一次任务更新标志,若有更新则报告给TaskTracker
    TaskTracker每隔5秒给JobTracker发一次心跳信息,而JobTracker将合并这些更新产生一个表明所有运行作业及其任务状态的全局视图
    同时JobClient通过每秒查询JobTracker来获得最新状态,并且输出到控制台上
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 作业完成
    JobTracker收到作业最后一个任务已完成的通知后,便把作业的状态设置为"成功"。然后在JobClient查询状态时便知道作业已成功完成,于是JobClient打印一条消息告知用户,最后从runJob()方法返回
    最后JobTracker清空作业的工作状态,指示TaskTracker也清空作业的工作状态
    
    • 1
    • 2
    • 全过程
      在这里插入图片描述

    提升理解点

    • 推测执行
    系统中有99%Map任务都完成了,只有少数几个Map老是进度很慢怎么办?
    	Hadoop会把任务分配到多个节点,当中一些慢的节点会限制整体程序的执行速度,这时Hadoop会引入“推测执行”过程:
    		作业中大多数任务都已经完成,hadoop平台会在几个空闲节点上调度执行剩余任务的复制,当任务完成时会向JobTracker通告。
    		任何一个首先完成的复制任务将成为权威复制,如果其他复制任务还在推测执行中,hadoop会告诉TaskTracker去终止这些任务并丢弃其输出,然后reducer会从首先完成的mapper那里获取输入数据
    
    • 1
    • 2
    • 3
    • 4
    • 任务JVM重用
    mapred.job.reuse.jvm.num.tasks默认为1,即每个Task都启动一个JVM来运行任务,当值为-1时,表示JVM可以无限制重用
    当值为-1时,TaskTracker先判断当前当前节点是否有slot剩余,
    如果没有slot槽位才会判断当前分配的slot槽位中的JVM是否已经将当前task任务运行完,
    如果task已经运行完,才会复用当前JVM(同一Job的JVM才会复用)
    当一个JobTask数目很大(尤其Task耗时很小),由于频繁的JVM停启会造成很大开销,进行JVM复用会使同一个Job的一些静态数据得到共享,从而是集群性能得到很大提升,但是JVM重用会导致在同一个JVM中的碎片增加,导致JVM性能变差
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.整体工作流程细化图解

    在这里插入图片描述

    4.MapTask

    • 图解
      在这里插入图片描述
    • Spill 阶段
    将数据写入本地磁盘前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作
    将数据写入本地磁盘前,线程根据ReduceTask的数量将数据分区,一个Reduce任务对应一个分区的数据
    	这样做的目的是为了避免有些Reduce任务分配到大量数据,而有些Reduce任务分到很少的数据,甚至没有分到数据的尴尬局面
    如果此时设置了Combiner,将排序后的结果进行Combine操作,这样做的目的是尽可能少地执行数据写入磁盘的操作
    
    • 1
    • 2
    • 3
    • 4
    • Combine 阶段
    合并的过程中会不断地进行排序和Combine操作
    其目的有两个,一是尽量减少每次写人磁盘的数据量;二是尽量减少下一复制阶段网络传输的数据量
    最后合并成了一个已分区且已排序的文件
    
    • 1
    • 2
    • 3

    5.ReduceTask

    • 图解
      在这里插入图片描述
    • Sort 阶段
    为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略
    由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此ReduceTask只需对所有数据进行一次归并排序即可
    
    • 1
    • 2
    • Write 阶段
    合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到Reduce函数
    
    • 1

    Sort阶段+Shuffle阶段

    1.该阶段干了什么?

    • Sort阶段是指对Map端输出的Key进行排序的过程,不同的Map可能输出相同的Key,相同的Key必须发送到同一个Reduce端处理
    • Shuffle阶段是指从Map的输出开始,包括系统执行排序以及传送Map输出到Reduce作为输入的过程
    • Shuffle实现的功能包括排序+分组+合并复杂数据处理
    • Shuffle阶段可以分为Map端的Shuffle和Reduce端的Shuffle

    2.Shuffle阶段和Sort阶段的工作过程

    在这里插入图片描述

    3.工作过程详解之Map端的Shuffle

    • Map函数开始产生输出时并不是简单地把数据写到磁盘,因为频繁的磁盘操作(IO)会导致性能严重下降,以下是Map端执行流程

    • Map将产生的输出数据写入内存的环形缓冲区
    环形缓冲区其实只是一个简单的buffer(),缓冲区的初始化源码如下
    	private IntBuffer kvmeta;//存储元数据信息 **注意这是IntBuffer存储int,至于元数据为什么都是int下面会有答案
        byte[] kvbuffer;//环形缓冲区的数组
    
        final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);//通过配置文件得到了环形缓冲区的大小(默认100M)
        ....................
      	int maxMemUsage = sortmb << 20;//通过位运算把100m转化成对应的字节
      	kvbuffer = new byte[maxMemUsage]; //创建一个buffer
      	bufvoid = kvbuffer.length;
      	kvmeta = ByteBuffer.wrap(kvbuffer)//通过伪装得到一个元数据数组 
        	.order(ByteOrder.nativeOrder())
         	.asIntBuffer();//100M位例值为26214400
      	setEquator(0);//设置初始的赤道的位置为0
      	bufstart = bufend = bufindex = equator;//初始化buffer参数
      	kvstart = kvend = kvindex;//初始化meta参数   
    	....................
        private void setEquator(int pos) {
            final int aligned = pos-(pos% METASIZE);//在这里pos为0, 所以aligned=0
            kvindex = ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 溢写spill
    环形缓冲区默认大小为100MB, 也可以通过 mapreduce.task.io.sort.mb属性来配置
    1将输出不断写到环形缓冲区,一旦缓冲区快要溢出时(80%)(mapreduce.map.sort.spill.percent 配置),
    系统将启动一个后台线程把缓冲区中的内容写到磁盘(spill阶段)
    在写磁盘过程中,Map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,那么Map就会阻塞直到写磁盘过程完成
    
    • 1
    • 2
    • 3
    • 4
    • 分区+排序
    在写本地磁盘之前,MapReduceJob会预断Reduce任务的个数(编程时设定好的)
    假如此时有多个Reduce任务,后台线程会根据数据最终要送往的Reduce进行相应分区(调用Partitioner对象的getPartition()可查)
    默认的Partitioner使用Hash算法来分区,简单地说就是通过key.hashCode()R来计算获得分区的编号,其中RReduce的个数,每个Reduce对于一个分区
    Map的输出与分区信息一起被保存
    针对每个分区中的数据使用快速排序算法按照Key进行内部排序
    如果设置了Combine函数,则会在排序的结果上进行Combine局部聚合运算
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 合并Spill文件
    Spill文件根据情况被合并成一个大的已分区且已排序的输出文件
    如指定Combine,并且溢出写次数至少为3(min.num.spills.for.combine配置)时,则Combine会在输出文件写入磁盘前运行
    运行Combine的意义在于使Map输出更紧凑,使写到本地磁盘和传给Reduce的数据更少
    如果spill文件数量大于mapreduce.map.combiner.minspills配置的数值,那么在合并文件写入之前会再次运行Combine
    mapreduce.task.io.sort.factor属性配置每次最多合并的文件数默认为10,即一次最多合并10个spill文件,多轮合并后,最终所有的输出文件被合并为一个大文件
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 压缩
    默认Map的输出不压缩,mapred.compress.map.output属性设置为true配置
    
    • 1
    • 通过HTTP暴露输出结果
    Reduce端通过HTTP的方式请求Map端输出文件的分区
    
    • 1

    4.工作过程详解之Reduce端的Shuffle

    • ReduceTask在执行之前的工作就是不断地拉取当前job中每个MapTask的最终结果,然后对从不同地方拉取过来的数据不断地做merge,最终形成一个文件作为ReduceTask的输入文件,以下是Reduce端执行流程

    • Copy Phase
    1.Reduce从各个Map任务的输出文件中提取属于自己分区的那部分
    	每个Map任务的完成时间可能是不一样的,Reduce任务在Map任务结束之后会尽快取走输出结果
    2.Reduce可以并行获取Map数据,线程默认为5(mapred.reduce.parallel.copies配置) 
    3.数据被Reduce提取后,Map机器不会立刻删除数据
    	这样Reduce任务执行失败时还能重新提取
    	Map的输出数据是在整个作业完成后才被删除的
    	由JobTracker通知TaskTracker可以删除Map输出数据
    4.Map输出的数据足够小,会被复制到Reduce任务的JVM内存中 
    	mapreduce.reduce.shuffle.input.buffer.percent配置JVM堆内存中有多少比例可以用于存放map任务的输出结果
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • Merge Phase
    1.接上面第4步,Reduce任务的JVM内存缓冲区中数据达到配置的阈值时
    	这些数据会在内存中被合并,并写入机器磁盘
    
    • 1
    • 2

    MapReduce性能调优

    1.通用原则

    • Shuffle过程分配尽可能大的内存
    • Mapper和Reduce过程程序应该尽量减少内存的使用
    • 运行Map与Reduce的任务JVM内存通过mapred.child.java.opts配置
    • 容器内存大小通过mapreduce.map.memory.mb和 mapreduce.reduce.memory.mb配置(默认值都为1024MB)

    2.Map端优化

    • 避免写入多个spill文件,最好只有一个spill文件
    • 估计Map的输出大小,设置合理的 mapreduce.task.io.sort.*属性,使得spill文件数量最小
      在这里插入图片描述

    3.Reduce端优化

    • 让所有数据都保存在内存中可以达到最佳的性能
    • 通常情况下,内存都保留给Reduce函数,但是如果Reduce函数对内存需求不是很高,将mapreduce.reduce.merge.inmem.threshold(触发合并的Map输出文件数)设为0,mapreduce.reduce.input.buffer.percent(保存Map输出文件的堆内存比例)设为1.0,可以达到很好的性能提升
      在这里插入图片描述
  • 相关阅读:
    Java学习笔记整理总结:Spring+tomcat+Kafka+多线程面试笔记
    测试右移:线上质量监控 ELK 实战
    将c、c++变为python
    AutoRD 自动化罕见疾病挖掘:专注于从非结构化文本中提取罕见疾病信息并构建相应的知识图谱
    机器学习每周挑战——旅游景点数据分析
    抖音视频评论区用户采集工具使用教程
    softmax函数
    [附源码]计算机毕业设计JAVAjsp高校奖学金评定管理系统
    算法的时间复杂度
    推荐5款优质的黑科技软件,好不好用你来判断
  • 原文地址:https://blog.csdn.net/weixin_51699336/article/details/125352425