Shuffle,中文的意思就是洗牌。之所以需要Shuffle,是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上并且由不同节点的计算单元处理。以最简单的Word Count为例,其中数据保存在Node1、Node2和Node3;经过处理后,这些数据最终会汇聚到Nodea、Nodeb处理,如下图所示。
这个数据重新打乱然后汇聚到不同节点的过程就是Shuffle。但是实际上,Shuffle过程可能会非常复杂:
1)数据量会很大,比如单位为TB或PB的数据分散到几百甚至数千、数万台机器上。
2)为了将这个数据汇聚到正确的节点,需要将这些数据放入正确的Partition,因为数据大小已经大于节点的内存,因此这个过程中可能会发生多次硬盘续写。
3)为了节省带宽,这个数据可能需要压缩,如何在压缩率和压缩解压时间中间做一个比较好的选择?
4)数据需要通过网络传输,因此数据的序列化和发序列化也变得相对复杂。
一般来说,每个Task处理的数据可以完全载入内存(如果不能,可以减小每个Partition的大小),因此Task可以做到在内存中计算。除非非常复杂的计算逻辑,否则为了容错而持久化中间的数据是没有太大收益的,毕竟中间某个过程出错了可以从头开始计算。但是对于Shuffle来说,如果不持久化这个中间结果,一旦数据丢失,就需要重新计算依赖的全部RDD,因此有必要持久化这个中间结果。
一句话总结:Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。
在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是存在Shuffle Dependency宽依赖的时候,需要进行shuffle,这时候会将作业job划分成多个Stage;并且在划分Stage的时候,构建Shuffle Dependency的时候进行shuffle注册,获取后续数据读取所需要的Shuffle Handle,最终每一个job提交后都会生