• Spark中RDD常见的算子:Value 类型、双 Value 类型、Key - Value 类型


    一、RDD转换算子

    RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型

    Value 类型

    【map】
    函数签名
    def map[U: ClassTag](f: T => U): RDD[U] ➢ 函数说明
    将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

    val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
    //将dataRDD中的数据逐条乘以2再返回给新的dataRDD1,map函数参数和返回结果的数量一定是保持不变的,也就是说传入多少个,返回就一定是多少个
    val dataRDD1: RDD[Int] = dataRDD.map(
     num => {
     num * 2
     } )
     //将dataRDD1中的数据类型由int类型数据逐条转化为String
    val dataRDD2: RDD[String] = dataRDD1.map(
     num => {
     "" + num
     } )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    【小结】:map函数是将RDD中的数据进行逐条转化,输入参数数量一定和输出参数数量相等。

    【flatMap】
    函数签名
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
    函数说明
    将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

    //构建RDD,泛型为List,List中的单个元素也为List,相当于List(List(元素1,元素2),List(元素3,元素4))
    val dataRDD = sparkContext.makeRDD(List(
     List(1,2),List(3,4)
    ),1)
    //将数据List(List(元素1,元素2),List(元素3,元素4)) 变成List(元素1,元素2,元素3,元素4)
    //相当于把多层嵌套的整体拆分成当个个体,最后对单个的个体进行一层封装,所以输入值时list,输出值也是list,但是底层确是进行了数据的先拆分解除嵌套,再封装合并。
    val dataRDD1 = dataRDD.flatMap(
     list => list
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    扁平化映射可以不仅仅是直接的拆分,还可以定义对数据的操作,如分切,过滤等,输入数据数量和输出数据数量可以不相当

    案例一:将list中的元素进行切分

        val rdd: RDD[String] = sparkcontext.makeRDD(List("Hello,Scala", "Hello,Spark"))
    
        val rdd2: RDD[String] = rdd.flatMap(
    /*      elemet=>{
            elemet.split(",")
          }*/
          //两种写法都一样,函数的至简原则,当参数只出现一次,则参数省略,函数体中用下划线代替,匿名函数只有一行,则大括号省略
          _.split(",")
        )
       rdd2.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    打印的结果为

    Hello
    Scala
    Hello
    Spark
    
    • 1
    • 2
    • 3
    • 4

    案例二:flatMap中只能将两层的数据做扁平化,如果数据有三层,则视最里层的集合数据为单个元素

    例如:

        val dataRDD = sparkcontext.makeRDD(List(
          List(List("法外狂徒张三","神舟十五号"),List("法外狂徒张三","神舟十五号")),List(List("法外狂徒张三","神舟十五号"),List("法外狂徒张三","神舟十五号"))
        ),1)
        val dataRDD1 = dataRDD.flatMap(
          list => list
        )
    
       dataRDD1.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    输出结果为

    List(法外狂徒张三, 神舟十五号)
    List(法外狂徒张三, 神舟十五号)
    List(法外狂徒张三, 神舟十五号)
    List(法外狂徒张三, 神舟十五号)
    
    • 1
    • 2
    • 3
    • 4

    案例三:当不能满足直接扁平化映射的需求,则需要在flatMap中自定义逻辑,将参数按照自己的需求进行转化

        val rdd2: RDD[Any] = rdd1.flatMap(
          //将最外层List中的元素List(1, 2), 3, List(4, 5)做模式匹配,
          datas => {
            datas match {
                //当参数符合list类型时,返回list
              case list: List[_] => list
                //当参数datas参数符合int类型时,将int类型的参数datas封装到list中去
              case data:Int => List(data)
            }
          }
          //将外层list中的元素3,通过match模式匹配之后,转化为了list(3),在直接通过flatmap做扁平映射
        )
    
        rdd2.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    输出结果为

    1
    2
    3
    4
    5
    
    • 1
    • 2
    • 3
    • 4
    • 5

    案例四:案例二中list有三层,可以先将第二层和第三层看做一个,用flatmap解析,再将返回结果用模式匹配match中的case封装成一个list,此时第二层和第三层返回结果集与最外层的list就成了一个新的符合flatmap解析的双层嵌套,符合扁平化对格式的要求,这样就可以成功解析三层list的嵌套问题。

    val dataRDD = sparkcontext.makeRDD(List(
              List(List("法外狂徒张三1","神舟十五号1"),List("法外狂徒张三2","神舟十五号2")),List(List("法外狂徒张三3","神舟十五号3"),List("法外狂徒张三4","神舟十五号4"))
            ))
            val dataRDD1 = dataRDD.flatMap(
              datas => {
                datas match {
                //将第二层的元素做match模式匹配,case中再次调用flatmap算子,并将解析后的数据再次封装成list
                  case list => list.flatMap(
                    list=>list
                  ) 
                }
              }
            )
             dataRDD1.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    打印出来的结果

    法外狂徒张三1
    神舟十五号1
    法外狂徒张三2
    神舟十五号2
    法外狂徒张三3
    神舟十五号3
    法外狂徒张三4
    神舟十五号4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    【小结】:flatmap的解析数据格式必须符合双层list嵌套,如果不符合则需要用match模式匹配,用case进行转换,使其符合list(list(),list())双层list类型才可以扁平化解析,flatmap还可以嵌套使用,用于解析多层list嵌套问题。返回结果数量和输入参数数量无关。

    【glom】
    函数签名
    def glom(): RDD[Array[T]]
    函数说明
    将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
    原因:因为数组封装了很多的方法,如最大值,最小值,升序,降序,求和等等,所以将数据转化为内存数组,就可以直接调用数组中已经封装好的方法。类似于Java中将数据映射封装成对象,直接调用对象的方法既可实现功能。
    需求:求分区间最大值之和

        val rdd: RDD[Int] = sparkcontext.makeRDD(List(10, 20, 30, 40), 2)
    
        val golmRDD: RDD[Array[Int]] = rdd.glom()
    
        val maxRDD: RDD[Int] = golmRDD.map(
    /*      array => {
            array.max
          }*/
          //Scala中的简化写法
          _.max
        )
        println(maxRDD.collect().sum)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    结果为:60

    【小结】:glom()是将数据转化为数组,以便转化后能方便调用数组中的方法

    【groupBy】
    函数签名
    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
    函数说明
    将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中
    案例一:根据取余数进行分组

        val rdd1: RDD[Int] = sparkcontext.makeRDD(List(1, 2, 3, 4), 2)
        val rdd2: RDD[(Int, Iterable[Int])] = rdd1.groupBy(_ % 2)
        val rdd3: RDD[(Int, Iterable[Int])] = rdd1.groupBy(_ % 3)
    
        rdd2.collect().foreach(println)
        rdd3.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    rdd2.collect().foreach(println)输出结果为

    (0,CompactBuffer(2, 4))
    (1,CompactBuffer(1, 3))
    
    • 1
    • 2

    rdd3.collect().foreach(println)输出结果为

    (0,CompactBuffer(3))
    (2,CompactBuffer(2))
    (1,CompactBuffer(1, 4))
    
    • 1
    • 2
    • 3

    注意:构建的rdd1通过groupBy()分组之后,新的RDD分组数量是由分组规则决定,与rdd1中定义的分组数量没有关系

    案例二:根据首字母分组

        val rdd4: RDD[String] = sparkcontext.makeRDD(List("Hello", "Spark", "Scala", "Hadoop", "法外狂徒张三"), 2)
        val groupRDD: RDD[(Char, Iterable[String])] = rdd4.groupBy(_.charAt(0))
        
        groupRDD.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4

    结果为

    (H,CompactBuffer(Hello, Hadoop))
    (S,CompactBuffer(Spark, Scala))
    (法,CompactBuffer(法外狂徒张三))
    
    • 1
    • 2
    • 3

    案例三:从服务器日志数据 apache.log 中获取每个时间段访问量。
    方式一:

        val rdd1: RDD[String] = sparkcontext.textFile("datas/apache.log")
    
        //map 将逐行数据做映射
        val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd1.map(
          line => {
            //将一行数据按照空格进行切分,返回的是一行数据切分后的数组
            val datas: Array[String] = line.split(" ")
            //去数组中的第三个元素,返回值为字符串类型的17/05/2015:10:05:47
            val timestring: String = datas(3)
            //对返回值的字符串进行时间格式化,参数是字符串中的时间格式
            val dateformatdemo = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
            //解析
            val dates: Date = dateformatdemo.parse(timestring)
    
            /*
            方式一:通过dates中直接get出小时数字
            val hours: Int = dates.getHours
            */
    
            //方式二:通过格式化方法拿到小时数字
            val hours: String = new SimpleDateFormat("HH").format(dates)
    
            //进行数据封装,封装成kv类型方便统计,表示每来一行,出现一次
            (hours, 1)
          }
        ).groupBy(_._1)
    
        val countRDD: RDD[(String, Int)] = timeRDD.map {
          case (hour, iter) => {
            (hour, iter.size)
          }
        }
        
        countRDD.collect.foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    方式二:

        val rdd1: RDD[String] = sparkcontext.textFile("datas/apache.log")
    
        //map 将逐行数据做映射
        val countRDD: RDD[(String, Int)] = rdd1.map(
          line => {
            //将一行数据按照空格进行切分,返回的是一行数据切分后的数组
            val datas: Array[String] = line.split(" ")
            //去数组中的第三个元素,返回值为字符串类型的17/05/2015:10:05:47
            val timestring: String = datas(3)
            //对返回值的字符串进行时间格式化,参数是字符串中的时间格式
            val dateformatdemo = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
            //解析
            val dates: Date = dateformatdemo.parse(timestring)
    
            /*
            方式一:通过dates中直接get出小时数字
            val hours: Int = dates.getHours
            */
    
            //方式二:通过格式化方法拿到小时数字
            val hours: String = new SimpleDateFormat("HH").format(dates)
    
            //进行数据封装,封装成kv类型方便统计,表示每来一行,出现一次
            (hours, 1)
          }
    
        ).reduceByKey(_ + _)
    
        countRDD.collect.foreach(println)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    方式一是单value类型的groupBy实现,方式二是双value类型的reduceByKey(_ + _)实现,双value类型的groupByKey也可以实现

    【小结】:分组主要是定义分组规则,分多少组取决于分组规则,与输入参数的RDD组数无关。

    【filter】
    filter
    函数签名
    def filter(f: T => Boolean): RDD[T]
    函数说明
    将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜

    val rdd1: RDD[Int] = sparkcontext.makeRDD(List(1, 2, 3, 4), 2)
    val filterRDD: RDD[Int] = rdd1.filter(_ % 2 != 0)
    filterRDD.collect().foreach(println)
    
    • 1
    • 2
    • 3

    打印结果为:1 3

    【小结】:filter是过滤器,自定义过滤规则,会将数据打乱,可能会导致数据倾斜

    【coalesce】
    函数签名

    def coalesce(numPartitions: Int, shuffle: Boolean = false,
     partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
     (implicit ord: Ordering[T] = null)
     : RDD[T]
    
    • 1
    • 2
    • 3
    • 4
        val rdd1: RDD[Int] = sparkcontext.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4), 4)
    
    	//数据直接进行缩减分区
        val rdd2: RDD[Int] = rdd1.coalesce(2)
        //数据进行了缩减分区,同时进行重新均衡,默认flase,需要时出入true即可重新进行shuffle的数据均衡
        val rdd3: RDD[Int] = rdd1.coalesce(2,true)
        rdd2.saveAsTextFile("output")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    函数说明
    根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

    【小结】:coalesce并不是将数据平均分配,只是缩减分区,会导致数据倾斜,如果想要数据均衡,则需要shuffle处理。coalesce是可以扩大分区,但是需要进行shuffle操作,如果不shuffle操作,则扩大的分区没有数据

    优化使用repartition代替coalesce
    【repartition】
    函数签名
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    函数说明
    该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的
    RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition
    操作都可以完成,因为无论如何都会经 shuffle 过程

    【小结】:对分区的操作都使用repartition,底层是采用coalesce默认shuffle,实际开发中使用repartition即可

    【sortBy】
    sortBy
    ➢ 函数签名
    def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
    ➢ 函数说明
    该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理
    的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一
    致。中间存在 shuffle 的过程

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4,1,2
    ),2)
    val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
    
    • 1
    • 2
    • 3
    • 4

    【小结】:sortBy是对分区内排序,分区数量保持不变

    双value类型

    对两个RDD求交集,并集、差集和拉链

    Key - Value 类型

    【partitionBy】
    函数签名
    def partitionBy(partitioner: Partitioner): RDD[(K, V)]
    函数说明
    将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

    
        val rdd1: RDD[Int] = sparkcontext.makeRDD(List(1, 2, 3, 4), 2)
    
        //将list中的元素通过map映射成key-value类型数据
        val mapRDD: RDD[(Int, Int)] = rdd1.map((_, 1))
    
        val rdd3: RDD[(Int, Int)] = mapRDD.partitionBy(new HashPartitioner(2))
    
        rdd3.saveAsTextFile("output")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    默认分区为HashPartitioner

    【reduceByKey】
    函数签名
    def reduceByKey(func: (V, V) => V): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
    函数说明
    可以将数据按照相同的 Key 对 Value 进行聚合

        val dataRDD1 = sparkcontext.makeRDD(List(("a",1),("b",2),("c",3),("a",10)))
        //scala语言中一般操作都是两两聚合,spark是基于scala开发,所以他的聚合也是两两聚合
        val dataRDD2 = dataRDD1.reduceByKey(_+_)
        //参数2位numpartions 即分区数
        val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
    
        dataRDD3.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    打印结果

    (b,2)
    (a,11)
    (c,3)
    
    • 1
    • 2
    • 3

    【小结】:reduceByKey是对相同元素进行两两聚合,函数体中_+_表示相同类型key的value1+value2
    【总结】:reduceByKey 和 groupByKey 的区别
    从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较
    高。
    从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

    【aggregateByKey】
    函数签名
    def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]
    函数说明
    将数据根据不同的规则进行分区内计算和分区间计算

    
        val dataRDD1 = sparkcontext.makeRDD(List(("a",1),("b",2),("c",3),("a",10),("b",20),("c",30)),2)
        //需求一:分区内聚合,分区间聚合,聚合逻辑定义在aggregateByKey的第二个参数中
        //aggregateByKey存在函数的柯理化,有两个参数列表,aggregateByKey()()
        //第一个参数列表,需要传递一个参数,表示初始值,即zeroValue=0,初始值用于第一次的两两操作,案例中为聚合,实际中可以使求最大值、求最小值
        //第二个参数列表需要传递2个参数,第一个参数表示分区内计算规则,第二个参数表示分区间计算规则
        val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_)
    
        dataRDD2.collect().foreach(println)
    
        //需求二:分区内求最大值,分区间做聚合。运算逻辑定义在aggregateByKey的第二个参数中
        val dataRDD3: RDD[(String, Int)] = dataRDD1.aggregateByKey(0)(
          //(x,y)=>math.max(x,y)  简化后的写法在下方
          math.max(_, _),
          //(x,y)=>x+y  下方为简化后的写法
          _ + _
        )
        dataRDD3.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    【小结】:aggregateByKey可以自定义分区内的逻辑和分区间的逻辑,aggregateByKey的函数柯理化的第一个参数一般默认是0,第二个参数是实现具体的自定义逻辑,第二个参数中又包含两个参数,第二个参数中的参数一是定义分区内的逻辑,第二个参数中的参数二是定义分区间的逻辑

    【 foldByKey】
    函数签名
    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
    函数说明
    当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
    val dataRDD1 = sparkContext.makeRDD(List((“a”,1),(“b”,2),(“c”,3)))
    val dataRDD2 = dataRDD1.foldByKey(0)(+)
    【小结】:当aggregateByKey分区内逻辑和分区间逻辑相同时,可以简化为foldByKey(0)(+)

    【combineByKey】
    函数签名
    def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]
    函数说明
    最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致

    RDD 行动算子

    行动算子:是触发RDD的计算,返回值不是RDD,一般是具体的值

    【reduce】
    函数签名
    def reduce(f: (T, T) => T): T
    函数说明
    聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 聚合数据,聚合逻辑在函数体中自定义
    val reduceResult: Int = rdd.reduce(_+_)
    
    • 1
    • 2
    • 3

    【小结】:reduce算子是触发计算,返回值不再是RDD

    【 collect】
    函数签名
    def collect(): Array[T]
    函数说明
    在驱动程序中,以数组 Array 的形式返回数据集的所有元素

        val rdd: RDD[Int] = sparkcontext.makeRDD(List(1,2,3,4))
        // 收集数据到 Driver,从返回值类型可以看出,是一个int类型的数组
        val actions: Array[Int] = rdd.collect()
        actions.foreach(println)
    
    • 1
    • 2
    • 3
    • 4

    【count】
    函数签名
    def count(): Long
    函数说明
    返回 RDD 中元素的个数

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的个数
    val countResult: Long = rdd.count()
    
    • 1
    • 2
    • 3

    【take】
    函数签名
    def take(num: Int): Array[T]
    函数说明
    返回一个由 RDD 的前 n 个元素组成的数组

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的个数
    val takeResult: Array[Int] = rdd.take(2)
    println(takeResult.mkString(","))
    
    • 1
    • 2
    • 3
    • 4

    【takeOrdered】
    函数签名
    def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
    函数说明
    返回该 RDD 排序后的前 n 个元素组成的数组

    val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
    // 返回 RDD 中元素的个数
    val result: Array[Int] = rdd.takeOrdered(2)
    
    • 1
    • 2
    • 3

    【aggregate】
    函数签名
    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    函数说明
    分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
    // 将该 RDD 所有元素相加得到结果
    //val result: Int = rdd.aggregate(0)(_ + _, _ + _)
    val result: Int = rdd.aggregate(10)(_ + _, _ + _)
    
    • 1
    • 2
    • 3
    • 4

    【fold】
    函数签名
    def fold(zeroValue: T)(op: (T, T) => T): T
    函数说明
    折叠操作,aggregate 的简化版操作

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val foldResult: Int = rdd.fold(0)(_+_)
    
    • 1
    • 2

    【countByKey】
    函数签名
    def countByKey(): Map[K, Long]
    函数说明
    统计每种 key 的个数

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, 
    "b"), (3, "c"), (3, "c")))
    // 统计每种 key 的个数
    val result: collection.Map[Int, Long] = rdd.countByKey()
    
    • 1
    • 2
    • 3
    • 4

    【save 相关算子】
    函数签名
    def saveAsTextFile(path: String): Unit
    def saveAsObjectFile(path: String): Unit
    def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit
    函数说明
    将数据保存到不同格式的文件中

    // 保存成 Text 文件
    rdd.saveAsTextFile("output")
    // 序列化成对象保存到文件
    rdd.saveAsObjectFile("output1")
    // 保存成 Sequencefile 文件
    rdd.map((_,1)).saveAsSequenceFile("output2")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    【foreach】
    函数签名
    def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
    函数说明
    分布式遍历 RDD 中的每一个元素,调用指定函数

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 收集后打印
    rdd.map(num=>num).collect().foreach(println)
    println("****************")
    // 分布式打印
    rdd.foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    [Python进阶] 消息框、弹窗:pymsgbox.alert
    MySQL 进阶 怎么去了解MySQL的架构原理
    odoo javascript参考(六)
    CAS:1445723-73-8,DSPE-PEG-NHS,磷脂-聚乙二醇-活性酯两亲性脂质PEG共轭物
    Unity3D 游戏编程中需要掌握的数学知识详解
    【数据仓库-零】数据仓库知识体系 ing
    【Elasticsearch】在es中实现mysql中的FIND_IN_SET查询条件
    【探索Spring底层】2.容器的实现
    Golang 基础三
    EasyCVR视频调阅页面如何正确关闭正在播放的视频?
  • 原文地址:https://blog.csdn.net/weixin_44870066/article/details/128149782