• Spark RDD 行动算子


    1.reduce

    函数签名 def reduce(f: (T, T) => T): T

    代码:

    1. /**
    2. * reduce()聚合
    3. */
    4. object ActionDemo {
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. println(rdd.reduce(_+_))
    10. }
    11. }

     2.collect

    函数签名 def collect(): Array[T]

    代码: 

    1. /**
    2. *count()返回RDD中元素个数
    3. */
    4. object ActionDemo2{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. rdd.collect().foreach(println)
    10. }
    11. }

     3.count

    函数签名 def count(): Long

    代码: 

    1. /**
    2. *count()返回RDD中元素个数
    3. */
    4. object ActionDemo2{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. val rdd2 =rdd.count()
    10. println(rdd2)
    11. }
    12. }

     4.first

    函数签名 def first(): T

    返回 RDD 中的第一个元素

    代码: 

    1. /**
    2. *first()返回RDD中的第一个元素
    3. */
    4. object ActionDemo3{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. val rdd2 =rdd.first()
    10. println(rdd2)
    11. }
    12. }

    5.take

    函数签名 def take(num: Int): Array[T]

    代码:

    1. /**
    2. *take()返回由RDD前n个元素组成的数组
    3. */
    4. object ActionDemo4{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. val rdd2 =rdd.take(2)
    10. rdd2.foreach(println)
    11. }
    12. }

     6.takeOrdered

    函数签名 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

    代码:

    1. /**
    2. *takeOrdered()返回该RDD排序后前n个元素组成的数组
    3. */
    4. object ActionDemo5{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    9. val rdd2 =rdd.takeOrdered(2)
    10. rdd2.foreach(println)
    11. }
    12. }

     7.aggregate

    函数签名

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

    代码:

    1. /**
    2. * aggregate()案例
    3. */
    4. object ActionDemo6{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    9. //22 23 25 9 19 110
    10. //空分区也有默认值
    11. val rdd2 =rdd.aggregate(10)(_+_,_+_)
    12. println(rdd2)//318
    13. }
    14. }

     8.fold

    函数签名 def fold(zeroValue: T)(op: (T, T) => T): T

    代码:

    1. /**
    2. * fold()案例
    3. */
    4. object ActionDemo7{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    9. //22 23 25 9 19 110
    10. //空分区也有默认值
    11. val rdd2 =rdd.fold(10)(_+_)
    12. println(rdd2)//318
    13. }
    14. }

     9.countByKey

    函数签名 def countByKey(): Map[K, Long]

    代码: 

    1. /**
    2. * countByKey()统计每种key的个数
    3. */
    4. object ActionDemo8 {
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
    9. val rdd2 = rdd.countByKey()
    10. println(rdd2)
    11. }
    12. }

     10.save 相关算子

    函数签名

    def saveAsTextFile(path: String): Unit

    def saveAsObjectFile(path: String): Unit

    def saveAsSequenceFile( path: String,

    codec: Option[Class[_ <: CompressionCodec]] = None): Unit

    将数据保存到不同格式的文件中

    1. // 保存成 Text 文件
    2. rdd.saveAsTextFile("output")
    3. // 序列化成对象保存到文件
    4. rdd.saveAsObjectFile("output1")
    5. // 保存成 Sequencefile 文件
    6. rdd.map((_,1)).saveAsSequenceFile("output2")

     11.foreach

    函数签名

    def foreach(f: T => Unit): Unit = withScope {

    val cleanF = sc.clean(f)

    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }

    分布式遍历 RDD 中的每一个元素,调用指定函数

  • 相关阅读:
    3D Instances as 1D Kernels
    node.js室内装修风格选择系统毕业设计-附源码211552
    Redis 分布式锁
    Java Swing游戏开发学习25
    合同法名词解释
    项目实战-day1.0
    路径规划 | 飞蛾扑火算法求解二维栅格路径规划(Matlab)
    知识图谱从入门到应用——知识图谱的知识表示:基础知识
    40 个 SpringBoot 常用注解:让生产力爆表!
    maven的私服
  • 原文地址:https://blog.csdn.net/m0_55834564/article/details/125458993