Based on how they process data, RDD transformation operators fall into three broad groups: single-Value types, double-Value types, and Key-Value types.
Function signature: def map[U: ClassTag](f: T => U): RDD[U]
map transforms the data one element at a time; the transformation may change an element's type, its value, or both.
Code:
// Imports assumed by all code examples in this section
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object RddMapTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // val rdd2 = rdd.map(x => x * 2)
    val rdd2 = rdd.map(_ * 2) // double every element
    rdd2.saveAsTextFile("outMap")
    sc.stop()
  }
}
Function signature:
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
mapPartitions processes the data one partition at a time: the function receives an iterator over a partition and returns an iterator.
Code:
object RddMapTest2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test2").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(5, 6, 7, 8))
    // the function is applied once per partition, to the partition's iterator
    val rdd2 = rdd.mapPartitions(datas => {
      datas.map(_ * 2)
    })
    rdd2.saveAsTextFile("outMap2")
    sc.stop()
  }
}
- From the data-processing perspective:
The map operator works element by element within a partition, similar to serial processing, while mapPartitions works on a whole partition as one batch.
- From the functional perspective:
The main purpose of map is to transform each element of the data source; it never changes the number of elements. mapPartitions takes an iterator and returns an iterator, with no requirement that the element count stay the same, so it can add or drop elements.
- From the performance perspective:
Because map is element-by-element, it is relatively slow, while mapPartitions works batch-wise and is usually faster. However, mapPartitions holds a whole partition in memory for a long time, which can lead to out-of-memory errors; when memory is limited, it is not recommended — use map instead (see the sketch below).
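A minimal sketch of the batch-processing idea behind mapPartitions (the object name and data below are made up for illustration): an expensive per-partition resource, here a java.text.DecimalFormat standing in for something like a database connection, is created once per partition and reused for every element of that partition, which a per-element map cannot do.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example: create one "expensive" resource per partition and reuse it.
object MapPartitionsResourceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(1.5, 2.25, 3.125), 2)

    // With map, a per-element resource would be built once per element.
    // With mapPartitions, the function runs once per partition, so the
    // resource is built once per partition and reused for the whole iterator.
    val formatted = rdd.mapPartitions { iter =>
      val fmt = new java.text.DecimalFormat("0.00") // built once per partition
      iter.map(x => fmt.format(x))                  // reused for every element
    }
    formatted.collect().foreach(println)
    sc.stop()
  }
}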
Function signature:
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
Code:
/** Requirement: multiply the elements of partition 0 by 2; leave partition 1 unchanged. */
object RddMapTest3 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test3").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(4, 5, 8, 9), 2)
    val rdd2: RDD[Int] = rdd.mapPartitionsWithIndex((index, datas) => {
      index match {
        case 0 => datas.map(_ * 2) // partition 0: double every element
        case _ => datas            // other partitions: pass through unchanged
      }
    })
    rdd2.saveAsTextFile("outMap4")
    sc.stop()
  }
}
Function signature: def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
flatMap first flattens the data and then applies the mapping, which is why the operator is also called a flat map.
Code:
object FlatMapDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6)))
    // flatten the nested lists into a single RDD of 1, 2, 3, 4, 5, 6
    rdd.flatMap(list => list).collect().foreach(println)
    sc.stop()
  }
}
Function signature: def glom(): RDD[Array[T]]
glom turns each partition's data into an in-memory array of the same element type; the partitioning itself is unchanged.
Code:
/** Requirement: take the maximum within each partition, then sum the per-partition maxima. */
object glomDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.textFile("input\\word.txt", 3)
    val rdd2 = rdd.flatMap(line => {
      line.split(",")
    })
    val rdd3 = rdd2.map(value => {
      Integer.parseInt(value)
    }).glom().map(arr => arr.max) // one maximum per partition
    rdd3.saveAsTextFile("out10")
    println(rdd3.collect().sum)   // sum of the per-partition maxima
  }
}
Contents of word.txt:
1,5,6,7,100
-1,50,14,1000
6,5,-100,20
Function signature: def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
Code:
object groupByDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.textFile("input\\word.txt", 2)
    // group the numbers by parity (remainder of division by 2)
    val rdd2 = rdd.flatMap(_.split(",")).map(Integer.parseInt(_)).groupBy(_ % 2)
    rdd2.saveAsTextFile("out11")
  }
}
Function signature: def filter(f: T => Boolean): RDD[T]
Code:
/** Filter strings: keep only the languages whose name starts with 'j'. */
object FilterDemo2 {
  def main(args: Array[String]): Unit = {
    // create the SparkConf
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    // create the SparkContext
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array("java", "js", "python", "php"))
    rdd.filter(_.charAt(0) == 'j').collect().foreach(println)
    sc.stop()
  }
}
Function signature: def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
withReplacement chooses sampling with or without replacement; fraction is the probability of selecting each element (without replacement) or the expected number of times each element is selected (with replacement); seed fixes the random seed.
Code:
object SampleDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[Long] = sc.range(1, 100)
    // sampling with replacement
    // val rdd2 = rdd.sample(true, 0.3)
    // sampling without replacement
    val rdd2 = rdd.sample(false, 0.3)
    print(rdd2.collect().mkString(","))
  }
}

/** Sampling in a loop: with a fixed seed, every iteration returns the same sample. */
object SampleDemo2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[Long] = sc.range(1, 100)
    for (a <- 1 to 10) {
      // sampling without replacement, seed fixed to 1
      val rdd2 = rdd.sample(false, 0.3, 1)
      println(rdd2.collect().mkString(","))
    }
  }
}

/** Data skew: sample the RDD to find the key with the most occurrences. */
object SampleDemo3 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array("A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
      "A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
      "A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A","A",
      "B","B","B","B","B","B","B","B","C","D","E","F","G"))
    // sample, count each key, and take the most frequent one
    val rdd2 = rdd.map((_, 1))
      .sample(true, 0.3)
      .reduceByKey(_ + _)
      .map(line => (line._2, line._1))
      .sortBy(_._1, false)
      .take(1)
    rdd2.foreach(line => {
      println(line._1, line._2)
    })
    sc.stop()
  }
}
Function signature:
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
Code:
object DistinctDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(1, 2, 3, 1, 1, 2, 2, 3, 3, 5))
    // first approach: distinct()
    // val rdd2 = rdd.distinct()
    // second approach: map to (value, null), reduceByKey, then keep only the key
    // val rdd2 = rdd.map(x => (x, null)).reduceByKey((x, y) => x, 8).map(_._1)
    // deduplicate with more tasks (5 partitions) to raise parallelism
    val rdd2 = rdd.distinct(5)
    rdd2.foreach(println)
  }
}
Function signature:
def coalesce(numPartitions: Int,
             shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null): RDD[T]
Function description:
coalesce shrinks the number of partitions based on the data volume, which improves execution efficiency for the small dataset left after filtering a large one. When a Spark job contains too many small tasks, coalesce can merge partitions to reduce the partition count and lower the task-scheduling cost.
Code:
/** coalesce: shrink the number of partitions, optionally with a shuffle. */
object coalesceDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
    // print the partition index of every element (3 partitions)
    val rdd2 = rdd.mapPartitionsWithIndex((index, datas) => {
      datas.map(data => {
        println(s"${index}->${data}")
        data
      })
    })
    rdd2.collect()

    // shrink to 2 partitions
    // val rdd3 = rdd.coalesce(2)    // default: no shuffle
    val rdd3 = rdd.coalesce(2, true) // with shuffle
    val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
      datas.map(data => {
        println(s"${index}->${data}")
        data
      })
    })
    rdd4.collect()
  }
}
Function signature: def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
Function description:
Internally, repartition simply calls coalesce with shuffle set to true. It can turn an RDD with many partitions into one with fewer, or an RDD with few partitions into one with more, because a shuffle always takes place.
Code:
/** repartition(): change the number of partitions, either downwards or upwards. */
object coalesceDemo2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
    // print the partition index of every element (2 partitions)
    val rdd2 = rdd.mapPartitionsWithIndex((index, datas) => {
      datas.map(data => {
        println(s"${index}->${data}")
        data
      })
    })
    rdd2.collect()

    val rdd3 = rdd.repartition(3) // always shuffles
    val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
      datas.map(data => {
        println(s"${index}->${data}")
        data
      })
    })
    rdd4.collect()
  }
}
Function signature: def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
Code:
object sortByDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
    // sort in descending order
    rdd.sortBy(a => a, false).collect().foreach(println)
  }
}
Function signature: def intersection(other: RDD[T]): RDD[T]
Code:
/** intersection(): the elements common to both RDDs. */
object IntersectionDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(1, 2, 3), 3)
    val rdd2 = sc.makeRDD(Array(3, 4, 5), 3)
    val rdd3 = rdd1.intersection(rdd2) // only 3 survives
    // print the partition index alongside each element
    val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
      datas.map(data => {
        println(s"${index}->${data}")
        data
      })
    })
    rdd4.collect()
  }
}
Function signature: def union(other: RDD[T]): RDD[T]
Code:
object UnionDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(1, 2, 3), 3)
    val rdd2 = sc.makeRDD(Array(3, 4, 5), 3)
    // rdd1.union(rdd2).collect().foreach(println) // 1,2,3,3,4,5 (duplicates kept)
    val rdd3 = rdd1.union(rdd2)
    // rdd3.saveAsTextFile("out\\out13")
    // print the partition index alongside each element
    val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
      datas.map(data => {
        println(s"${index}->${data}")
        data
      })
    })
    rdd4.collect()
  }
}
Function signature: def subtract(other: RDD[T]): RDD[T]
Code:
/**
 * subtract(): the set difference.
 * rdd1.subtract(rdd2) -> 1, 2
 * rdd2.subtract(rdd1) -> 4, 5
 */
object SubtractDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(1, 2, 3), 3)
    val rdd2 = sc.makeRDD(Array(3, 4, 5), 3)
    val rdd3 = rdd1.subtract(rdd2) // 1 and 2 remain
    // print the partition index alongside each element
    val rdd4 = rdd3.mapPartitionsWithIndex((index, datas) => {
      datas.map(data => {
        println(s"${index}->${data}")
        data
      })
    })
    rdd4.collect()
  }
}
Function signature: def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
Code:
object ZipDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array("zhangsan", "lisi", "wangwu"), 3)
    val rdd2 = sc.makeRDD(Array(20, 18, 21), 3)
    // both RDDs must have the same number of partitions and elements per partition
    val rdd3 = rdd1.zip(rdd2) // (zhangsan,20), (lisi,18), (wangwu,21)
    rdd3.collect().foreach(println)
  }
}
Function signature: def partitionBy(partitioner: Partitioner): RDD[(K, V)]
Code:
object ParTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    // repartition the pair RDD by key using a HashPartitioner over 2 partitions
    val rdd2 = rdd.partitionBy(new HashPartitioner(2))
    rdd2.saveAsTextFile("out\\out14")
    sc.stop()
  }
}
Function signature:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
Code:
/** reduceByKey(): aggregate the values of each key with the given function. */
object ParTest2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array((1, "a"), (1, "b"), (3, "c"), (3, "d"), (3, "k")))
    // string values with the same key are concatenated, e.g. (1,ab) and (3,cdk)
    val rdd2 = rdd.reduceByKey((x, y) => x + y)
    rdd2.saveAsTextFile("out\\out15")
    sc.stop()
  }
}
Function signature:
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
Code:
/** groupByKey(): regroup by key, then take the maximum value of each key. */
object ParTest3 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
    val rdd2 = rdd.groupByKey()
    // (a,5), (b,5)
    rdd2.map(line => (line._1, line._2.max)).collect().foreach(println)
    sc.stop()
  }
}
- From the shuffle perspective:
Both reduceByKey and groupByKey involve a shuffle, but reduceByKey can pre-aggregate (combine) the values of each key inside every partition before the shuffle, which reduces the amount of data written to disk. groupByKey only groups and does not shrink the data at all, so reduceByKey performs better.
- From the functional perspective:
reduceByKey combines grouping and aggregation, while groupByKey can only group. When you need to group and then aggregate, prefer reduceByKey; if you only need the groups without aggregating, groupByKey is the one to use (see the sketch below).
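A small sketch contrasting the two on the same data (object name and data are made up for illustration): both produce the same per-key sums, but reduceByKey combines values inside each partition before the shuffle, while groupByKey ships every (key, value) pair across the network and only sums afterwards.

import org.apache.spark.{SparkConf, SparkContext}

object ReduceVsGroupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val pairs = sc.makeRDD(List(("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)), 2)

    // reduceByKey: map-side combine per partition, then shuffle the partial sums.
    val sums1 = pairs.reduceByKey(_ + _)

    // groupByKey: shuffle every pair, then sum the grouped values.
    val sums2 = pairs.groupByKey().mapValues(_.sum)

    sums1.collect().foreach(println) // (a,3) and (b,2), in some order
    sums2.collect().foreach(println) // same result, but more data shuffled
    sc.stop()
  }
}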
Function signature:
def aggregateByKey[U: ClassTag](zeroValue: U)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
Code:
/** aggregateByKey(): take the maximum per key within each partition, then sum the partition results. */
object aggregateByKeyTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
    // zero value 0; seqOp = max within a partition, combOp = sum across partitions
    rdd.aggregateByKey(0)(Math.max(_, _), _ + _).collect().foreach(println)
    sc.stop()
  }
}
Function signature: def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
Code:
/** foldByKey(): like aggregateByKey, but the same function is applied both within and between partitions. */
object foldByKeyTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
    // sum the values of each key, starting from the zero value 0: (a,6), (b,7)
    rdd.foldByKey(0)(_ + _).collect().foreach(println)
    sc.stop()
  }
}
Function signature:
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]
Code:
/** combineByKey(): change the value structure first, then aggregate within and between partitions (here: average per key). */
object combineByKeyTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)))
    val rdd2 = rdd.combineByKey(
      (_, 1),                                                         // createCombiner: value -> (sum, count)
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),               // mergeValue: fold a value into the accumulator
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners: merge partition accumulators
    )
    // divide sum by count to obtain the average per key
    rdd2.map {
      case (key, value) => (key, value._1 / value._2.toDouble)
    }.collect().foreach(println)
    sc.stop()
  }
}
Function signature:
def sortByKey(ascending: Boolean = true,
    numPartitions: Int = self.partitions.length): RDD[(K, V)]
Code:
/** sortByKey(): sort by key. */
object sortByKeyTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
    // ascending order: (1,dd), (2,bb), (3,aa), (6,cc)
    rdd.sortByKey(true).collect().foreach(println)
    sc.stop()
  }
}
Function signature: def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
Code:
/** join(): for each key present in both RDDs, pair up the corresponding values. */
object joinTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
    // keys 3 and 4 appear in only one RDD and are dropped: (1,(a,4)), (2,(b,5))
    rdd.join(rdd1).collect().foreach(println)
    sc.stop()
  }
}
Function signature: def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
Code:
/** cogroup(): similar to a full outer join, but the values of each key are first grouped within each RDD. */
object cogroupTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
    // e.g. (1,(CompactBuffer(a),CompactBuffer(4)))
    rdd.cogroup(rdd1).collect().foreach(println)
    sc.stop()
  }
}