1. Operator List
| No. | Operator |
| --- | --- |
| 1 | map |
| 2 | flatMap |
| 3 | filter |
| 4 | mapPartitions |
| 5 | mapPartitionsWithIndex |
| 6 | keys |
| 7 | values |
| 8 | mapValues |
| 9 | flatMapValues |
| 10 | union |
| 11 | reduceByKey |
| 12 | combineByKey |
| 13 | groupByKey |
| 14 | foldByKey |
| 15 | aggregateByKey |
| 16 | ShuffledRDD |
| 17 | distinct |
| 18 | partitionBy |
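All of the operators above are transformations: each one lazily builds a new RDD and nothing is computed until an action such as `foreach` or `collect` is invoked on the result. A minimal sketch of that behavior (the object name `LazyTransformDemo` and the sample data are just for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyTransformDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lazy-demo").setMaster("local[*]"))
    // map is a transformation: it only records the lineage, nothing runs yet
    val doubled = sc.parallelize(1 to 5).map(_ * 2)
    // collect is an action: it triggers the actual job
    println(doubled.collect().toList)   // List(2, 4, 6, 8, 10)
    sc.stop()
  }
}
```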
2. Code Example
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext, TaskContext}
object basi_transform_02 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// reduce log noise so the operator output stays readable (uses the log4j imports above)
Logger.getLogger("org").setLevel(Level.WARN)
val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7),2)
val map_rdd: RDD[Int] = rdd1.map(_ * 2)
println("*****1. map算子************")
map_rdd.foreach(println(_))
println("*****2.flatMap算子************")
val arr: Array[String] = Array(
val rdd2: RDD[String] = sc.makeRDD(arr, 2)
val flatMap_rdd: RDD[String] = rdd2.flatMap(_.split(" "))
flatMap_rdd.foreach(println(_))
println("*****3.filter算子***********")
val rdd3: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10))
val filter_rdd :RDD[Int]= rdd3.filter(_ % 2 == 0)
filter_rdd.foreach(println(_))
println("*****4. mapPartitions算子**********")
val rdd4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10),2)
val mapPartition_rdd: RDD[Int] = rdd4.mapPartitions(iter => {
// process one whole partition at a time; the original body was elided, so this is illustrative
iter.map(_ * 10)
})
mapPartition_rdd.foreach(println(_))
println("*****5. mapPartitionsWithIndex operator**********")
val rdd5: RDD[Int] = sc.parallelize(List(10, 20, 30, 40, 60),2)
val mapPartitionWithIndex_Rdd: RDD[String] = rdd5.mapPartitionsWithIndex((index, it) => {
it.map(e => s"partition:$index,val:$e")
})
mapPartitionWithIndex_Rdd.foreach(println(_))
println("*****6. keys operator**********")
val lst: List[(String, Int)] = List(
("spark", 1), ("spark", 3), ("hive", 2),
("Java", 1), ("Scala", 3), ("Python", 2)
val rdd6: RDD[(String, Int)] = sc.parallelize(lst)
val keysRdd: RDD[String] = rdd6.keys
keysRdd.foreach(println(_))
println("*****7.values算子**********")
val values_RDD: RDD[Int] = rdd6.values
values_RDD.foreach(println(_))
println("*****8.mapValues算子**********")
val lst2: List[(String, Int)] = List(
("Hello", 1), ("world", 2),
("I", 2), ("love", 3), ("you", 2)
val rdd8: RDD[(String, Int)] = sc.parallelize(lst2, 2)
val mapValues_rdd: RDD[(String, Int)] = rdd8.mapValues(_ * 10)
mapValues_rdd.foreach(println(_))
println("*****9.flatMaplValues算子**********")
val lst3: List[(String,String )] = List(
("Hello", "1 2 3"), ("world", "4 5 6"),
val rdd9: RDD[(String, String)] = sc.parallelize(lst3)
val flatMapValues: RDD[(String, Int)] = rdd9.flatMapValues(_.split(" ").map(_.toInt))
flatMapValues.foreach(println(_))
println("*****10.union算子**********")
val union_rdd1 = sc.parallelize(List(1, 2, 3), 2)
val union_rdd2 = sc.parallelize(List(4, 5, 6), 3)
val union_rdd: RDD[Int] = union_rdd1.union(union_rdd2)
union_rdd.foreach(println(_))
println("*****11.reducedByKey算子**********")
val lst4: List[(String, Int)] = List(
("spark", 1), ("spark", 1), ("hive", 3),
("Python", 1), ("Java", 1), ("Scala", 3),
("flink", 1), ("Mysql", 1), ("hive", 3)
val rdd11: RDD[(String, Int)] = sc.parallelize(lst4, 2)
val reduced_rdd: RDD[(String, Int)] = rdd11.reduceByKey(_ + _)
reduced_rdd.foreach(println(_))
println("*****12.combineByKey算子**********")
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f1 function invoked in stage: $stage,partiton:$partition")
val f2 = (a:Int,b:Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f2 function invoked in stage: $stage,partiton:$partition")
val f3 = (m:Int,n:Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f3 function invoked in stage: $stage,partiton:$partition")
val rdd12: RDD[(String, Int)] = sc.parallelize(lst4,2)
val combineByKey_rdd: RDD[(String, Int)] = rdd12.combineByKey(f1, f2, f3)
combineByKey_rdd.foreach(println(_))
println("*****13.groupByKey算子**********")
val rdd13: RDD[(String, Int)] = sc.parallelize(lst4, 3)
val groupByKey_rdd: RDD[(String, Iterable[Int])] = rdd13.groupByKey()
groupByKey_rdd.foreach(println(_))
println("*****14.foldByKey算子**********")
val lst5: List[(String, Int)] = List(
("maple", 1), ("kelly", 1), ("Avery", 1),
("maple", 1), ("kelly", 1), ("Avery", 1)
val rdd14: RDD[(String, Int)] = sc.parallelize(lst5)
val foldByKey_rdd: RDD[(String, Int)] = rdd14.foldByKey(1)(_ + _)
foldByKey_rdd.foreach(println(_))
println("*****15.aggregateByKey算子**********")
val rdd15: RDD[(String, Int)] = sc.parallelize(lst5)
val aggregateByKey_rdd: RDD[(String, Int)] = rdd15.aggregateByKey(1)(_ + _,_ * _ )
aggregateByKey_rdd.foreach(println(_))
println("*****16.ShuffledRDD算子**********")
val rdd16: RDD[(String, Int)] = sc.parallelize(lst5,2)
val partitioner = new HashPartitioner(rdd16.partitions.length)
val shuffledRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](rdd16,partitioner)
val aggregator: Aggregator[String, Int, Int] = new Aggregator[String, Int, Int](f1, f2, f3)
shuffledRDD.setAggregator(aggregator)
shuffledRDD.setMapSideCombine(true)
println(shuffledRDD.collect().toList)
println("*****17.distinct算子**********")
val lst6: Array[String] = Array(
"spark", "spark", "hive",
"Python", "Python", "Java"
val rdd17: RDD[String] = sc.parallelize(lst6)
val distinct_rdd: RDD[String] = rdd17.distinct()
println(distinct_rdd.collect().toList)
println("***** 18.partitionBy算子**********")
val rdd18: RDD[(String,Int)] = sc.parallelize(lst5,2)
val partitioner2 = new HashPartitioner(rdd18.partitions.length)
val partitioned_rdd: RDD[(String, Int)] = rdd18.partitionBy(partitioner2)
println(partitioned_rdd.collect().toList)
sc.stop()
}
}
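The original post does not state its build configuration. A minimal sbt sketch that compiles the example above might look like this (the Spark and Scala versions here are assumptions; use the ones matching your environment):

```scala
// build.sbt (hypothetical versions)
scalaVersion := "2.12.18"
libraryDependencies ++= Seq(
  // spark-core provides RDD, ShuffledRDD, Aggregator, HashPartitioner, etc.
  "org.apache.spark" %% "spark-core" % "2.4.8"
)
```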