• 2023_Spark_实验十二:Spark高级算子使用


    掌握Spark高级算子在代码中的使用
    相同点分析
    三个函数的共同点,都是Transformation算子。惰性的算子。
    不同点分析
    map函数是一条数据一条数据的处理,也就是,map的输入参数中要包含一条数据以及其他你需要传的参数。
    mapPartitions函数是一个partition数据一起处理,也即是说,mapPartitions函数的输入是一个partition的所有数据构成的“迭代器”,然后函数里面可以一条一条的处理,在把所有结果,按迭代器输出。也可以结合yield使用效果更优。
    rdd的mapPartitions是map的一个变种,它们都可进行分区的并行处理。
    两者的主要区别是调用的粒度不一样:map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。
    mapPartitionsWithIndex函数,其实和mapPartitions函数区别不大,因为mapPartitions背后调的就是mapPartitionsWithIndex函数,只是一个参数被close了。mapPartitionsWithIndex的函数可以或得partition索引号;
    假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。
    mapPartitionsWithIndex则是带上分区下标进行操作。
    1、mapPartitionWithIndex
     
    1. import org.apache.spark.{SparkConf,SparkContext}
    2. object mapPartitionWithIndex {
    3. def getPartInfo:(Int,Iterator[Int]) => Iterator[String] = (index:Int,iter:Iterator[Int]) =>{
    4. iter.map(x =>"[ PartId " + index +", elems: " + x + " ]")
    5. }
    6. def main(args: Array[String]): Unit = {
    7. val conf = new SparkConf().setMaster("local").setAppName("RddMapPartitionsWithIndexDemo")
    8. val sc = new SparkContext(conf)
    9. val rdd1 = sc.parallelize(List(1,2,3,4,5,9,6,7,8),numSlices =3 )
    10. val rdd2 = rdd1.mapPartitionsWithIndex(getPartInfo)
    11. rdd2.collect().foreach(println)
    12. }
    13. }

    2、aggregate
    首先我们来创建一个 RDD
    1. scala> val rdd1 = sc.parallelize(1 to 5)
    2. rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :24
    3. scala> rdd1.collect
    4. warning: there was one feature warning; re-run with -feature for details
    5. res24: Array[Int] = Array(1, 2, 3, 4, 5)

    这个 RDD 仅有 1 个分片,包含 5 个数据: 1, 2, 3, 4, 5 。
     
    然后我们来应用一下 aggregate 方法。
     
    在使用 aggregate 之前,我们还是先定义两个要给 aggregate 当作输入参数的函数吧。
    1. scala> 
    2. // Entering paste mode (ctrl-D to finish)
    3. def pfun1(p1: Int, p2: Int): Int = {
    4. p1 * p2
    5. }
    6. // Exiting paste mode, now interpreting.
    7. pfun1: (p1: Int, p2: Int)Int
    8. scala>
    9.  
    10. scala> 
    11. // Entering paste mode (ctrl-D to finish)
    12. def pfun2(p3: Int, p4: Int): Int = {
    13. p3 + p4
    14. }
    15. // Exiting paste mode, now interpreting.
    16. pfun2: (p3: Int, p4: Int)Int
    17. scala>
    接着是第 2 个函数。就不再解释什么了。
     
    然后终于可以开始应用我们的 aggregate 方法了。
    1. scala> rdd1.aggregate(3)(pfun1, pfun2)
    2. res25: Int = 363
    3. scala>
    输出结果是 363 !这个结果是怎么算出来的呢?
     
    首先我们的 zeroValue 即初值是 3 。然后通过上面小节的介绍,我们知道首先会应用 pfun1 函数,因为我们这个 RDD 只有 1 个分片,所以整个运算过程只会有一次 pfun1 函数调用。它的计算过程如下:
     
    首先用初值 3 作为 pfun1 的参数 p1 ,然后再用 RDD 中的第 1 个值,即 1 作为 pfun1 的参数 p2 。由此我们可以得到第一个计算值为 3 * 1 = 3 。接着这个结果 3 被当成 p1 参数传入,RDD 中的第 2 个值即 2 被当成 p2 传入,由此得到第二个计算结果为 3 * 2 = 6 。以此类推,整个 pfun1 函数执行完成以后,得到的结果是  3 * 1 * 2 * 3 * 4 * 5 = 360 。这个 pfun1 的应用过程有点像是 “在 RDD 中滑动计算” 。
    在 aggregate 方法的第 1 个参数函数 pfun1 执行完毕以后,我们得到了结果值 360 。于是,这个时候就要开始执行第 2 个参数函数 pfun2 了。
     
    pfun2 的执行过程与 pfun1 是差不多的,同样会将 zeroValue 作为第一次运算的参数传入,在这里即是将 zeroValue 即 3 当成 p3 参数传入,然后是将 pfun1 的结果 360 当成 p4 参数传入,由此得到计算结果为 363 。因为 pfun1 仅有一个结果值,所以整个 aggregate 过程就计算完毕了,最终的结果值就是 363 
    1. import org.apache.spark.{SparkConfSparkContext}
    2. object RddAggregateDemo {
    3. def main(args: Array[String]): Unit = {
    4. val conf = new SparkConf().setMaster("local").setAppName("RddDemos")
    5. val sc = new SparkContext(conf)
    6. val rdd1 = sc.parallelize(List("12","23","345",""),numSlices = 2)
    7. //下面代码等价与rdd1.aggregate("")(x+y) => math.min(x.length,y.length)),(x,y)=>x+y))
    8. val result = rdd1.aggregate("")(func1,func2)
    9. println(result)
    10. }
    11. //(x,y)=>math.min(x.Length,y.length)
    12. def func1:(String,String) => String =(x:String,y:String) =>{
    13. println(" + x + ",x.len: " + x.length + ">, + y +",y.len: " + y.length + ">")
    14. val ret =math.min(x.length,y.length).toString
    15. println("func1 ret:" +ret)
    16. ret
    17. }
    18. //(x+y) => x+y
    19. def func2:(String,String)=>String=(x:String,y:String) =>{
    20. println("========" +(x+y))
    21. x+y
    22. }
    23. }

    3、aggregateByKey
    通过scala集合以并行化方式创建一个RDD
    scala> val pairRdd = sc.parallelize(List((“cat”,2),(“cat”,5),(“mouse”,4),(“cat”,12),(“dog”,12),(“mouse”,2)),2)
    pairRdd 这个RDD有两个区,一个区中存放的是:
    (“cat”,2),(“cat”,5),(“mouse”,4)
    另一个分区中存放的是:
    (“cat”,12),(“dog”,12),(“mouse”,2)
    然后,执行下面的语句
    scala > pairRdd.aggregateByKey(100)(math.max(_ , _), _ + _ ).collect
    结果:
    res0: Array[(String,Int)] = Array((dog,100),(cat,200),(mouse,200)
    下面是以上语句执行的原理详解:
    aggregateByKey的意思是:按照key进行聚合
    第一步:将每个分区内key相同数据放到一起
    分区一
    (“cat”,(2,5)),(“mouse”,4)
    分区二
    (“cat”,12),(“dog”,12),(“mouse”,2)
    第二步:局部求最大值
    对每个分区应用传入的第一个函数,math.max(_ , _),这个函数的功能是求每个分区中每个key的最大值
    这个时候要特别注意,aggregateByKe(100)(math.max(_ , _),_+_)里面的那个100,其实是个初始值
    在分区一中求最大值的时候,100会被加到每个key的值中,这个时候每个分区就会变成下面的样子
    分区一
    (“cat”,(2,5,100)),(“mouse”,(4,100))
    然后求最大值后变成:
    (“cat”,100), (“mouse”,100)
    分区二
    (“cat”,(12,100)),(“dog”,(12.100)),(“mouse”,(2,100))
    求最大值后变成:
    (“cat”,100),(“dog”,100),(“mouse”,100)
    第三步:整体聚合
    将上一步的结果进一步的合成,这个时候100不会再参与进来
    最后结果就是:
    (dog,100),(cat,200),(mouse,200)
    对PairRDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey'函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值,而aggregate函数直接返回的是非RDD的结果。
    1. import org.apache.spark.{SparkConfSparkContext}
    2. object RddAggregateByKeyDemo {
    3. def main(args: Array[String]): Unit = {
    4. val conf =new SparkConf().setMaster("local").setAppName("RddAggregateByKeyDemo")
    5. val sc = new SparkContext(conf)
    6. val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),numSlices = 2)
    7. val rdd = pairRDD.aggregateByKey(100)(func2,func2)
    8. val resultArray = rdd.collect
    9. resultArray.foreach(println)
    10. sc.stop()
    11. }
    12. def func2(index:Int,iter:Iterator[(String,Int)]):Iterator[String] = {
    13. iter.map(x=>"[partID:" + index + ",val: " +x +"]")
    14. }
    15. //(x,y)=>math.max(x+y)
    16. def func1:(Int,Int) => Int =(x:Int,y:Int) =>{
    17. println(" + x + "," +",y:"+ y + ">")
    18. val ret =math.max(x,y)
    19. println("func1 max:" +ret)
    20. ret
    21. }
    22. //(x+y) => x+y
    23. def func2:(Int,Int)=>Int=(x:Int,y:Int) =>{
    24. println("========func2 x :" + x + ",y:"+y)
    25. println("========func2 ret =====" +(x+y))
    26. x+y
    27. }
    28. }

  • 相关阅读:
    Python 潮流周刊第 42 期(摘要)+ 赠书《流畅的Python》6本
    【1++的C++进阶】之emplace详解
    【二叉树进阶】AVLTree-平衡二叉搜索树
    集Oauth2+Jwt实现单点登录
    QT:QSS自定义 QAbstractScrollArea实例
    Camunda 7.x 系列【57】流程设计器
    Server - Kubernetes (K8S) 运行 PyTorchJob 的 YAML 配置
    vue3前端excel导出;组件表格,自定义表格导出;Vue3 + xlsx + xlsx-style
    Linux内存管理(六):内存模型SPARSEMEM初始化
    Java FilterWriter类的简介说明
  • 原文地址:https://blog.csdn.net/pblh123/article/details/133084778