• Spark - 一文搞懂 Partitioner


    一.引言

    spark 处理 RDD 时提供了 foreachPartition 和 mapPartition 的方法对 partition 进行处理,一个 partition 内可能包含一个文件或者多个文件的内容,Partitioner 可以基于 pairRDD 的 key 实现自定义 partition 的内容。

    Partitioner 函数最基本的两个方法是 numPartitions 和 getPartition(key: Any):

    A.numPartitions: 获取总的分区数

    B.getPartition:

    根据 key 获取当前 partition 对应的分区数目,范围在 [0, numPartitions-1],这里的 partitionId 与 TaskContext.getPartitionId 的值一致,通过 hash(key) 得到 int 的 partitionNum 变量,相同 partitonNum 的 key 对应的 paidRDD 将分到同一个 partition 内处理

    常见的 Partition 分区类型有如下几种:

    分区函数分区方法
    HashPartitioner根据 hash(key) 分区
    RangePartitioner根据 Range 边界分区
    Partitioner根据自定义规则分区

    二.HashPartitioner

    1.源码分析

    hashPartitioner 基于 Object.hashcode % partitionNum 进行分区,需要注意 partitionNum 的值是需要 >= 0 的,partiionNum 的获取通过 getPartition 函数内的 nonNegativeMod 函数实现

    nonNegativeMod 在实现 hashCode % partitionNum 的基础上增加了非负性的要求,因为 partitionNum 是大于等于 0 的数目:

    2.代码测试

    1. val testRdd = sc.parallelize((0 until 500).toArray.zipWithIndex)
    2. val partitionNum = 5
    3. testRdd.partitionBy(new HashPartitioner(partitionNum)).foreachPartition(partition => {
    4. if (partition.nonEmpty) {
    5. val info = partition.toArray.map(_._1)
    6. val taskId = TaskContext.getPartitionId()
    7. info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} %$partitionNum=${num.hashCode() % 5}"))
    8. }
    9. })

    这里将 0-499 共 500 个数字 zipWithIndex 生成 pairRdd 并通过 HashPartitioner 生成 5 个 Partition,通过 TaskContext 获取 partitionId,为了日志一一打印,这里采用 local[1] 的配置 : 

    val conf = new SparkConf().setAppName("PartitionTest").setMaster("local[1]")
    

    可以看到红框内同一个 TaskId 对应的 partition 内的 key 都具有相同的 mod 值,所以被分到同一分区。 

    3.repartition

    正常使用的 repartition 函数采用 HashPartitioner 函数作为默认分区函数,下面尝试一下:

    1. println("=============================repartition=============================")
    2. testRdd.repartition(5).foreachPartition(partition => {
    3. if (partition.nonEmpty) {
    4. val info = partition.toArray.map(_._1)
    5. val taskId = TaskContext.getPartitionId()
    6. info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} %$partitionNum=${num.hashCode() % 5}"))
    7. }
    8. })

    与上面不同的是 taskId 有区别,但是相同 mod 的 key 仍然会分到同一分区:

     

    三.RangePartitioner

    1.源码分析

    RangePartitioner 根据范围将元素大致均匀的分配至不同分区 partition,范围通过传入 RDD 的内容采样来确定。 

    除了 partitions 的参数外,RangePartitioner 还需要将待分区的 rdd 传入供随机采样生成 rangeBounds 使用,相比于 HashPartition 直接 hashCodes % partitionNum 的操作,RangePartitioner 分区共分两步:

    A.获取分区 Boundary

    需要采样的分区样本大小上线为 1m,转换为 double 避免精度溢出,第一个 else 逻辑内考虑如果一个分区内包含的项目数远远超过平均数,则从中重新采样,以确保该分区能够收集到足够的采样数目,最下面的 if 函数使用所需的采样概率对不平衡分区重新采样,最终得到分区的边界,这里可以抽空单独拎出来研究一下。举个例子大致理解下,假如所有 partition 内的 key 的范围是 0-500,随机生成5个分区,则生成 101-203-299-405 这样的区间,每一个数字代表其分区的上界,例如分区0的上界为 101,分区1的上界为 203,依次类推,最终生成 5 个分区。

    B.根据Boundary获取分区

    如果分区数组长度不大于 128,则进行简单的暴力循环搜索,如果超过 128,则进行二分查找,同时提供根据 ascending 参数决定 partitionId 的顺序或逆序。这与之前 Spark-Scala 数据特征分桶时采用的优化策略一致,有兴趣可以看看:Scala - 数值型特征分桶

     

    2.代码测试

    1. val testRdd = sc.parallelize((0 until 500).toArray.zipWithIndex)
    2. testRdd.partitionBy(new RangePartitioner(5, testRdd)).foreachPartition(partition => {
    3. if (partition.nonEmpty) {
    4. val info = partition.toArray.map(_._1)
    5. val taskId = TaskContext.getPartitionId()
    6. info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} length: ${info.length}"))
    7. }
    8. })

    依然使用 500 个纯数字 RDD 进行 range 分区的测试,为了验证大致均分的思想,这里最后不再打印 mod 结果,转而打印每个 partition 内元素的数量,可以看到这次每组数量不像之前 HashPartitioner 得到的一样均匀,而是介于 500/5=100 的上下,但是总数为 500。

     

    四.SelfPartitioner

    1.源码分析

    自定义 Partitioner 主要实现下述两个功能,上面也提到了,再简单补充下:

    numPartitions: 获取总的分区数

    getPartition: 获取 key 对应的分区 id

     

    2.代码实现

    A.SelfPartitioner

    基于上面 RangePartitioner 分区不均匀的情况,我们采用 SelfParitioner 自定义分区的方式实现均匀分区,这里偷懒直接生成了对应的上界 boundary,实际场景中 boundary 应该基于 partitionNum 的数量动态生成,getPartition 函数内引入了 break 机制,不熟悉的同学可以移步:Scala - 优雅的break,随后就是基础的暴力循环,如果找到上界则返回上界对应的 index 作为分区 id。

    1. import scala.util.control.Breaks._
    2. class SelfPartition(partitionNum: Int) extends Partitioner {
    3. val boundary: Array[(Int, Int)] = Array(100, 200, 300, 400, 500).zipWithIndex
    4. override def numPartitions: Int = partitionNum
    5. override def getPartition(key: Any): Int = {
    6. val keyNum = key.toString.toInt
    7. var partitionNum = 0
    8. breakable(
    9. boundary.foreach(bound => {
    10. if (keyNum < bound._1) {
    11. partitionNum = bound._2
    12. break()
    13. }
    14. })
    15. )
    16. partitionNum
    17. }
    18. }

    B.运行结果

    1. val testRdd = sc.parallelize((0 until 500).toArray.zipWithIndex)
    2. testRdd.partitionBy(new SelfPartition(5)).foreachPartition(partition => {
    3. if (partition.nonEmpty) {
    4. val info = partition.toArray.map(_._1)
    5. val taskId = TaskContext.getPartitionId()
    6. info.take(3).map(num => println(s"taskId: $taskId num: $num hashNum: ${num.hashCode()} length: ${info.length}"))
    7. }
    8. })

    通过 SelfPartitioner 分区后可以看到 0-499 共 500 个元素被均匀分配到 5 个 Partition 内,除了最简单的 boundary 方法分区外,也可以自定义 hash 方法,key 的类型默认为 Any,如果 key 不是 scala 的基本数据类型,则使用 key.asInstanceOf[T] 转换即可。 

     

    五.repartitionAndSortWithinPartitions

    1.函数分析

    除了正常的分区需求外,spark 还提供 repartitionAndSortWithinPartitions 函数,该函数根据给定的分区器 Partitioner 进行分区划分得到新的 RDD,并根据每个键进行排序,使得 RDD 中的数据保持一定顺序,该方法比 repartition + sorting 更加高效,因为它把排序机制放入了 shuffle 的过程中。 

    源码中该方法位于 OrderedRddFunctions 类内,只支持传入分区函数 Partitioner,ordering 排序规则需要使用 implict 传入隐函数的方法定义:

    对于需要分区的 key: Any,需要定义隐函数保证其实现 Ordering 接口才能实现分区后排序,否则只能分区没有排序。 

    2.代码实现

    A.分区排序依据

    在分区函数的基础上,增加了 Ordering 隐函数,这里 Partitioner 函数仍然负责根据 key 得到分区 Id,和上面不同的时,分区的同时对 rdd 进行 shuffle,其中 order 的规则由隐函数给出,这里通过比较二者的分数进行排序,如果想要逆序只需要添加负号即可 -(x.score - y.score)。

    1. // 学生类
    2. case class Student(name: String, score: Int)
    3. // 隐函数-Ordering
    4. implicit object StudentOrdering extends Ordering[Student] {
    5. override def compare(x: Student, y: Student): Int = {
    6. x.score - y.score
    7. }
    8. }
    9. class SelfSortPartition(partitionNum: Int) extends Partitioner {
    10. val boundary: Array[(Int, Int)] = Array(100, 200, 300, 400, 500).zipWithIndex
    11. override def numPartitions: Int = partitionNum
    12. override def getPartition(key: Any): Int = {
    13. val stuKey = key.asInstanceOf[Student]
    14. val keyNum = stuKey.name.toInt
    15. var partitionNum = 0
    16. breakable(
    17. boundary.foreach(bound => {
    18. if (keyNum < bound._1) {
    19. partitionNum = bound._2
    20. break()
    21. }
    22. })
    23. )
    24. partitionNum
    25. }
    26. }

    B.主函数

    这里使用 0-499 的 String 类型作为学生的编号,Score 则采取 math.random x 100 进行模拟,分区使用 Student 的 name id,所以每个元素的分区不变,变的是每个元素的顺序。

    1. println("=============================SortPartition=============================")
    2. val studentRdd = sc.parallelize((0 until 500).toArray.map(num => (Student(num.toString, (math.random * 100).toInt), true)))
    3. studentRdd.take(5).foreach(println(_))
    4. studentRdd.repartitionAndSortWithinPartitions(new SelfSortPartition(5)).foreachPartition(partition => {
    5. if (partition.nonEmpty) {
    6. val taskId = TaskContext.getPartitionId()
    7. println("===========================")
    8. partition.toArray.take(5).map(stu => {
    9. println(s"TaskId: ${taskId} Name: ${stu._1.name} Score: ${stu._1.score}")
    10. })
    11. }
    12. }

    由于使用 x.score - y.score 顺序计数,所以按分数从小到大排序:

    这一届是带过最差的学生,咋还能考0分。

    C.其他写法

    除了 StudentOrdering 的写法,也可以采用直接 Object Student 的写法,这里 A <: Student 表示任何 A 的子类都支持该隐式调用,关于 <: 相关知识可以参考:Scala Generic 泛型类详解 - T

    1. object Student {
    2. implicit def orderingByGradeStudentScore[A <: Student]: Ordering[A] = {
    3. Ordering.by(stu => stu.score)
    4. }
    5. }
    6. implicit object StudentOrdering extends Ordering[Student] {
    7. override def compare(x: Student, y: Student): Int = {
    8. x.score - y.score
    9. }
    10. }

    如果想要支持多重排序,可以在元祖内增加多个字段,会优先比较 name 再比较 score,以此类推,同理如果想要逆序,例如根据分数逆序排列,则改为 (stu.name,-1 * stu.score)

    1. object Student {
    2. implicit def orderingByGradeStudentScore[A <: Student]: Ordering[A] = {
    3. Ordering.by(stu => (stu.name, stu.score))
    4. }
    5. }

    如果对应的分区 key 没有实现 implict 的比较隐函数,则函数会直接报灰,无法编译:

     

    六.总结

    Partitioner 的一般用法大致就这些,除了三种 HashPartitioner 函数外,Spark 也可以通过 repartitionAndSortWithinPartitions 实现分区 + 排序的需求,总体来说,Partitioner 支持用户自定义分区规则去规划任务的 task 需要处理什么样的 partition 数据,对于精细化处理和区域化定制的需求十分方便,除此之外,一些需要顺序处理的数据或者顺序存储的数据,通过 SortWithinPartitions 的方法也可以提高效率,非常奈斯👍。最后继续感叹命名的抽象性,partition -  分片、隔墙,现在看到屏风就像看到了 RDD。

  • 相关阅读:
    服务器安装mysql后无法远程连接
    一线互联网大厂普遍使用的Docker,掌握这套面试题,让领导主动涨薪
    FFmpeg 中 Filters 使用文档介绍
    一、软件工程概述+练习题
    简析 Linux 的 CPU 时间
    Greenplum数据库数据分片策略Hash分布——GUC gp_use_legacy_hashops
    关于笔试编程题被坑的输入问题,acm模式下的python输入究竟如何写?
    Redis 清空数据库
    springboot项目作为静态文件服务器
    真实感渲染:课程介绍
  • 原文地址:https://blog.csdn.net/BIT_666/article/details/125543359