• 【算子2】spark(四):spark core:trans算子中key-value类型的算子使用说明


    1. Key-Value类型的算子分类

    一、(输入分区与输出分区)一对一

    mapValues(func): 针对于(K,V)形式的类型只对V进行操作
    sortByKey([ascending], [numTasks]): 按照key值进行排序
    

     
    二、RDD聚集

    reduceByKey(func): 将相同key的value值进行收集,例如求出每个key的平均值
    combineByKey(func): 对数据进行分区内和分区间的操作
    partitionBy(func): 通过key值进行分区;分区逻辑可以是默认可以是自定类(继承Partitioner

     

    三、连接

    join(otherDataset, [numTasks])): 类似于mysql中的外键关联,关联要素key相同
    leftOutJoin()\rightOutJoin:
    cogroup(otherDataset): 与join类似,但不同的是,没有相同的key时,同样可以进行联合。
    

     
     

    2. 常用算子使用举例

    partitionBy:通过key进行分区

    println("分区数"+arrayRDD.partitions.size)
    //分区逻辑可以是默认可以是自定类(继承Partitioner)
    val partitionByRDD: RDD[(Int, String)] = arrayRDD.partitionBy(new HashPartitioner(2))
    println("分区数"+partitionByRDD.partitions.size)
    

     

    groupBykey:通过key进行分组

    相同的key分为一组

    //将数据变成Tuple2
    val ArrayRDD: RDD[String] = sc.makeRDD(Array("one", "two", "two", "three", "three", "three"))
    val mapRDD: RDD[(String, Int)] = ArrayRDD.map((_,1))
    
    //groupByKey
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
    
    //map对Tuple2的CompactBuffer进行求和操作
    val sumRDD: RDD[(String, Int)] = groupByKeyRDD.map(t=>(t._1,t._2.sum))
    sumRDD.collect().foreach(println)
    

     

    combineByKey

    对数据进行分区内和分区间的操作

    val combineByKeyRDD: RDD[(String, (Int, Int))] = ListRDD.combineByKey(
          (_, 1),
          (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
          (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
        )
        combineByKeyRDD.collect().foreach(println)
        //求平均:
        val avrRDD: RDD[(String, Int)] = combineByKeyRDD.map {
          case (x, y) => (x, y._1 / y._2)
        }
        avrRDD.collect().foreach(println)
    

    在这里插入图片描述

     

    join

    在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

    1)创建第一个pairRDD
    scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))2)创建第二个pairRDD
    scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))3)join操作并打印结果
    scala> rdd.join(rdd1).collect()
    res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
    

     

    cogroup(otherDataset, [numTasks])

    作用:在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
    需求:创建两个pairRDD,并将key相同的数据聚合到一个迭代器。

    1)创建第一个pairRDD
    scala> val rdd = sc.parallelize(Array((1,"a"),(6,"b"),(2,"c"),(5,"d")))2)创建第二个pairRDD
    scala> val rdd1 = sc.parallelize(Array((1,"a"),(5,"b"),(5,"b")))3)cogroup两个RDD并打印结果
    scala> rdd.cogroup(rdd1).collect()
    

    与join类似,但不同的是,没有相同的key时,同样可以进行联合。
    在这里插入图片描述

  • 相关阅读:
    Dubbo 框架搭建一个passport案例
    Flask 表单form.validate_on_submit()什么情况下会是false——解决办法
    解决windows端口占用
    【场景生成与研究】考虑时序相关性MC的场景生成与削减研究(Matlab代码实现)
    安装OCP集群
    骑行运动耳机哪个好,盘点五款适合骑行佩戴的耳机推荐
    6. N 字形变换
    二叉树分层遍历
    如何用postman做接口自动化测试
    【数据结构】模拟实现queue
  • 原文地址:https://blog.csdn.net/hiliang521/article/details/127040464