• Spark RDD Transformation Operators


    Based on how they process data, RDD transformation operators fall into three broad groups: Value types, double-Value types, and Key-Value types.

    I. Value Types

    1. map

    Function signature: def map[U: ClassTag](f: T => U): RDD[U]

    Applies the given function to each element of the RDD, one record at a time. The transformation may change the element's value, its type, or both.

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    object RddMapTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(1, 2, 3, 4))
        // val rdd2 = rdd.map(x => x * 2)
        val rdd2 = rdd.map(_ * 2)
        rdd2.saveAsTextFile("outMap")
      }
    }

    2. mapPartitions

    Function signature:

    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U]

    Processes the data one partition at a time: the function receives an iterator over a whole partition and returns a new iterator.

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    object RddMapTest2 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test2").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(5, 6, 7, 8))
        // The function is called once per partition and maps the whole iterator
        val rdd2 = rdd.mapPartitions(datas => {
          datas.map(_ * 2)
        })
        rdd2.saveAsTextFile("outMap2")
      }
    }

    3. What is the difference between map and mapPartitions?

    • Data-processing perspective

    The map operator processes the elements of a partition one at a time, similar to serial processing, whereas mapPartitions processes an entire partition as a batch.

    • Functional perspective

    The main purpose of map is to transform the data from the source; it never changes the number of elements. mapPartitions takes an iterator and returns an iterator, with no requirement that the number of elements stay the same, so it can add or drop elements.

    • Performance perspective

    Because map behaves like serial processing, its performance is relatively low, while mapPartitions works like batch processing and is generally faster. However, mapPartitions holds an entire partition in memory for a long time, which can exhaust memory and cause out-of-memory errors. When memory is limited, prefer map.
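
    To make the contrast concrete, here is a minimal sketch (the object name and the simulated per-partition setup are illustrative assumptions, not part of the original examples): map runs its function once per element, while mapPartitions runs it once per partition and can reuse any setup work for the whole iterator.

    import org.apache.spark.{SparkConf, SparkContext}

    object MapVsMapPartitions {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("compare").setMaster("local[*]"))
        val rdd = sc.makeRDD(1 to 8, 2)

        // map: the function body runs once for every element
        val perElement = rdd.map(_ * 2)

        // mapPartitions: the function body runs once per partition; anything
        // created before iter.map (here just a tag string) is reused for the
        // whole partition instead of being rebuilt for every element
        val perPartition = rdd.mapPartitions { iter =>
          val tag = "partition-batch" // stands in for expensive setup such as a DB connection
          iter.map(x => s"$tag:${x * 2}")
        }

        println(perElement.collect().mkString(","))
        println(perPartition.collect().mkString(","))
        sc.stop()
      }
    }

    With only a cheap multiplication the two produce the same numbers; the benefit of mapPartitions shows up when the per-partition setup is genuinely expensive.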

    4. mapPartitionsWithIndex

    Function signature:

    def mapPartitionsWithIndex[U: ClassTag](
        f: (Int, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U]

    Code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Requirement: multiply the elements in partition 0 by 2; leave partition 1 unchanged.
     */
    object RddMapTest3 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test3").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(4, 5, 8, 9), 2)
        val rdd2: RDD[Int] = rdd.mapPartitionsWithIndex((index, datas) => {
          index match {
            case 0 => datas.map(_ * 2)
            case _ => datas
          }
        })
        rdd2.saveAsTextFile("outMap4")
      }
    }

    5. flatMap

    Function signature: def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

    Flattens the data first and then maps it, which is why this operator is also called a flat map.

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    object FlatMapDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6)))
        rdd.flatMap(list => list).collect().foreach(println)
      }
    }

    6. glom

    Function signature: def glom(): RDD[Array[T]]

    Converts the data of each partition into an in-memory array of the same element type; the partitioning itself is unchanged.

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Take the maximum value within each partition, then sum the maxima across partitions.
     */
    object glomDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.textFile("input\\word.txt", 3)
        val rdd2 = rdd.flatMap(line => {
          line.split(",")
        })
        val rdd3 = rdd2.map(value => {
          Integer.parseInt(value)
        }).glom().map(arr => arr.max)
        // Sum of the per-partition maxima
        println(rdd3.collect().sum)
        rdd3.saveAsTextFile("out10")
      }
    }

    Contents of word.txt:

    1,5,6,7,100
    -1,50,14,1000
    6,5,-100,20
    

    7. groupBy

    Function signature: def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    object groupByDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.textFile("input\\word.txt", 2)
        // Group the numbers by the remainder of division by 2
        val rdd2 = rdd.flatMap(_.split(",")).map(Integer.parseInt(_)).groupBy(_ % 2)
        rdd2.saveAsTextFile("out11")
      }
    }

    8. filter

    Function signature: def filter(f: T => Boolean): RDD[T]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Filter strings: keep only the names that start with 'j'.
     */
    object FilterDemo2 {
      def main(args: Array[String]): Unit = {
        // Create the SparkConf
        val sparkConf = new SparkConf().setAppName("text").setMaster("local[*]")
        // Create the SparkContext
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(Array("java", "js", "python", "php"))
        rdd.filter(_.charAt(0) == 'j').collect().foreach(println)
      }
    }

    9. sample

    Function signature: def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

    Code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object SampleDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd: RDD[Long] = sc.range(1, 100)
        // Sampling with replacement
        // val rdd2 = rdd.sample(true, 0.3)
        // Sampling without replacement
        val rdd2 = rdd.sample(false, 0.3)
        print(rdd2.collect().mkString(","))
      }
    }

    /**
     * Sampling in a loop: with a fixed seed the sample is identical on every iteration.
     */
    object SampleDemo2 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd: RDD[Long] = sc.range(1, 100)
        for (a <- 1 to 10) {
          // Sampling without replacement, fixed seed = 1
          val rdd2 = rdd.sample(false, 0.3, 1)
          println(rdd2.collect().mkString(","))
        }
      }
    }

    /**
     * Data skew: sample the keys to find the one that occurs most often.
     */
    object SampleDemo3 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(Array("A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
          "A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
          "A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
          "B","B","B","B","B","B","B","B","C","D","E","F","G"))
        val rdd2 = rdd.map((_, 1)).sample(true, 0.3).reduceByKey(_ + _)
          .map(line => (line._2, line._1)).sortBy(_._1, false).take(1)
        rdd2.foreach(line => {
          println(line._1, line._2)
        })
        sc.stop()
      }
    }

    10. distinct

    Function signature:

    def distinct()(implicit ord: Ordering[T] = null): RDD[T]
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    object DistinctDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(Array(1, 2, 3, 1, 1, 2, 2, 3, 3, 5))
        // First way to deduplicate
        // val rdd2 = rdd.distinct()
        // Second way to deduplicate
        // val rdd2 = rdd.map(x => (x, null)).reduceByKey((x, y) => x, 8).map(_._1)
        // Deduplicate with more tasks to increase parallelism
        val rdd2 = rdd.distinct(5)
        rdd2.foreach(println)
      }
    }

    11. coalesce

    Function signature:

    def coalesce(numPartitions: Int,
        shuffle: Boolean = false,
        partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
        (implicit ord: Ordering[T] = null): RDD[T]

    Function description:

    Shrinks the number of partitions to match the amount of data. It is typically used after filtering a large dataset, so that the resulting small dataset runs more efficiently. When a Spark job has too many small tasks, coalesce can merge partitions and reduce their number, lowering the task-scheduling overhead.

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    object coalesceDemo { // with shuffle
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
        // Print the partition index together with each element
        val rdd2 = rdd.mapPartitionsWithIndex((index, datas) => {
          datas.map(data => {
            println(s"${index}->${data}")
            data
          })
        })
        rdd2.collect()
        // Shrink to 2 partitions
        // val rdd3 = rdd.coalesce(2)     // no shuffle by default
        val rdd3 = rdd.coalesce(2, true)  // with shuffle
        val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
          datas.map(data => {
            println(s"${index}->${data}")
            data
          })
        })
        rdd4.collect()
      }
    }
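
    As a complement to the example above, here is a minimal sketch of the filter-then-shrink use case mentioned in the function description (the object name and the concrete numbers are illustrative assumptions):

    import org.apache.spark.{SparkConf, SparkContext}

    object FilterThenCoalesce {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        // A large dataset spread over 100 partitions
        val big = sc.makeRDD(1 to 100000, 100)
        // A selective filter leaves only about 100 elements scattered over those partitions
        val small = big.filter(_ % 1000 == 0)
        // Collapse them into 2 partitions without a shuffle, so later stages run far fewer tasks
        val compact = small.coalesce(2)
        println(compact.getNumPartitions) // 2
        sc.stop()
      }
    }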

    12. repartition

    Function signature: def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

    Function description:

    Internally this operation simply calls coalesce with the shuffle parameter set to true. repartition can convert an RDD with many partitions into one with fewer partitions, and vice versa, because a shuffle is always performed either way.

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * repartition() re-partitions the RDD; it works for both shrinking and growing the partition count.
     */
    object coalesceDemo2 { // with shuffle
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
        // Print the partition index together with each element
        val rdd2 = rdd.mapPartitionsWithIndex((index, datas) => {
          datas.map(data => {
            println(s"${index}->${data}")
            data
          })
        })
        rdd2.collect()
        val rdd3 = rdd.repartition(3) // performs a shuffle
        val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
          datas.map(data => {
            println(s"${index}->${data}")
            data
          })
        })
        rdd4.collect()
      }
    }
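
    A minimal sketch of the equivalence stated in the function description (the object name is an illustrative assumption): repartition(n) behaves like coalesce(n, shuffle = true), so both calls below yield four partitions.

    import org.apache.spark.{SparkConf, SparkContext}

    object RepartitionVsCoalesce {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val rdd = sc.makeRDD(1 to 6, 2)
        // Growing the partition count: both calls trigger a shuffle
        println(rdd.repartition(4).getNumPartitions)              // 4
        println(rdd.coalesce(4, shuffle = true).getNumPartitions) // 4
        sc.stop()
      }
    }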

    13. sortBy

    Function signature: def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    object sortByDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
        // Sort in descending order
        rdd.sortBy(a => a, false).collect().foreach(println)
      }
    }

    II. Double-Value Types

    14. intersection

    Function signature: def intersection(other: RDD[T]): RDD[T]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * intersection(): the elements common to both RDDs.
     */
    object IntersectionDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd1 = sc.makeRDD(Array(1, 2, 3), 3)
        val rdd2 = sc.makeRDD(Array(3, 4, 5), 3)
        val rdd3 = rdd1.intersection(rdd2)
        // Print the partition index together with each element
        val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
          datas.map(data => {
            println(s"${index}->${data}")
            data
          })
        })
        rdd4.collect()
      }
    }

    15. union

    Function signature: def union(other: RDD[T]): RDD[T]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    object UnionDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd1 = sc.makeRDD(Array(1, 2, 3), 3)
        val rdd2 = sc.makeRDD(Array(3, 4, 5), 3)
        // rdd1.union(rdd2).collect().foreach(println) // 1,2,3,3,4,5
        val rdd3 = rdd1.union(rdd2)
        // rdd3.saveAsTextFile("out\\out13")
        // Print the partition index together with each element
        val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
          datas.map(data => {
            println(s"${index}->${data}")
            data
          })
        })
        rdd4.collect()
      }
    }

    16. subtract

    Function signature: def subtract(other: RDD[T]): RDD[T]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * subtract(): set difference.
     * rdd1 subtract rdd2 -> 1, 2
     * rdd2 subtract rdd1 -> 4, 5
     */
    object SubtractDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd1 = sc.makeRDD(Array(1, 2, 3), 3)
        val rdd2 = sc.makeRDD(Array(3, 4, 5), 3)
        val rdd3 = rdd1.subtract(rdd2)
        // Print the partition index together with each element
        val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
          datas.map(data => {
            println(s"${index}->${data}")
            data
          })
        })
        rdd4.collect()
      }
    }

    17. zip

    Function signature: def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    object ZipDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd1 = sc.makeRDD(Array("zhangsan", "lisi", "wangwu"), 3)
        val rdd2 = sc.makeRDD(Array(20, 18, 21), 3)
        // Pair the elements position by position: (zhangsan,20), (lisi,18), (wangwu,21)
        val rdd3 = rdd1.zip(rdd2)
        rdd3.collect().foreach(println)
      }
    }

    III. Key-Value Types

    18. partitionBy

    Function signature: def partitionBy(partitioner: Partitioner): RDD[(K, V)]

    Code:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object ParTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
        // Repartition by key using a HashPartitioner with 2 partitions
        val rdd2 = rdd.partitionBy(new HashPartitioner(2))
        rdd2.saveAsTextFile("out\\out14")
        sc.stop()
      }
    }

    19. reduceByKey

    Function signature:

    def reduceByKey(func: (V, V) => V): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * reduceByKey(): aggregate the values for each key.
     */
    object ParTest2 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(Array((1, "a"), (1, "b"), (3, "c"), (3, "d"), (3, "k")))
        // Concatenate the strings that share the same key
        val rdd2 = rdd.reduceByKey((x, y) => x + y)
        rdd2.saveAsTextFile("out\\out15")
        sc.stop()
      }
    }

    20. groupByKey

    Function signature:

    def groupByKey(): RDD[(K, Iterable[V])]
    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * groupByKey(): regroup by key, then take the maximum value for each key.
     */
    object ParTest3 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
        val rdd2 = rdd.groupByKey()
        rdd2.map(line => (line._1, line._2.max)).collect().foreach(println)
        sc.stop()
      }
    }

    21. What is the difference between reduceByKey and groupByKey?

    • From the shuffle perspective:

    Both reduceByKey and groupByKey involve a shuffle, but reduceByKey can pre-aggregate (combine) the values of identical keys within each partition before the shuffle, which reduces the amount of data written to disk, so reduceByKey performs better. groupByKey only groups the data, so the data volume is not reduced.

    • From the functional perspective:

    reduceByKey combines grouping and aggregation, whereas groupByKey can only group, not aggregate. So when you need to group and then aggregate, reduceByKey is recommended; if you only need to group without aggregating, groupByKey is the only option.
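
    A minimal sketch of the contrast (the object name is an illustrative assumption): the same per-key sum written once with reduceByKey, which pre-aggregates inside each partition before the shuffle, and once with groupByKey followed by an explicit sum, which shuffles every record first.

    import org.apache.spark.{SparkConf, SparkContext}

    object ReduceVsGroupByKey {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val pairs = sc.makeRDD(List(("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)))

        // Partial sums are computed inside each partition before the shuffle
        val viaReduce = pairs.reduceByKey(_ + _)

        // Every (key, value) pair is shuffled, then the values are summed afterwards
        val viaGroup = pairs.groupByKey().map { case (k, vs) => (k, vs.sum) }

        println(viaReduce.collect().mkString(","))
        println(viaGroup.collect().mkString(","))
        sc.stop()
      }
    }

    Both versions print (a,3) and (b,2); the difference lies only in how much data crosses the shuffle.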

    22. aggregateByKey

    Function signature:

    def aggregateByKey[U: ClassTag](zeroValue: U)
        (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * aggregateByKey(): for each key, take the maximum value within each partition
     * (seqOp, starting from the zero value 0), then sum those maxima across partitions (combOp).
     */
    object aggregateByKeyTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
        rdd.aggregateByKey(0)(Math.max(_, _), _ + _).collect().foreach(println)
        sc.stop()
      }
    }

    23. foldByKey

    Function signature: def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * foldByKey(): a simplified aggregateByKey that applies the same function
     * both within and between partitions.
     */
    object foldByKeyTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
        // Sum the values for each key, starting from the zero value 0
        rdd.foldByKey(0)(_ + _).collect().foreach(println)
        sc.stop()
      }
    }

    24. combineByKey

    Function signature:

    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C): RDD[(K, C)]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * combineByKey(): change the value structure first, then aggregate within and
     * between partitions. This example computes the average score per key.
     */
    object combineByKeyTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)))
        val rdd2 = rdd.combineByKey(
          // createCombiner: turn the first value of a key into a (sum, count) pair
          (_, 1),
          // mergeValue: fold another value of the same key into the accumulator
          (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
          // mergeCombiners: merge accumulators coming from different partitions
          (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
        )
        rdd2.map {
          case (key, value) =>
            (key, value._1 / value._2.toDouble)
        }.collect().foreach(println)
        sc.stop()
      }
    }

    25. sortByKey

    Function signature:

    def sortByKey(ascending: Boolean = true,
        numPartitions: Int = self.partitions.length): RDD[(K, V)]

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * sortByKey(): sort by key.
     */
    object sortByKeyTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
        // true = ascending order
        rdd.sortByKey(true).collect().foreach(println)
      }
    }

    26. join

    Function signature: def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

    Code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * join(): connect the values that share the same key; keys present in only
     * one of the two RDDs are dropped.
     */
    object joinTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
        val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
        rdd.join(rdd1).collect().foreach(println)
        sc.stop()
      }
    }

    27. cogroup

    Function signature: def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

    Code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * cogroup(): similar to a full outer join, but the values for each key are
     * first grouped within each RDD.
     */
    object cogroupTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
        val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
        rdd.cogroup(rdd1).collect().foreach(println)
        sc.stop()
      }
    }

     

  • Original article: https://blog.csdn.net/m0_55834564/article/details/125456669