作业与其每个任务都有一个状态信息(作业或任务的运行状态,Map和Reduce的进度,计数器值,状态消息或描述),这些状态信息在作业期间不断改变
1.Map进度标准是处理输入所占比例
2.Reduce进度标准是copy/merge/reduce(与shuffle的3阶段相对应)对整个进度的比例
Task有独立的进程,进程每个3秒检查一次任务更新标志,若有更新则报告给TaskTracker
TaskTracker每隔5秒给JobTracker发一次心跳信息,而JobTracker将合并这些更新产生一个表明所有运行作业及其任务状态的全局视图
同时JobClient通过每秒查询JobTracker来获得最新状态,并且输出到控制台上
当JobTracker收到作业最后一个任务已完成的通知后,便把作业的状态设置为"成功"。然后在JobClient查询状态时便知道作业已成功完成,于是JobClient打印一条消息告知用户,最后从runJob()方法返回
最后JobTracker清空作业的工作状态,指示TaskTracker也清空作业的工作状态
提升理解点
系统中有99%的Map任务都完成了,只有少数几个Map老是进度很慢怎么办?
Hadoop会把任务分配到多个节点,当中一些慢的节点会限制整体程序的执行速度,这时Hadoop会引入“推测执行”过程:
作业中大多数任务都已经完成,hadoop平台会在几个空闲节点上调度执行剩余任务的复制,当任务完成时会向JobTracker通告。
任何一个首先完成的复制任务将成为权威复制,如果其他复制任务还在推测执行中,hadoop会告诉TaskTracker去终止这些任务并丢弃其输出,然后reducer会从首先完成的mapper那里获取输入数据
mapred.job.reuse.jvm.num.tasks默认为1,即每个Task都启动一个JVM来运行任务,当值为-1时,表示JVM可以无限制重用
当值为-1时,TaskTracker先判断当前当前节点是否有slot剩余,
如果没有slot槽位才会判断当前分配的slot槽位中的JVM是否已经将当前task任务运行完,
如果task已经运行完,才会复用当前JVM(同一Job的JVM才会复用)
当一个Job的Task数目很大(尤其Task耗时很小),由于频繁的JVM停启会造成很大开销,进行JVM复用会使同一个Job的一些静态数据得到共享,从而是集群性能得到很大提升,但是JVM重用会导致在同一个JVM中的碎片增加,导致JVM性能变差
将数据写入本地磁盘前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作
将数据写入本地磁盘前,线程根据ReduceTask的数量将数据分区,一个Reduce任务对应一个分区的数据
这样做的目的是为了避免有些Reduce任务分配到大量数据,而有些Reduce任务分到很少的数据,甚至没有分到数据的尴尬局面
如果此时设置了Combiner,将排序后的结果进行Combine操作,这样做的目的是尽可能少地执行数据写入磁盘的操作
合并的过程中会不断地进行排序和Combine操作
其目的有两个,一是尽量减少每次写人磁盘的数据量;二是尽量减少下一复制阶段网络传输的数据量
最后合并成了一个已分区且已排序的文件
为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略
由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此ReduceTask只需对所有数据进行一次归并排序即可
合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到Reduce函数
环形缓冲区其实只是一个简单的buffer(),缓冲区的初始化源码如下
private IntBuffer kvmeta;//存储元数据信息 **注意这是IntBuffer存储int,至于元数据为什么都是int下面会有答案
byte[] kvbuffer;//环形缓冲区的数组
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);//通过配置文件得到了环形缓冲区的大小(默认100M)
....................
int maxMemUsage = sortmb << 20;//通过位运算把100m转化成对应的字节
kvbuffer = new byte[maxMemUsage]; //创建一个buffer
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer)//通过伪装得到一个元数据数组
.order(ByteOrder.nativeOrder())
.asIntBuffer();//100M位例值为26214400
setEquator(0);//设置初始的赤道的位置为0
bufstart = bufend = bufindex = equator;//初始化buffer参数
kvstart = kvend = kvindex;//初始化meta参数
....................
private void setEquator(int pos) {
final int aligned = pos-(pos% METASIZE);//在这里pos为0, 所以aligned=0
kvindex = ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
}
环形缓冲区默认大小为100MB, 也可以通过 mapreduce.task.io.sort.mb属性来配置
1将输出不断写到环形缓冲区,一旦缓冲区快要溢出时(80%)(mapreduce.map.sort.spill.percent 配置),
系统将启动一个后台线程把缓冲区中的内容写到磁盘(spill阶段)
在写磁盘过程中,Map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,那么Map就会阻塞直到写磁盘过程完成
在写本地磁盘之前,MapReduce的Job会预断Reduce任务的个数(编程时设定好的)
假如此时有多个Reduce任务,后台线程会根据数据最终要送往的Reduce进行相应分区(调用Partitioner对象的getPartition()可查)
默认的Partitioner使用Hash算法来分区,简单地说就是通过key.hashCode()%R来计算获得分区的编号,其中R为Reduce的个数,每个Reduce对于一个分区
Map的输出与分区信息一起被保存
针对每个分区中的数据使用快速排序算法按照Key进行内部排序
如果设置了Combine函数,则会在排序的结果上进行Combine局部聚合运算
Spill文件根据情况被合并成一个大的已分区且已排序的输出文件
如指定Combine,并且溢出写次数至少为3(min.num.spills.for.combine配置)时,则Combine会在输出文件写入磁盘前运行
运行Combine的意义在于使Map输出更紧凑,使写到本地磁盘和传给Reduce的数据更少
如果spill文件数量大于mapreduce.map.combiner.minspills配置的数值,那么在合并文件写入之前会再次运行Combine
mapreduce.task.io.sort.factor属性配置每次最多合并的文件数默认为10,即一次最多合并10个spill文件,多轮合并后,最终所有的输出文件被合并为一个大文件
默认Map的输出不压缩,mapred.compress.map.output属性设置为true配置
Reduce端通过HTTP的方式请求Map端输出文件的分区
1.Reduce从各个Map任务的输出文件中提取属于自己分区的那部分
每个Map任务的完成时间可能是不一样的,Reduce任务在Map任务结束之后会尽快取走输出结果
2.Reduce可以并行获取Map数据,线程默认为5个(mapred.reduce.parallel.copies配置)
3.数据被Reduce提取后,Map机器不会立刻删除数据
这样Reduce任务执行失败时还能重新提取
Map的输出数据是在整个作业完成后才被删除的
由JobTracker通知TaskTracker可以删除Map输出数据
4.如Map输出的数据足够小,会被复制到Reduce任务的JVM内存中
mapreduce.reduce.shuffle.input.buffer.percent配置JVM堆内存中有多少比例可以用于存放map任务的输出结果
1.接上面第4步,Reduce任务的JVM内存缓冲区中数据达到配置的阈值时
这些数据会在内存中被合并,并写入机器磁盘