• spark—KV算子解析


    面试题目


    (1)reduceByKey 和 groupByKey 的区别?

    从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。

    从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

    (2)reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?

    这四个算子底层调用的都是同一个方法combineByKeyWithClassTag只不过他们的参数传值不同:

    注意这里第四个参数partitioner分区先不管

    reduceByKey:

    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
    

    参数一:(v: V) => v 表示第一个值不会参与计算

    参数二:func 表示分区内数据的处理函数

    参数三:func 表示分区间数据的处理函数

    这里分区内和分区间用的是同一个函数,所以可以简化

    foldByKey:

    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
          cleanedFunc, cleanedFunc, partitioner)
    

    参数一:(v: V) => (v: V) => cleanedFunc(createZero(), v) 表示初始值和第一个key的value值进行的分区内数据操作

    参数二:cleanedFunc 表示分区内数据的处理函数

    参数三:cleanedFunc 表示分区间数据的处理函数

    这里分区内和分区间用的是同一个函数,所以可以简化

    aggregateByKey:

    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
          cleanedSeqOp, combOp, partitioner)
    

    参数一:(v: V) => cleanedSeqOp(createZero(), v)
    表示初始值和第一个key的value值进行的分区内数据操作

    参数二:cleanedSeqOp 表示分区内数据的处理函数

    参数三:combOp 表示分区间数据的处理函数

    combineByKey :

    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)
    

    参数一:createCombiner 表示相同key的第一条数据,做什么样的处理,进行的处理函数

    参数二:mergeValue 表示分区内数据的处理函数

    参数三:mergeCombiners 表示分区间数据的处理函数

    为了便于理解,使用上面四个算子同时进行wordcout操作代码如下:

    package com.bigdata.SparkCore.wc
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @author wangbo
     * @version 1.0
     */
    object test {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local").setAppName("test")
        val sc = new SparkContext(sparkConf)
        val list1: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("a",5),("b",6)),2)
        val wordcout: RDD[(String, Int)] = list1.reduceByKey(_+_)
        val wordcout1: RDD[(String, Int)] = list1.foldByKey(0)(_ + _)
        val wordcout2: RDD[(String, Int)] = list1.aggregateByKey(0)(_ + _, _ + _)
        val wordcout3: RDD[(String, Int)] = list1.combineByKey(v => v, (x: Int, y) => x + y, (x: Int, y: Int) => x + y)
        wordcout.collect().foreach(println)
        println("=====")
        wordcout1.collect().foreach(println)
        println("=====")
        wordcout2.collect().foreach(println)
        println("=====")
        wordcout3.collect().foreach(println)
      }
    }
    

    说明:

    1. reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
    2. FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
    3. AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
    4. CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
  • 相关阅读:
    java之日期相关
    程序员工作只能做到 45 岁吗?之后的路该怎么走?
    linux启动时发生的那些事(待更)
    常见的数据结构有哪些?
    mybatis执行过程,源码分析
    rust学习——操作字符串、字符串转义、操作UTF8-字符串 (操作中文字符串)
    算法 顺时针旋转矩阵
    数据挖掘方法论具体实施步骤
    电信保温杯笔记——《统计学习方法(第二版)——李航》第17章 潜在语义分析
    汽车油封的重要性
  • 原文地址:https://blog.csdn.net/weixin_44604159/article/details/126919799