• 2023_Spark_实验十一:RDD高级算子操作


    1. //checkpoint :
    2. sc.setCheckpointDir("hdfs://Master:9000/ck"// 设置检查点
    3. val rdd = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) // 执行wordcount任务的转换
    4. rdd.checkpoint // Mark this RDD for checkpointing.
    5. rdd.isCheckpointed
    6. rdd.count //触发计算,日志显示:ReliableRDDCheckpointData: Done checkpointing RDD 27 to hdfs://hadoop001:9000/ck/fce48fd4-d76f-
    7. 4322-8d23-6a48d1aed7b5/rdd-27, new parent is RDD 28
    8. rdd.isCheckpointed // res61: Boolean = true
    9. rdd.getCheckpointFile // Option[String] = Some(hdfs://Master:9000/ck/b9a5add8-18d8-4056-9e8e-271d9522a29c/rdd-4)

    coalesce :

    总所周知,spark的rdd编程中有两个算子repartition和coalesce。公开的资料上定义为,两者都是对spark分区数进行调整的算子。

            repartition会经过shuffle,其实际上就是调用的coalesce(shuffle=true)。

            coalesce,默认shuffle=false,不会经过shuffle。

            当前仅针对coalesce算子考虑,我们看一下官方的定义:

            大概意思为:如果你想要从1000个分区到100个分区,并且不经过shuffle,近乎平均分配10个父分区到1个子分区。

            首先我说下我个人简单理解:不经过shuffle,就意味着coalesce算子前后都是在一个stage中的。从该stage开始到coalesce算子之前的任务的迭代执行的并行度都是1000,从coalesce算子开始到该stage结束的任务的迭代执行的并行度都是100。

    1. val rdd1 = sc.parallelize(1 to 1010)
    2. // 重新分区,分为两个2 ,不产生shuffle
    3. val rdd2 = rdd1.coalesce(2, false)
    4. // 获取新的RDD分区数
    5. rdd2.partitions.length
    6. def func1(index:Int,iter:Iterator[Int]):Iterator[String] = {
    7. iter.toList.map(x=>"[PartID:"+index + ",value=" + x +"]").iterator
    8. }
    9. // 查看分区后的结果:
    10. rdd2.mapPartitionsWithIndex(func1).collect
    11. repartition:
    12. val rdd1 = sc.parallelize(1 to 104)
    13. val rdd2 = rdd1.repartition(5)

    collect、toArray

    RDD转换为Scala的数组。

    collectAsMap

    与collect、toArray相似。collectAsMap将key-value型的RDD转换为Scala的map。

    注意:map中如果有相同的key,其value只保存最后一个值。

    1. # 创建一个2分区的RDD
    2. scala> var z = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
    3. z: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[129] at parallelize at :21
    4. # 输出所有分区的数据
    5. scala> z.collect
    6. res44: Array[(String, Int)] = Array((cat,2), (cat,5), (mouse,4), (cat,12), (dog,12), (mouse,2))
    7. # 转化为字典
    8. scala> z.collectAsMap
    9. res45: scala.collection.Map[String,Int] = Map(dog -> 12, cat -> 12, mouse -> 2)
    10. scala>

    1. collectAsMap
    2. val rdd = sc.parallelize(List(("a"1), ("b"2)))
    3. rdd.collectAsMap
    4. //res2: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

    combineByKey与aggregateByKey相比较:

    1. 1.相同点:

    • 两者都能映射key值分别进行分区内计算和分区间计算。

        2. 不同点:

    • combineByKey有三个参数列表而且不需要初始值,而aggregateByKey只有两个参数列表且需要初始值。

    aggregateByKey分区内计算示意图

    1. //aggregateByKey存在函数颗粒化,有两个参数列表
    2. //第一个参数列表,需要传递一个参数,表示为初始值
    3. // 主要当碰见第一个key时候,和value进行分区内计算
    4. //第二个参数列表,需要传递2个参数
    5. // 第一个参数表示分区内计算
    6. // 第二个参数表示分区间计算
    7. rdd.aggregateByKey(zeroValue = 0)(
    8. (x, y) => math.max(x, y),
    9. (x, y) => x + y
    10. ).collect().foreach(println)

    combineByKey分区内计算示意图

    1. //combineByKey方法需要三个参数:
    2. //第一个参数表示:将相同key的第一个数据进行结构转换,实现操作
    3. //第二个参数:分区内的计算规则
    4. //第三个参数:分区间的计算规则
    5. val newRDD: RDD[(String,(Int,Int))] = rdd.combineByKey(v=>(v,1),
    6. (t:(Int,Int),v:Int)=> {t._1 + v, t._2 + v},
    7. (t1:(Int,Int),t2:(Int,Int))=>{t1._1+t2._1,t1._2 + t2._2})

    combineByKey与aggregateByKey两者的核心区别,就是在组内初始计算时少许不同。

    combineByKey // 是在这个PairRDDFunctions类下的方法

    1. val rdd1 = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_, 1))
    2. val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) // 等价与reduceByKey(_ + _),结果:Array[(String, Int)] =
    3. Array((is,1), (Giuyang,1), (love,2), (capital,1), (Guiyang,1), (I,2), (of,1), (Guizhou,2), (the,1))
    4. rdd2.collect
    5. val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) // 将每个key的value各自加10,结果:Array[(String, Int)]
    6. rdd3.collect
    7. val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    8. val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
    9. val rdd6 = rdd5.zip(rdd4)

    一、Rdd行动算子

    1、【countByKey】统计存储在rdd中元组的key的个数,key是相同的就会进行计数+1。通过这个key 会生成一个Map Map中的key是原有中的key,value是原有key的个数;

    2、【countByValue】统计在Rdd中存储元素的个数。会将rdd中每一个元组看作为一个value,若这个元组中元素是相同的,此时就会将生成Map中的value+1;

    3、【filterByRange】对rdd中的元素过滤,并返回指定内容的数据。该函数作用于键值对RDD,对RDD中的元素进行过滤,返回键在指定范围中的元素;

    4、【flatMapValues】主要是对存在元组中的value进行扁平化处理;

    countByKey // 计算每个键出现的次数

    1. val rdd1 = sc.parallelize(List(("a"1), ("b"2), ("b"2), ("c"2), ("c"1)))
    2. rdd1.countByKey
    3. rdd1.countByValue  // countByValue返回每个值的出现次数

    filterByRange  // 【filterByRange】对rdd中的元素过滤,并返回指定范围的内容数据

    1. val rdd1 = sc.parallelize(List(("e"5), ("c"3), ("d"4), ("c"2), ("a"1)))
    2. val rdd2 = rdd1.filterByRange("b""d")
    3. rdd2.collect

    flatMapValues

    1. val rdd3 = sc.parallelize(List(("a""1 2"), ("b""3 4")))
    2. rdd3.flatMapValues(_.split(" "))

    foldByKey

    函数原型:

    1. def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
    2. def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
    3. def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

     作用:将RDD[K,V]根据K将V做折叠、合并处理,zeroValue作为初始参数,调用func得到V,

    再根据Key按照func对V进行调用。

    例子:

    1. scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2)))
    2. rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :27
    3. scala> rdd1.foldByKey(0)(_+_).collect
    4. res3: Array[(String, Int)] = Array((A,2), (B,3))

    说明: 将0应用到_+_上,Array(("A",0+0),("A",2+0)) 再进一步处理得到Array(("A",0+2))最终得到Array(("A",2))

    foldByKey

    函数原型:

    1. def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
    2. def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
    3. def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

     作用:将RDD[K,V]根据K将V做折叠、合并处理,zeroValue作为初始参数,调用func得到V,

    再根据Key按照func对V进行调用。

    例子:

    1. scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2)))
    2. rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :27
    3. scala> rdd1.foldByKey(0)(_+_).collect
    4. res3: Array[(String, Int)] = Array((A,2), (B,3))

    说明: 将0应用到_+_上,Array(("A",0+0),("A",2+0)) 再进一步处理得到Array(("A",0+2))最终得到Array(("A",2))

    foldByKey

    1. val rdd1 = sc.parallelize(List("dog""wolf""cat""bear"), 2)
    2. val rdd2 = rdd1.map(x => (x.length, x))
    3. val rdd3 = rdd2.foldByKey("")(_+_)
    4. val rdd = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_, 1))
    5. rdd.foldByKey(0)(_+_)

    foreachPartition  // foreachPartition是spark-core的action算子,该算子源码中的注释是:Applies a function func to each parition of this RDD.(将函数func应用于此RDD的每个分区)

    1. val rdd1 = sc.parallelize(List(123456789), 3)
    2. rdd1.foreachPartition(x => println(x.reduce(_ + _)))

    keyBy

    1. val rdd1 = sc.parallelize(List("dog""salmon""salmon""rat""elephant"), 3)
    2. val rdd2 = rdd1.keyBy(_.length)
    3. rdd2.collect

    keys values

    1. val rdd1 = sc.parallelize(List("dog""tiger""lion""cat""panther""eagle"), 2)
    2. val rdd2 = rdd1.map(x => (x.length, x))
    3. rdd2.keys.collect
    4. rdd2.values.collect

  • 相关阅读:
    sass 骚用笔记
    springboot服务和python服务如何自定义启动banner
    南阳市卧龙区中医院综合楼施工组织设计及投标报价
    Vue学习笔记
    D数树,牛客小白月赛78,思维
    文件服务器 — File Browser
    C++ 之多态总结
    电力巡检/电力抢修行业解决方案:AI+视频技术助力解决巡检监管难题
    JDK1.8更便捷获取时间的方法:LocalDateTime、LocalDate、LocalTime、Period
    【Linux 基础】df -h 的输出信息解读
  • 原文地址:https://blog.csdn.net/pblh123/article/details/133081483