是一个Hadoop MapReduce框架的修改版本,其目标是为了高效支持迭代,递归数据分析任务。
MapReduce做什么
Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:
一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值。
一个比较形象的语言解释MapReduce:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。

与Hadoop比较的四点改变
1.提供了一套新的编程接口,简化了迭代表达式的表达;
2.HaLoop的master节点包含了一个新的loop控制模块用于指定迭代的停止条件;
3.为迭代计算使用了一个新的任务调度器,可以将数据本地化。
4.Haloop的缓存和索引应用程序在slave节点上。
数据挖掘、网页排名、图像分析、模型拟合等超大规模的数据挖掘与分析需求的增长使得MapReduce和Dryad处理平台自然的出现。
但以上两种平台对于迭代程序的内嵌支持较为缺乏,而本文章中呈现的基于Hadoop MapReduceframework改进的HaLoop则在编程上支持迭代的应用并通过使任务调度器循环感知和添加各种缓存机制显著提高了它们的效率。
通过真实的评估,与Hadoop相比HaLoop减少了1.85倍的查询时间,而且在mapper和reducers之间shuffles的数据只有4%。
shuffles?
在处理过程中需要把mapper阶段的数据传递给reducer阶段,这个过程可以广义地称为Shuffle,是 MapReduce 框架中最关键的一个流程。
在对高度可扩展的并行数据处理平台的需求日益上升的情况下,Hadoop作为MapReduce实现,在许多地方得到应用。但Hadoop并没有直接支持这些迭代的数据分析应用,程序员必须通过手动发布多个MapReduce作业并使用驱动程序来编写执行程序从而实现迭代程序。
而在手动编写迭代程序中有两个损失性能的问题:
在网页排名、后代查询中,MapReduce作业所需要的数据(网页指向表,朋友表)是不变的,但Hadoop会在每次迭代中都被处理和shuffled。这种情况在大多数迭代的模型拟合算法、大多数网页/图像排名算法或网络查询等都有体现。
基于前两个例子中出现的问题**,HaLoop便出现了,它被设计用来高效地处理上述类型的应用。**
HaLoop扩展了MapReduce,并基于两个简单的直觉:
HaLoop继承了Hadoop的基本分布式计算模型和体系结构,依靠分布式文件系统(HDFS)存储每个作业的输入和输出。
HaLoop(集群)主要由两部分组成:
一个master结点
客户端提交作业到master结点上。对于每个被提交的作业,master都会安排一些并行任务在slave节点上运行。
多个slave结点
每个slave节点都有一个任务跟踪守护进程,用于与主节点通信并管理每个任务的执行。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7ShSot8R-1656124723417)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/e54ae24b-0f43-4706-a496-96a3202492ce/Untitled.png)]
在这张示例图中,有3个job,每个job有3个tasks在slave节点上运行。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-s15RpQ6m-1656124723418)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/50f1dc0b-78bc-4ea7-908d-e1db9eff4fb0/Untitled.png)]](https://1000bd.com/contentImg/2022/06/27/023809165.png)
HaLoop的迭代任务被抽象为:
∗ ∗ R 0 **R_0 ∗∗R0代表初始的结果, R i R_i Ri代表第 i i i次迭代结果, L L L 是迭代计算中保持不变的数据。当迭代达到固定点(当结果在相邻两次迭代都不改变时) R i + 1 = R i R_{i+1}=R_i Ri+1=Ri时,程序将终止。**
fixpoint?
典型地,fixpoint被定义为连续两次迭代恰恰相等的点,但是HaLoop也支持近似fixpoint的概念,就是说当连续两次迭代的差小于用户设定的阈值或达到最大的迭代次数时,计算停止。在机器学习和复杂的分析中,两种近似fixpoints对于表示收敛条件是非常有用的。例如对于PageRank使用用户设定的阈值或固定的迭代次数作为循环终止的条件是很常见的。
HaLoop的主要编程接口:
Map :将输入的
<
k
e
y
,
v
a
l
u
e
>
<key,value>
<key,value>元组转化为
<
i
n
k
e
y
,
i
n
v
a
l
u
e
>
<in_key,in_value>
<inkey,invalue>元组
Reduce:处理中间元组,将相同
i
n
_
k
e
y
in\_key
in_key的元组合并输出
<
o
u
t
k
e
y
,
o
u
t
v
a
l
u
e
>
<out_key,out_value>
<outkey,outvalue>。接口包括用来缓存不变值的新参数,这个不变值与in_key关联。
AddMap AddReduce表示由超过一个MapReduce步骤组成的循环体。AddMap(AddReduce)通过一个整数表示步骤的顺序来关联一个Map(Reduce)函数。
HaLoop默认测试两次迭代是否相等来决定是否要停止计算。为了指定一个近似fixpoin的终止条件,程序员可以使用下面的函数:
SetFixedPointThreshold:在连续两次迭代的距离上设置一个界限。如果这个界限被超越,近似fixpoint还没有到达,计算还要继续。
ResultDistance:函数计算了两个具有相同out_key的out_value集合的距离。
SetMaxNumOfIterations:提供了循环终止条件的进一步控制。如果已经执行到最大的迭代次数,HaLoop就会终止一个作业。
SetIterationInput:关联一个输入源与一个特定的迭代。
AddStepInput:关联循环体中额外的输入源和一个中间map-reduce对。
AddInvariantTable:指定一个循环不变的输入表(一个HDFS文件)。在作业执行期间,HaLoop将在集群结点上缓存这张表。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZaTImD5D-1656124723419)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/91747a83-86c7-414b-9bd5-7acbddf77408/Untitled.png)]
在HaLoop中,用户程序指定循环集,框架控制循环执行,但在Hadoop中,控制循环是应用程序的责任。
这节介绍了HaLoop任务调度器。该调度器给迭代程序提供了比Hadoop调度器更好的调度。
在论文中的定义:

HaLoop任务调度的目标就是实现任务具有迭代的局部性。
HaLoop的目标是将数据映射放在同一台物理机器上,使得数据可以更加容易的在迭代间缓存并重用。
具有inter-iteration locality性质的调度提供了从过去的迭代再利用循环不变数据的可行性。因为关系图L在迭代中不变,其被缓存并在每次迭代中重复使用。
而mappers M10,M11会有相同的结果。没有必要重新计算这些mapper的输出,也没有必要将它们传达给reduce。
HaLoop的调度器跟踪每个物理机器上的被每个map和reduce任务处理的数据分区。它使用这个信息来调度随后的任务,调度的重点正是考虑inter-iteration locality。
由于任务的inter-iteration locality,一个物理结点通常只需访问特定的循环不变数据分区。为了减少I/O代价,HaLoop在物理结点的本地磁盘上缓存了这些数据分区,以便随后的再利用。为了加快处理速度,HaLoop索引了缓存的数据。如果一个缓存变得不可用了,它会自动从map任务所在的物理结点或HDFS重新加载。
HaLoop支持三种类型的缓存:reducer输入缓存,reducer输出缓存,和mapper输入缓存。程序员可以通过HaLoop API选择开启或关闭一个缓存类型。
**适合迭代间缓存的文件是那些不由任何作业产生的、在多次迭代中由同一步骤重复读取的文件。**实验结果表明,即使只读取两次的文件,对其缓存也是值得的。
在一个迭代作业执行之前,主节点通过检查整个作业的依赖图来识别缓存机会。通过构造**有向偶图,HaLoop通过一些条件确定第一个迭代中的哪个步骤写入缓存,以及接下来的迭代中哪个(些)步骤读取缓存。**
reducer输入缓存避免了在多次迭代中使用同一映射器重新处理相同数据。对于那些通过循环不变量检测过程(或通过HaLoop API AddInvariantTable)确定为可缓存的数据集,HaLoop将缓存所有还原器的还原输入,并为缓存的数据创建本地索引。
分解器输入缓存适用于PageRank、HITS、各种递归关系查询,以及任何其他针对大型不变数据的重复连接算法。还原器输入缓存要求每个映射器输出元组t的分区函数f满足三个条件:
Reducer输出缓存在每个reducer结点上存储和索引最近的本地输出。这个缓存被用来减少评估fixpoint终止条件的开销。就是说,如果application必须通过比较当前的迭代输出与上一次迭代的输出来测试收敛的条件,reducer输出缓存使框架能够用分布式的方式去比较。
Reducer输出缓存被用在需要在每次迭代后对fixpoint进行评估的应用上。比如,在PageRank中,用户可能会设置这样一个收敛条件,指定整个排名在相邻两次迭代的差别低于设定好的阈值。有了reducer输出缓存,不需要分离的MapReduce步骤,fixpoint就可以用分布式的方式去评估。在所有Reduce函数完成之后,每个reducer在reduce过程中评估fixpoint的条件并把这个本地的评估结果报告给master结点,master结点算出最后的应答。
Hadoop尝试将任务与其输入数据放在同一位置。在现实世界中的Hadoop集群上,数据本地映射器的速率大约为60–95%,甚至更低。HaLoop的mapper输入缓存是为了避免在非初始迭代期间mapper的非本地化数据读取。在第一次迭代中,如果一个mapper非本地化读取了一个拆分的输入,该输入将会被缓存到mapper所在的物理结点上的本地磁盘。然后,随着循环感知任务的调度,在后续的迭代中,所有mappers只从本地磁盘中读取数据,或来自HDFS或来自本地文件系统。
Hadoop的容错机制的目标是确保单一任务的失败会引发一定量的恢复工作。一个失败的任务可以在不同的节点上被重新安排,其输入可以从分布式文件系统中重新加载(也许是访问一个冗余的副本)。其的工作量保持在一个比较小的常数内。
而在HaLoop中,一个迭代中的一个步骤出现故障,可能需要重新执行同一迭代中所有前面的步骤或所有前面的迭代中的任务。避免递归恢复是HaLoop中容错机制的一个关键设计目标。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-H1oiDKPM-1656124723420)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/b8c67d15-04f6-42ba-bc3f-d365c792f8b8/Untitled.png)]](https://1000bd.com/contentImg/2022/06/27/023809531.png)
任务失败,从属节点故障。(为了缓存,有可能首次迭代会被重新运行)
