• Spark---RDD的创建分类和基础操作算子详解


    一、RDD的创建

    原生api提供了两种创建方式,一种就是读取文件textFile,还有一种就是加载一个scala集合parallelize。当然,也可以通过transformation算子来创建的RDD。

        //创建RDD
        //加载数据,textFile(参数1,参数2),参数1可以读取本地文件也可以读取hdfs上的文件,参数2为最小分区数量,但spark有自己的判断,在允许的范围内参数2有效,否则失效
        val rdd = sc.textFile("F:\\test\\words.txt")
        //适合加载一堆小文件,wholeTextFile(参数1,参数2),参数1可以读取本地文件也可以读取hdfs上的文件,参数2为最小分区数量,最多只能开到文件数量
        val rdd1 = sc.wholeTextFile("F:\\test\\words.txt")
    
        //从scala集合创建
        val list = List(1,2,3,4)
        val arr = Array(1,2,3,4)
        //parallelize(参数1,参数2)参数1为集合数据,参数2是指定分区数,没有就是没有指定分区数,默认是CPU核数
        val rdd2 = sc.parallelize(list)
        //makeRDD底层调用了parallelize
        val rdd3 = sc.makeRDD(arr)
    
        //从其他RDD转换而来
        val rdd4 = rdd1.flatMap(_.split(" "))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    二、RDD的分类及基本操作

    基本上分为两类:transformation和action

    1、transformation

    转换算子(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
    1、map算子
    对RDD集合中的每一个元素,都作用一次该func匿名函数,之后返回值为生成元素构成的一个新的RDD。

    	 //map映射
        val rdd = sc.parallelize(1 to 7)
        //将每一个元素扩大10倍
        val res = rdd.map(_*10)
        //打印输出
        println(res.collect().toBuffer)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.flatMap算子
    集合中的每一个元素,都要作用func匿名函数,返回0到多个新的元素,这些新的元素共同构成一个新的RDD。是一个one-to-many的操作

    	//flatMap=map+flatten
        val list = List(
          "jia jing kan kan kan",
          "gao di di di di",
          "zhan yuan qi qi"
        )
        //将集合转换为RDD
        val rdd = sc.parallelize(list)
        //按照指定分隔符进行切分
        val res = rdd.flatMap(_.split(" "))
        //将结果输出
        res.foreach(print)//zhanyuanqiqijiajingkankankangaodidididi
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.mapPartitions算子
    mapPartitions(p: Iterator[A] => Iterator[B])一次性处理一个partition分区中的数据。执行性能要高于map,但是其一次性将一个分区的数据加载到执行内存空间,如果该分区数据集比较大,存在OOM的风险。

    	//mapPartitions:一次操作一个分区的数据
        val rdd = sc.parallelize(Array(1,2,3,4,5),3)
        //一次操作一个分区的数据
        val res = rdd.mapPartitions(x=>Iterator(x.mkString("-")))
        res.foreach(println)
        /*1
        2-3
        4-5*/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4、mapPartitionsWithIndex算子
    mapPartitionsWithIndex((index, p: Iterator[A] => Iterator[B])),该操作比mapPartitions多了一个index,代表就是后面p所对应的分区编号。

    	//mapPartitionsWithIndex:查看每个分区当中都保存了哪些元素
        val rdd = sc.parallelize(1 to 16,2)
        //查看每个分区当中都保存了哪些元素
        val res = rdd.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(",")))
        res.foreach(println)
        /*1:9-10-11-12-13-14-15-16
    	0:1-2-3-4-5-6-7-8*/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    5、sample算子
    sample(withReplacement, fraction, seed):随机抽样算子,去代替全量研究会出现类似数据倾斜(dataSkew)等问题,无法进行全量研究,只能用样本去评估整体。
    withReplacement:Boolean :有放回的抽样和无放回的抽样
    fraction:Double:样本空间占整体数据量的比例,大小在[0, 1],比如0.2, 0.65
    seed:Long:是一个随机数的种子,有默认值,通常不需要传参

    	//sample:随机抽样算子,样品的预期大小个数不确定
        val rdd = sc.parallelize(1 to 10)
        //随机抽取样本占总体的0.5,有放回,会有重复
        val res = rdd.sample(true,0.5)
        println(res.collect().toBuffer)//ArrayBuffer(3, 8, 10, 10)
        //无放回,不会有重复
        val res1 = rdd.sample(false,0.8)
        println(res1.collect().toBuffer)//ArrayBuffer(2, 3, 4, 5, 6, 7, 8, 9)
    
    	 //takeSample精确抽样,参数2为样本大小,确定抽几个
        val rdd = sc.parallelize(1 to 10)
        val res = rdd.takeSample(false,7)
        println(res.toBuffer)//ArrayBuffer(2, 3, 9, 8, 10, 6, 4)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    6、union算子
    rdd1.union(rdd2)
    相当于sql中的union all,进行两个rdd数据间的联合,需要说明一点是,rdd1如果有N个分区,rdd2有M个分区,那么union之后的分区个数就为N+M。

     	//union :整合两个RDD当中的元素,并且整合分区数
        val rdd1= sc.parallelize(1 to 5,3)
        val rdd2= sc.parallelize(3 to 7,2)
        rdd1.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
        /*0:1
        2:4,5
    	1:2,3*/
       	rdd2.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
       	/*0:3,4
    	1:5,6,7*/
        val res = rdd1.union(rdd2)
        //查看有多少元素
        res.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
        /*0:1
    	1:2,3
    	2:4,5
    	4:5,6,7
    	3:3,4*/
        println(res.collect().toBuffer)//ArrayBuffer(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)
        //查看分区数
        println(res.getNumPartitions)//5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    7、join算子

    	//join:相同的key进行输出,不同的key不进行输出
        val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
        val rdd2 = sc.parallelize(Array((1,4),(2,5),(7,6)))
        //调用算子
        val res : RDD[(Int, (String, Int))]= rdd1.join(rdd2)
        println(res.collect().toBuffer) //ArrayBuffer((1,(a,4)), (2,(b,5)))
    
        //rightOuterJoin
        val res1 : RDD[(Int,(Option[String],Int))]= rdd1.rightOuterJoin(rdd2)
        println(res1.collect().toBuffer)//ArrayBuffer((1,(Some(a),4)), (2,(Some(b),5)), (7,(None,6)))
    
        //leftOuterJoin
        val res2:RDD[(Int,(String,Option[Int]))] = rdd1.leftOuterJoin(rdd2)
        println(res2.collect().toBuffer)//ArrayBuffer((1,(a,Some(4))), (2,(b,Some(5))), (3,(c,None)))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    8、coalesce算子
    coalesce(numPartition, shuffle=false): 分区合并的意思
    numPartition:分区后的分区个数
    shuffle:此次重分区是否开启shuffle,决定当前的操作是宽(true)依赖还是窄(false)依赖

    	//coalesce:分区合并
        val rdd :RDD[Int]= sc.parallelize(1 to 16,4)
        println(rdd.getNumPartitions)//4
        //查看每个分区当中都保存了哪些元素
        rdd.mapPartitionsWithIndex((index,item)=>Iterator{
          index+":"+item.mkString(",")
        }).foreach(println)
        /*3:13,14,15,16
    	0:1,2,3,4
    	1:5,6,7,8
    	2:9,10,11,12*/
        //缩减分区数,默认直接分区合并不会进行shuffle洗牌,也就是说默认只能缩减分区数不能增加
        val res:RDD[Int] = rdd.coalesce(3)
        //查看分区数
        println(res.getNumPartitions)//3
        //查看分区中都保存了那些元素
        res.mapPartitionsWithIndex((index,item)=>{
          Iterator(index+":"+item.mkString("-"))
        }).foreach(println)
        /*0:1-2-3-4
    	1:5-6-7-8
    	2:9-10-11-12-13-14-15-16*/
    	
        //如果想要增加分区数,将shuffle改为true
        val res:RDD[Int] = rdd.coalesce(5,true)
        //查看分区数
        println(res.getNumPartitions)//5
        //查看分区中都保存了那些元素
        res.mapPartitionsWithIndex((index,item)=>{
          Iterator(index+":"+item.mkString("-"))
        }).foreach(println)
        /*0:10-13
        2:2-6-12-15
        3:3-7-16
        1:1-5-11-14
        4:4-8-9*/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    9、repartition算子
    repartition底层调用了coalesce(numPartitions, shuffle = true),shuffle过程默认为ture

    	val rdd :RDD[Int]= sc.parallelize(1 to 16,4)
        println(rdd.getNumPartitions)
        //查看每个分区当中都保存了哪些元素
        rdd.mapPartitionsWithIndex((index,item)=>{
          Iterator(index+":"+item.mkString("-"))
        }).foreach(println)
        /*0:1-2-3-4
    	3:13-14-15-16
    	2:9-10-11-12
    	1:5-6-7-8*/
        //调用算子
        val res = rdd.repartition(2)
        println(res.getNumPartitions)
        //查看
        res.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString("-"))).foreach(println)
    	/*分区不是两两合并,而是重新洗牌分为两个分区
    	1:2-4-6-8-10-12-14-16
    	0:1-3-5-7-9-11-13-15*/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    10、sortBy算子
    sortBy(func,[ascending], [numTasks])
    ascending:true为升序,false为降序
    numTasks:分区数

    	//sortBy
        val rdd:RDD[(String,Int)] = sc.parallelize(List(("a",1),("b",8),("c",6)),3)
        println(rdd.getNumPartitions)//3
        //按照第二个字段进行排序
        val res = rdd.sortBy(_._2,false,2)
        println(res.collect().toBuffer)//ArrayBuffer((b,8), (c,6), (a,1))
        println(res.getNumPartitions)//2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    11、sortByKey([ascending], [numTasks])算子
    在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

        val rdd:RDD[(String,Int)] = sc.parallelize(List(("a",1),("b",8),("c",6)),4)
        val res = rdd.sortByKey(true,3)
        println(res.collect().toBuffer)//ArrayBuffer((a,1), (b,8), (c,6))
        println(res.getNumPartitions)//3
    
    • 1
    • 2
    • 3
    • 4

    12、groupBy和 groupByKey
    groupByKey相比较reduceByKey而言,没有本地预聚合操作,显然其效率并没有reduceByKey效率高,在使用的时候如果可以,尽量使用reduceByKey等去代替groupByKey。

    case class Student(id:Int,name:String,province:String)
        val stuRDD = sc.parallelize(List(
          Student(1, "张三", "安徽"),
          Student(2, "李梦", "山东"),
          Student(3, "王五", "甘肃"),
          Student(4, "周七", "甘肃"),
          Student(5, "Lucy", "黑吉辽"),
          Student(10086, "魏八", "黑吉辽")
        ))
        //按照省份进行排序
        //groupBy就是对不是kv键值对的数据进行分组
        val res = stuRDD.groupBy(stu=>stu.province)
        println(res.collect().toBuffer)//ArrayBuffer((安徽,CompactBuffer(Student(1,张三,安徽))), (黑吉辽,CompactBuffer(Student(5,Lucy,黑吉辽), Student(10086,魏八,黑吉辽))), (甘肃,CompactBuffer(Student(3,王五,甘肃), Student(4,周七,甘肃))), (山东,CompactBuffer(Student(2,李梦,山东))))
    
        //groupByKey针对的是kv键值对的数据,numPartition指的是分组之后的分区个数
        val stures=stuRDD.map(stu=>(stu.province,stu))
        //调用算子
        val result = stures.groupByKey()
        println(result.collect().toBuffer)//ArrayBuffer((安徽,CompactBuffer(Student(1,张三,安徽))), (黑吉辽,CompactBuffer(Student(5,Lucy,黑吉辽), Student(10086,魏八,黑吉辽))), (甘肃,CompactBuffer(Student(3,王五,甘肃), Student(4,周七,甘肃))), (山东,CompactBuffer(Student(2,李梦,山东))))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    13、reduceByKey算子

    	//reduceByKey,会进行预聚合,效率比groupbykey高,聚合的是key对应的value值
        case class Student(id: Int, name:String, province: String)
        val stuRDD = sc.parallelize(List(
          Student(1, "张三", "安徽"),
          Student(2, "李梦", "山东"),
          Student(3, "王五", "甘肃"),
          Student(4, "周七", "甘肃"),
          Student(5, "Lucy", "黑吉辽"),
          Student(10086, "魏八", "黑吉辽")
        ))
        //按照相同的省份进行聚合
        val res = stuRDD.map(stu=>(stu.province,1))
        val count = res.reduceByKey(_+_)
        println(count.collect().toBuffer)//ArrayBuffer((安徽,1), (黑吉辽,2), (甘肃,2), (山东,1))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    14、foldByKey算子

    	//foldByKey与reduceByKey的区别就是多了一个初始值
        case class Student(id: Int, name:String, province: String)
        val stuRDD = sc.parallelize(List(
          Student(1, "张三", "安徽"),
          Student(3, "王五", "甘肃"),
          Student(5, "Lucy", "黑吉辽"),
          Student(2, "李梦", "山东"),
          Student(4, "周七", "甘肃"),
          Student(10086, "魏八", "黑吉辽")
        ), 2)
        //查看每个分区当中都保存了哪些元素
        stuRDD.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
        /*1:Student(2,李梦,山东),Student(4,周七,甘肃),Student(10086,魏八,黑吉辽)
    	0:Student(1,张三,安徽),Student(3,王五,甘肃),Student(5,Lucy,黑吉辽)*/
        //调用算子进行聚合
        val res = stuRDD.map(stu=>(stu.province,1))
        //初始化的值针对的是每个分区当中,相同key下只有一个初始值
        val sount = res.foldByKey(1)(_+_)
        println(sount.collect().toBuffer)//ArrayBuffer((安徽,2), (甘肃,4), (山东,2), (黑吉辽,4))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    15、combineByKey算子

     	//combineByKey,reduceByKey和groupByKey底层都是通过combineByKeyWithClassTag来实现的
        val array = sc.parallelize(Array(
          "hello you",
          "hello me",
          "hello you",
          "hello you",
          "hello me",
          "hello you"
        ), 5)
        //按照分隔符进行切分
        val word = array.flatMap(line=>line.split(" "))
        //每个单词记为一次
        val word1 = word.map((_,1))
        //调用算子
        //第一个参数是初始化,第二个参数是小聚合,分区之内聚合,第三个参数是大聚合,分区之间聚合
        val res = word1.combineByKey(createCombiner,mergeValue,mergeCombiner)
        println(res.collect().toBuffer)//ArrayBuffer((me,2), (hello,6), (you,4))
    
        //例子
        val rdd: RDD[Int] = sc.parallelize(List(1,1,1,2,2,2,2,2,3,3,3,3,3,4,4,4,4,4,4,4),2)
        //将数据转为key,value形式
        val rdd1: RDD[(Int, Int)] = rdd.map((_,1))
        //查看每个分区当中都保存了哪些元素
        rdd1.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
        //调用算子
        //初始化:针对每个分区当中,相同key下第一条元素进行初始化
        val result = rdd1.combineByKey(-_,(a:Int,b:Int)=>a+b,(a:Int,b:Int)=>a+b)
        result.foreach(println)
    
      def createCombiner(num:Int)={
        num
      }
      def mergeValue(sum:Int,num:Int)={
        sum+num
      }
      def mergeCombiner(sum:Int,num:Int)={
        sum+num
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    16、aggregateByKey算子
    combineByKey和aggregateByKey的区别就相当于reduceByKey和foldByKey。

    val array = sc.parallelize(Array(
          "hello you",
          "hello me",
          "hello you",
          "hello you",
          "hello me",
          "hello you"
        ), 2)
        //切分并将每个单词记为1次
        val wordAndOne: RDD[(String, Int)] = array.flatMap(_.split(" ")).map((_,1))
        //查看每个分区当中都保存了哪些元素
        wordAndOne.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
        /*1:(hello,1),(you,1),(hello,1),(me,1),(hello,1),(you,1)
    	0:(hello,1),(you,1),(hello,1),(me,1),(hello,1),(you,1)*/
        //调用算子进行聚合
        //第一个参数是分区之内进行聚合,也就是小聚合
        //第二个参数是分区之间进行聚合,也就是大聚合
        //初始化的值针对的是每个分区当中,相同key下只有一个初始值
        val res = wordAndOne.aggregateByKey(1)(_+_,_+_)
        println(res.collect().toBuffer)//ArrayBuffer((hello,8), (me,4), (you,6))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2、action

    操作/行动(Actions)算子 (如:count, collect, foreach等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
    1、foreach算子
    foreach主要功能,就是用来遍历RDD中的每一条纪录,其实现就是将map或者flatMap中的返回值变为Unit即可,即foreach(A => Unit)
    2、count算子
    统计该rdd中元素的个数
    3、collect算子
    该算子的含义就是将分布在集群中的各个partition中的数据拉回到driver中,进行统一的处理;但是这个算子有很大的风险存在,第一,driver内存压力很大,第二数据在网络中大规模的传输,效率很低;所以一般不建议使用,如果非要用,请先执行filter。
    4、take&first算子
    返回该rdd中的前N个元素,如果该rdd的数据是有序的,那么take(n)就是TopN;而first是take(n)中比较特殊的一个take(1)。
    5、takeOrdered(n)
    返回前几个的排序
    6、reduce算子
    reduce是一个action操作,reduceByKey是一个transformation。reduce对一个rdd执行聚合操作,并返回结果,结果是一个值。
    7、countByKey算子
    统计key出现的次数。

  • 相关阅读:
    关于正负数的取模并小议某度的流量生产思维
    MMdetection3.x个人笔记
    [Android] native MediaCodec
    CNN学习笔记
    阿里云 Serverless 异步任务处理系统在数据分析领域的应用
    java智慧工地云平台源码,以物联网、移动互联网技术为基础,结合大数据、云计算等,实现工程管理绿色化、数字化、精细化、智能化的效果
    kubernetes的服务暴露Service的三种常用类型
    ssm+jsp学分置换管理系统maven idea项目
    论文精讲目录
    新版TCGA不同癌种数据合并
  • 原文地址:https://blog.csdn.net/onthe_wing/article/details/138004610