1、基本释义
groupByKey 顾名思义是“按照 Key 做分组”,但实际上groupByKey算子包含分组和收集两步。具体来说,对于元素类型为(Key,Value)键值对的 Paired RDD,groupByKey 的功能就是对 Key 值相同的元素做分组,然后把相应的 Value 值,以集合的形式收集到一起。换句话说,groupByKey 会把 RDD 的类型,由 RDD[(Key, Value)]转换为 RDD[(Key, Value 集合)]。
分组但不聚合,带上分组后所有的数据直接进行shuffle操作
2、场景举例
有如下文件student_score.txt,是每个学生考试的分数
- 100 tom
- 90 lily
- 100 小明
- 80 小亮
- 100 cat
统计有哪些学生考了相同的分数
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setAppName("agg").setMaster("local")
- val sc = new SparkContext(conf)
- val file: String = "./student_score.txt"
- val lineRDD: RDD[String] = sc.textFile(file)
- val kvRDD: RDD[(String, String)] = lineRDD.map(line => line.split(" ")).map(arr => (arr(0),arr(1)))
- val keySetsRDD: RDD[(String, Iterable[String])] = kvRDD.groupByKey()
- keySetsRDD.foreach(f => {
- println(f._1,f._2)
- })
- }
输出如下
- (80,CompactBuffer(小亮))
- (100,CompactBuffer(tom, 小明, cat))
- (90,CompactBuffer(lily))
1、基本释义
reduceByKey的字面含义是“按照 Key值做聚合”,它的计算逻辑,就是根据聚合函数 f 给出的算法,把 Key 值相同的多个元素,聚合成一个元素。
其实是先分组再聚合的逻辑,与groupByKey相比,会先map端一次聚合运算,减少数据的shuffle操作,然后把聚合后的结果发给reduce端。因为只有一个函数入参,map与reduce阶段只能执行相同的操作
2、场景举例
统计每个分数相同的有多少人
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setAppName("agg").setMaster("local")
- val sc = new SparkContext(conf)
-
- val file: String = "./student_score.txt"
-
- val lineRDD: RDD[String] = sc.textFile(file)
-
- val kvRDD: RDD[(String, String)] = lineRDD.map(line => line.split(" ")).map(arr => (arr(0),arr(1)))
- val kvRDD2: RDD[(String, Int)] = kvRDD.map(word => (word._1,1))
-
- val wordCounts: RDD[(String, Int)] = kvRDD2.reduceByKey((x, y) => x + y)
-
- wordCounts.foreach(f => {
- println(f._1,f._2)
- })
- }
输出如下
- (80,1)
- (90,1)
- (100,3)
1、基本释义
aggregateByKey逻辑类似 aggregate,但aggregateByKey针对的是PairRDD,即键值对 RDD,所以返回结果也是 PairRDD,结果形式为:(各个Key, 同样Key对应的Value聚合后的值)
aggregateByKey先将每个partition内元素进行分组计算(map端聚合),然后将每个partition的计算结果进行combine(reduce端聚合),得到最终聚合结果。且最终结果允许跟原始RDD类型不同
分组加聚合,在mapper端会做本地的聚合,然后把聚合后的结果发给reducer,与reduceByKey相比还可以可以分区间的聚合操作,即定义reduce阶段的函数,在reduce结算执行更灵活的操作
方法及入参数如下:
- def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
- combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
- aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
- }
zeroValue: 每个partition的聚合初始值
seqOp: sequence operation,对partition内数据进行映射,最终1个partition只有1个结果。输入类型为U跟V,输出为U,即每次操作结果要跟zeroValue类型一致
第一次操作时,U为zeroValue(初始值),第一次操作之后输出结果U,作为下一次操作的U
第二次操作及之后操作时,U为前一次操作输出结果,而不再是zeroValue
combOp: combine operation,对每个partition的结果进行combine操作。输入类型为U跟U,输出为U,即输入类型与输出类型一致,最终结果为:(K, U)类型的PairRDD
2、场景使用
有如下文件,记录了每个学生的各科成绩,求每个学生成绩的总和
- 99 tom 语文
- 100 小明 语文
- 80 小亮 语文
- 92 tom 数学
- 80 小明 数学
- 89 小亮 数学
- 99 tom 英语
- 88 小明 英语
- 90 小亮 英语
利用aggregateByKey两次求和,最终得到每个学生的总成绩。
由于上述的文件可能会存在多个分区之上,seqFunc函数参与分区内的计算,会有学生部分科目的成绩。combFunc参与分区之间的计算,shuffle时触发,所有分区加起来才是所有科目的成绩
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setAppName("agg").setMaster("local")
- val sc = new SparkContext(conf)
-
- val file: String = "./wikiOfSpark.txt"
- // 读取文件内容
- val lineRDD: RDD[String] = sc.textFile(file)
- val kvRDD: RDD[(String, Int)] = lineRDD.map(line => line.split(" ")).map(arr => (arr(1), arr(0).toInt))
-
- val kvRDD2: RDD[(String, Int)] = kvRDD.aggregateByKey(zeroValue = 0)((x, y) => seqFunc(x, y),(x, y) => combFunc(x, x))
- kvRDD2.collect().foreach(f => {
- println(f)
- })
- }
-
- def seqFunc(a: Int, b: Int): Int = {
- a+b
- }
- def combFunc(a: Int, b: Int): Int = {
- a+b
- }
输出
- (tom,290)
- (小亮,259)
- (小明,268)
是aggregateByKey函数的简写形式。
方法及参数def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
aggregateByKey例子中的
kvRDD.aggregateByKey(zeroValue = 0)((x, y) => seqFunc(x, y),(x, y) => combFunc(x, x))
改写成 kvRDD.foldByKey(0)(_+_)依然正确运行
1、基本释义
与groupByKey、reduceByKey、aggregateByKey相比,combineByKey在日常开发工作中我们用的更少一些。combineByKey使用也相对复杂一些,可以处理更为复杂和个性化的需求。许多bykey类的内部也是调用combineByKeyWithClassTag函数完成部分计算操作的
下边是combineByKey的参数定义
- def combineByKey[C](
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C
- ): RDD[(K, C)]
可以看到起有三个参数,分别是
createCombiner:对于每个元素(键值对)的key只有两种情况,初次遇到,再次遇到。如果初次遇到执行createCombiner方法,创建Combiner,如果再次遇到同一key,表明已经存在则执行mergeValue,进行value的合并。该函数在格式上是由 V类型 -> C类型的转化
mergeValue:对于已经出现过的key,调用mergeValue来进行聚合操作,对该键的累加器对应的当前值(C类型)与这个新的值(V类型)进行合并,并输出C类型的结果。
mergeCombiners:是针对不同分区而言的,如果同一key出现在了多个分区,就需要使用mergeCombiners方法出马将各个分区的结果(全是C格式)进行一个最终的合并。
createCombiner 和 mergeValue 处理单个分区中数据,属于map端的操作, mergeCombiners是不同分区之间的数据合并,属于reduce端的操作,会触发shuffle计算。因此combineByKey首先通过createCombiner 、mergeValue合并相同的key,减少了对reduce的数据,自然shuffle时对资源的消耗减少,性能提升不言而喻
2、场景使用
1、有如下Tom和Cat两个学生的考试成绩,计算每个人的总分数和科目数量
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setAppName("agg").setMaster("local")
- val sc = new SparkContext(conf)
- val scores: Array[(String, Int)] = Array(("Tom", 88), ("Tom", 95), ("Tom", 91), ("Cat", 93), ("Cat", 95), ("Cat", 98), ("Tom", 92))
- val input: RDD[(String, Int)] = sc.parallelize(scores, 3);
- val combineRDD: RDD[(String,(Int,Int))] = input.combineByKey(
- (v: Int) => (v, 1), //key初次遇到(分数值,1)
- (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), //相同key同一个分区(分数值累加,科目数累加)
- (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) //相同key不同分区(分数值累加,科目数累加)
- )
- combineRDD.foreach(f => {
- println(f)
- })
-
- //打印每个人的平均分
- combineRDD.map(f => {
- (f._1,f._2._1/f._2._2)
- }).foreach(f => println(f))
- }
输出如下
- //总分及科目数
- (Tom,(366,4))
- (Cat,(286,3))
-
- //平均分
- (Tom,91)
- (Cat,95)
- def combineByKey[C](
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C
- ): RDD[(K, C)]
结合示例代码和函数定义,参数有如下映射关系
(1)createCombiner:V>=C, (v: Int) => (v, 1)
key初次来没有,key对应的分数做新key,value就记数量1,(key,value)作为C输出
(2)mergeValue:(C, V) => C,(acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1)
相同key同一个分区,分数值累加,科目数累加,同样新(key,value)作为C输出
(3)mergeCombiners:(C, C) => C,(acc1: (Int, Int),acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
相同key不同分区分数值累加,科目数累加,同样新(key,value)作为C输出