• Spark RDD 行动算子


    1.reduce

    函数签名 def reduce(f: (T, T) => T): T

    代码:

    1. /**
    2. * reduce()聚合
    3. */
    4. object ActionDemo {
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. println(rdd.reduce(_+_))
    10. }
    11. }

     2.collect

    函数签名 def collect(): Array[T]

    代码: 

    1. /**
    2. *count()返回RDD中元素个数
    3. */
    4. object ActionDemo2{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. rdd.collect().foreach(println)
    10. }
    11. }

     3.count

    函数签名 def count(): Long

    代码: 

    1. /**
    2. *count()返回RDD中元素个数
    3. */
    4. object ActionDemo2{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. val rdd2 =rdd.count()
    10. println(rdd2)
    11. }
    12. }

     4.first

    函数签名 def first(): T

    返回 RDD 中的第一个元素

    代码: 

    1. /**
    2. *first()返回RDD中的第一个元素
    3. */
    4. object ActionDemo3{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. val rdd2 =rdd.first()
    10. println(rdd2)
    11. }
    12. }

    5.take

    函数签名 def take(num: Int): Array[T]

    代码:

    1. /**
    2. *take()返回由RDD前n个元素组成的数组
    3. */
    4. object ActionDemo4{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. val rdd2 =rdd.take(2)
    10. rdd2.foreach(println)
    11. }
    12. }

     6.takeOrdered

    函数签名 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

    代码:

    1. /**
    2. *takeOrdered()返回该RDD排序后前n个元素组成的数组
    3. */
    4. object ActionDemo5{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    9. val rdd2 =rdd.takeOrdered(2)
    10. rdd2.foreach(println)
    11. }
    12. }

     7.aggregate

    函数签名

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

    代码:

    1. /**
    2. * aggregate()案例
    3. */
    4. object ActionDemo6{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    9. //22 23 25 9 19 110
    10. //空分区也有默认值
    11. val rdd2 =rdd.aggregate(10)(_+_,_+_)
    12. println(rdd2)//318
    13. }
    14. }

     8.fold

    函数签名 def fold(zeroValue: T)(op: (T, T) => T): T

    代码:

    1. /**
    2. * fold()案例
    3. */
    4. object ActionDemo7{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    9. //22 23 25 9 19 110
    10. //空分区也有默认值
    11. val rdd2 =rdd.fold(10)(_+_)
    12. println(rdd2)//318
    13. }
    14. }

     9.countByKey

    函数签名 def countByKey(): Map[K, Long]

    代码: 

    1. /**
    2. * countByKey()统计每种key的个数
    3. */
    4. object ActionDemo8 {
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
    9. val rdd2 = rdd.countByKey()
    10. println(rdd2)
    11. }
    12. }

     10.save 相关算子

    函数签名

    def saveAsTextFile(path: String): Unit

    def saveAsObjectFile(path: String): Unit

    def saveAsSequenceFile( path: String,

    codec: Option[Class[_ <: CompressionCodec]] = None): Unit

    将数据保存到不同格式的文件中

    1. // 保存成 Text 文件
    2. rdd.saveAsTextFile("output")
    3. // 序列化成对象保存到文件
    4. rdd.saveAsObjectFile("output1")
    5. // 保存成 Sequencefile 文件
    6. rdd.map((_,1)).saveAsSequenceFile("output2")

     11.foreach

    函数签名

    def foreach(f: T => Unit): Unit = withScope {

    val cleanF = sc.clean(f)

    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }

    分布式遍历 RDD 中的每一个元素,调用指定函数

  • 相关阅读:
    为什么估计的参数具有渐进高斯性?M-estimateor的渐进高斯性推导
    Docker安全及日志管理
    开源数据质量解决方案——Apache Griffin入门宝典
    线上AB实验的日志分析
    一文全面解读CKA认证的含金量、详细介绍!
    基于 Hexo 从零开始搭建个人博客(五)
    国家网络安全周2023时间是什么时候?有什么特点?谁举办的?
    go多样化定时任务通用实现与封装
    在 React 项目中全量使用 Hooks
    【牛客】WY49数对,JZ65不用加减乘除做加法
  • 原文地址:https://blog.csdn.net/m0_55834564/article/details/125458993