• Spark - RDD 算子介绍及使用 Scala、Java、Python 三种语言演示


    一、RDD 的起源

    RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行流程如下:
    在这里插入图片描述
    多个 MapReduce 任务之间只能通过磁盘来进行传递数据,很明显的效率低下,再来看 RDD 的处理方式:

    在这里插入图片描述
    整个过程是共享内存的, 而不需要将中间结果存放在分布式文件系统中,这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度。

    二、RDD 的特点

    RDD 不仅是数据集, 也是编程模型,提供了上层 API, 同时 RDDAPIjdk8stream 流对集合运算的 API 非常类似,同样也都是各算子,如下:

    textFile.filter(StringUtils.isNotBlank) //过滤空内容
      .flatMap(_.split(" ")) //根据空格拆分
      .map((_, 1)) // 构建新的返回
      .foreach(s => println(s._1 + "  " + s._2)) //循环
    
    • 1
    • 2
    • 3
    • 4

    RDD 的算子大致分为两类:

    • Transformation 转换操作, 例如 map flatMap filter 等。
    • Action 动作操作, 例如 reduce collect show

    注意:执行 RDD 的时候会进行惰性求值,执行到转换操作的时候,并不会立刻执行,直到遇见了 Action 操作,才会触发真正的执行。

    创建 RDD

    RDD 有三种创建方式,可以通过本地集合直接创建,也可以通过读取外部数据集来创建,还可以通过其它的 RDD 衍生而来:

    首先声明 SparkContext

    • scala:
    val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc = new SparkContext(conf)
    
    • 1
    • 2
    • java
    SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    
    • 1
    • 2
    • python
    from pyspark import SparkConf, SparkContext, StorageLevel
    import findspark
    
    if __name__ == '__main__':
        findspark.init()
        conf = SparkConf().setAppName('spark').setMaster('local[*]')
        sc = SparkContext(conf=conf)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1. 通过集合创建
    • scala
    val rdd1 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""))
    //指定分区
    val rdd2 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""), 5)
    
    • 1
    • 2
    • 3
    • java
    JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""));
    //指定分区
    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""), 5);
    
    • 1
    • 2
    • 3
    • python
    rdd1 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""])
    #
    rdd2 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""], 5)
    
    • 1
    • 2
    • 3
    2. 通过文件创建
    • scala
     //读取本地文件
     val rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
     //读取本地文件,指定分区
     val rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
     //读取 HDFS 文件
     val rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
     //读取文件同时拿到文件名
     val rdd6 = sc.textFile("hdfs://test/spark/input3/")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • java
    //读取本地文件
    JavaRDD<String> rdd3 = sc.textFile("D:/test/spark/input3/words.txt");
    //读取本地文件,指定分区
    JavaRDD<String> rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5);
    //读取 HDFS 文件
    JavaRDD<String> rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt");
    //读取文件同时拿到文件名
    JavaRDD<String> rdd6 = sc.textFile("hdfs://test/spark/input3/");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • python
    # 读取本地文件
    rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
    #读取本地文件,指定分区
    rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
    #读取 HDFS 文件
    rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
    #读取文件同时拿到文件名
    rdd6 = sc.textFile("hdfs://test/spark/input3/")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    下面对相关常用算子进行演示。

    三、Transformations 算子

    1. map

    RDD 中的数据 一对一 的转为另一种形式:

    例如:

    • scala:
    val num = sc.parallelize(Seq(1, 2, 3, 4, 5))
    println(
      num.map(_+1).collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    System.out.println(
           num.map(i -> i + 1).collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • python:
    num = sc.parallelize((1, 2, 3, 4, 5))
    print(
        num.map(lambda i:i+1).collect()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    2. flatMap

    Map 算子类似,但是 FlatMap 是一对多,并都转化为一维数据:

    例如:

    • scala:
    val text = sc.parallelize(Seq("abc def", "hello word", "dfg,okh", "he,word"))
    println(
      text.flatMap(_.split(" ")).flatMap(_.split(",")).collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<String> text = sc.parallelize(Arrays.asList("abc def", "hello word", "dfg,okh", "he,word"));
    System.out.println(
            text.flatMap(s ->Arrays.asList(s.split(" ")).iterator())
                    .flatMap(s ->Arrays.asList(s.split(",")).iterator())
                    .collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • python:
    text = sc.parallelize(("abc def", "hello word", "dfg,okh", "he,word"))
    print(
        text.flatMap(lambda s: s.split(" ")).flatMap(lambda s: s.split(",")).collect()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    3. filter

    过滤掉不需要的内容:

    例如:

    • scala:
    val text = sc.parallelize(Seq("hello", "hello", "word", "word"))
    println(
      text.filter(_.equals("hello")).collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"));
    System.out.println(
            text.filter(s -> Objects.equals(s,"hello"))
                    .collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • python:
    text = sc.parallelize(("hello", "hello", "word", "word"))
    print(
        text.filter(lambda s: s == 'hello').collect()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    4. mapPartitions

    map 类似,针对整个分区的数据转换,拿到的是每个分区的集合:

    例如:

    • scala:
    val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
    println(
      text.mapPartitions(iter => {
        iter.map(_ + "333")
      }).collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • java:
    JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
    System.out.println(
            text.mapPartitions(iter -> {
                List<String> list = new ArrayList<>();
                iter.forEachRemaining(s -> list.add(s+"333"));
                return list.iterator();
            }).collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • python:
     text = sc.parallelize(("hello", "hello", "word", "word"), 2)
     
     def partition(par):
         tmpArr = []
         for s in par:
             tmpArr.append(s + "333")
         return tmpArr
    
     print(
         text.mapPartitions(partition).collect()
     )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    5. mapPartitionsWithIndex

    mapPartitions 类似, 只是在函数中增加了分区的 Index

    例如:

    • scala:
    val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
    println(
      text.mapPartitionsWithIndex((index, iter) => {
        println("当前分区" + index)
        iter.map(_ + "333")
      }, true).collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • java:
    JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
    System.out.println(
           text.mapPartitionsWithIndex((index, iter) -> {
               System.out.println("当前分区" + index);
               List<String> list = new ArrayList<>();
               iter.forEachRemaining(s -> list.add(s + "333"));
               return list.iterator();
           }, true).collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • python:
    text = sc.parallelize(("hello", "hello", "word", "word"), 2)
    
    def partition(index, par):
        print("当前分区" + str(index))
        tmpArr = []
        for s in par:
            tmpArr.append(s + "333")
        return tmpArr
    
    print(
        text.mapPartitionsWithIndex(partition).collect()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    6. mapValues

    只能作用于 Key-Value 型数据, 和 Map 类似, 也是使用函数按照转换数据, 不同点是 MapValues 只转换 Key-Value 中的 Value

    例如:

    • scala:
    val text = sc.parallelize(Seq("abc", "bbb", "ccc", "dd"))
    println(
      text.map((_, "v" + _))
        .mapValues(_ + "66")
        .collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • java:
    JavaRDD<String> text = sc.parallelize(Arrays.asList("abc", "bbb", "ccc", "dd"));
    System.out.println(
           text.mapToPair(s -> new Tuple2<>(s, "v" + s))
                   .mapValues(v -> v + "66").collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • python:
    text = sc.parallelize(("abc", "bbb", "ccc", "dd"))
    print(
        text.map(lambda s: (s, "v" + s)).mapValues(lambda v: v + "66").collect()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    7. sample

    可以从一个数据集中抽样出来一部分, 常用作于减小数据集以保证运行速度, 并且尽可能少规律的损失:

    第一个参数为withReplacement, 意为是否取样以后是否还放回原数据集供下次使用, 简单的说,如果这个参数的值为 true, 则抽样出来的数据集中可能会有重复。

    第二个参数为fraction, 意为抽样的比例。

    第三个参数为seed, 随机数种子, 用于 Sample 内部随机生成下标,一般不指定,使用默认值。

    例如:

    • scala:
    val num = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    println(
      num.sample(true,0.6,2)
        .collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • java:
    JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    System.out.println(
        num.sample(true, 0.6, 2).collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • python:
    num = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    print(
        num.sample(True, 0.6, 2).collect()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    8. union

    两个数据并集,类似于数据库的 union

    例如:

    • scala:
    val text1 = sc.parallelize(Seq("aa", "bb"))
    val text2 = sc.parallelize(Seq("cc", "dd"))
    println(
      text1.union(text2).collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • java:
    JavaRDD<String> text1 = sc.parallelize(Arrays.asList("aa", "bb"));
    JavaRDD<String> text2 = sc.parallelize(Arrays.asList("cc", "dd"));
    System.out.println(
            text1.union(text2).collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • python:
    text1 = sc.parallelize(("aa", "bb"))
    text2 = sc.parallelize(("cc", "dd"))
    print(
       text1.union(text2).collect()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    9. join,leftOuterJoin,rightOuterJoin

    两个(key,value)数据集,根据 key 取连接、左连接、右连接,类似数据库中的连接:

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
    val s2 = sc.parallelize(Seq("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
    
    val s3 = s1.map(s => (s.split(",")(0), s.split(",")(0)))
    val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))
    
    println(s3.join(s4).collectAsMap)
    println(s3.leftOuterJoin(s4).collectAsMap)
    println(s3.rightOuterJoin(s4).collectAsMap)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
    JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"));
    
    JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
    JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
    
    System.out.println(s3.join(s4).collectAsMap());
    System.out.println(s3.leftOuterJoin(s4).collectAsMap());
    System.out.println(s3.rightOuterJoin(s4).collectAsMap());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • python:
    s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
    s2 = sc.parallelize(("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
    
    s3 = s1.map(lambda s:(s.split(",")[0], s.split(",")[0]))
    s4 = s2.map(lambda s:(s.split(",")[0], s.split(",")[1]))
    
    print(s3.join(s4).collectAsMap())
    print(s3.leftOuterJoin(s4).collectAsMap())
    print(s3.rightOuterJoin(s4).collectAsMap())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    10. intersection

    获取两个集合的交集 :

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
    val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
    println(
     	s1.intersection(s2).collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
    JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
    System.out.println(
         s1.intersection(s2).collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • python:
    s1 = sc.parallelize(("abc", "dfe", "hello"))
    s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
    print(
        s1.intersection(s2).collect()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    11. subtract

    获取差集,a - b ,取 a 集合中 b 集合没有的元素:

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
    val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
    println(
     	s1.subtract(s2).collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
    JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
    System.out.println(
            s1.subtract(s2).collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • python:
    s1 = sc.parallelize(("abc", "dfe", "hello"))
    s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
    print(
        s1.subtract(s2).collect()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    12. distinct

    元素去重,是一个需要 Shuffled 的操作:

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
    println(
      s1.distinct().collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
    System.out.println(
        s1.distinct().collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • python:
    s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
    print(
      s1.distinct().collect()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    13. reduceByKey

    只能作用于 Key-Value 型数据,根据 Key 分组生成一个 Tuple,然后针对每个组执行 reduce 算子,传入两个参数,一个是当前值,一个是局部汇总,这个函数需要有一个输出, 输出就是这个 Key 的汇总结果,是一个需要 Shuffled 的操作:

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
    println(
      s1.map((_, 1))
        .reduceByKey(Integer.sum)
        .collectAsMap
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
    System.out.println(
       s1.mapToPair(s -> new Tuple2<>(s, 1))
               .reduceByKey(Integer::sum)
               .collectAsMap()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • python:
    s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
    print(
      s1.map(lambda s: (s, 1))
          .reduceByKey(lambda v1, v2: v1 + v2)
          .collectAsMap()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    14. groupByKey

    只能作用于 Key-Value 型数据,根据 Key 分组, 和 ReduceByKey 有点类似, 但是 GroupByKey 并不求聚合, 只是列举 Key 对应的所有 Value,是一个需要 Shuffled 的操作。

    GroupByKeyReduceByKey 不同,因为需要列举 Key 对应的所有数据, 所以无法在 Map 端做 Combine, 所以 GroupByKey 的性能并没有 ReduceByKey 好:

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
    println(
      s1.map((_, 1))
        .groupByKey()
        .collectAsMap
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
    System.out.println(
            s1.mapToPair(s -> new Tuple2<>(s, 1))
                    .groupByKey()
                    .collectAsMap()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • python:
    s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
    print(
        s1.map(lambda s: (s, 1))
            .reduceByKey()
            .collectAsMap()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    15. combineByKey

    对数据集按照 Key 进行聚合,groupByKey, reduceByKey 的底层都是 combineByKey

    参数:

    createCombiner 将 Value 进行初步转换
    mergeValue 在每个分区把上一步转换的结果聚合
    mergeCombiners 在所有分区上把每个分区的聚合结果聚合
    partitioner 可选, 分区函数
    mapSideCombiner 可选, 是否在 Map 端 Combine
    serializer 序列化器

    例如,求取每个人的分数的平均值:

    • scala:
    val s1 = sc.parallelize(Seq("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
    println(
      s1.map(s => (s.split(":")(0), s.split(":")(1).toDouble))
        .combineByKey(
          score => (score, 1),
          (c: (Double, Int), newScore: Double) => (c._1 + newScore, c._2 + 1),
          (d1: (Double, Int), d2: (Double, Int)) => (d1._1 + d2._1, d1._2 + d2._2)
        ).map(t => (t._1, t._2._1 / t._2._2))
        .collectAsMap
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"));
    System.out.println(
       s1.mapToPair(s -> new Tuple2<>(s.split(":")[0], Double.parseDouble(s.split(":")[1])))
               .combineByKey(
                       (Function<Double, Tuple2<Double, Integer>>) score -> new Tuple2(score, 1),
                       (Function2<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>>) (c, newScore) -> new Tuple2<>(c._1 + newScore, c._2 + 1),
                       (Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>) (d1, d2) -> new Tuple2<>(d1._1 + d2._1, d1._2 + d2._2))
               .mapToPair(t -> new Tuple2(t._1, t._2._1 / t._2._2))
               .collectAsMap()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • python:
    s1 = sc.parallelize(("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
    print(
        s1.map(lambda s: (s.split(":")[0], float(s.split(":")[1])))
            .combineByKey(lambda score: (score, 1),
                          lambda c, newScore: (c[0] + newScore, c[1] + 1),
                          lambda d1, d2: (d1[0] + d2[0], d1[1] + d2[1]))
            .map(lambda t: (t[0], t[1][0] / t[1][1]))
            .collectAsMap()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    16. aggregateByKey

    聚合所有 Key 相同的 Value:

    参数
    zeroValue 初始值
    seqOp 转换每一个值的函数
    comboOp 将转换过的值聚合的函数

    例如,求取每个人的分数的平均值:

    • scala:
    val s1 = sc.parallelize(Seq("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
    println(
     s1.map(s => (s.split(":")(0), s.split(":")(1).toDouble))
       .aggregateByKey((0.0, 0))(
         (zeroValue, aDouble) => {
           (zeroValue._1 + aDouble, zeroValue._2 + 1)
         },
         (t1, t2) => {
           (t1._1 + t2._1, t1._2 + t2._2)
         }
       ).map(t => (t._1, t._2._1 / t._2._2)).collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"));
    System.out.println(
      s1.mapToPair(s -> new Tuple2<>(s.split(":")[0], Double.parseDouble(s.split(":")[1])))
              .aggregateByKey(
                      new Tuple2<>(0.0, 0),
                      (Function2<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>>) (zeroValue, aDouble) -> new Tuple2<>(zeroValue._1 + aDouble, zeroValue._2 + 1),
                      (Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>) (t1, t2) -> new Tuple2<>(t1._1 + t2._1, t1._2 + t2._2)
              ).map(t -> new Tuple2<>(t._1, t._2._1 / t._2._2))
              .collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • python:
    s1 = sc.parallelize(("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
    
    print(
       s1.map(lambda s: (s.split(":")[0], float(s.split(":")[1])))
           .aggregateByKey((0.0, 0),
               lambda zeroValue, aDouble:(zeroValue[0] + aDouble, zeroValue[1] + 1),
               lambda t1, t2:(t1[0] + t2[0], t1[1] + t2[1]))
           .map(lambda t:(t[0], t[1][0] / t[1][1])).collect()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    17. foldByKey

    ReduceByKey 是一样的, 都是按照 Key 做分组求聚合,但是 FoldByKey 可以指定初始值,可以认为是 AggregateByKey 的简化版本, seqOpcombOp 是同一个函数:

    参数
    zeroValue 初始值
    func seqOp 和 combOp 相同, 都是这个参数

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
    println(
     s1.map((_, 1))
       .foldByKey(0)((seroValue, v) => seroValue + v)
       .collect().toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
    System.out.println(
      s1.mapToPair(s -> new Tuple2<>(s, 1))
              .foldByKey(0, (seroValue, v) -> seroValue + v).collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • python:
    s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
    print(
       s1.map(lambda s:(s, 1))
           .foldByKey(0,lambda seroValue, v: seroValue + v)
           .collect()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    18. sortBy ,sortByKey

    数据排序,同 sortByKey ,但普通的 RDD 没有sortByKey, 只有 Key-ValueRDD 才有:

    参数
    func通过这个函数返回要排序的字段
    ascending是否升序
    numPartitions分区数

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
    val s2 = s1.map(s => (s.split(",")(0), s.split(",")(1).toInt))
    println(
      s2.sortBy(_._2,false)
        .collectAsMap()
    )
    println(
      s2.sortByKey(false).collectAsMap()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
    System.out.println(
      s1.map(s -> new Tuple2<>(s.split(",")[0], Integer.parseInt(s.split(",")[1])))
              .sortBy(t -> t._2, false, 1)
              .collect()
    );
    System.out.println(
      s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], Integer.parseInt(s.split(",")[1])))
              .sortByKey(false)
              .collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • python:
    s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
    s2 = s1.map(lambda s:(s.split(",")[0],int(s.split(",")[1])))
    print(
       s2.sortBy(lambda t:t[1],False)
           .collectAsMap()
    )
    print(
       s2.sortByKey(False)
           .collectAsMap()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述

    19. repartition,coalesce

    repartition:重新分区,coalesce:减少分区,如果新的分区数量比原分区数大, 必须 Shuffled, 否则重分区无效,repartitioncoalesce 的不同就在于 coalesce 可以控制是否 Shufflerepartition 是一个 Shuffled 操作。

    例如:

    • scala:
    var p1 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""))
    println(p1.getNumPartitions)
    p1 = p1.repartition(5)
    println(p1.getNumPartitions)
    p1 = p1.coalesce(3)
    println(p1.getNumPartitions)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • java:
    JavaRDD<String> p1 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""));
    System.out.println(p1.getNumPartitions());
    p1 = p1.repartition(5);
    System.out.println(p1.getNumPartitions());
    p1 = p1.coalesce(3);
    System.out.println(p1.getNumPartitions());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • python:
    p1 = sc.parallelize(("abc", "abc", "fff dd", "ee,pp", ""))
    print(p1.getNumPartitions)
    p1.repartition(5)
    print(p1.getNumPartitions)
    p1.coalesce(3)
    print(p1.getNumPartitions)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    20. cogroup

    多个 RDD 协同分组, 将多个 RDDKey 相同的 Value 分组:

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
    val s2 = sc.parallelize(Seq("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
    
    val s3 = s1.map(s => (s.split(",")(0), s.split(",")(1)))
    val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))
    
    println(
      s3.cogroup(s4).collectAsMap
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
    JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"));
    
    JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
    JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
    
    System.out.println(
           s3.cogroup(s4).collectAsMap()
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • python:
    s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
    s2 = sc.parallelize(("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
    s3 = s1.map(lambda s: (s.split(",")[0], s.split(",")[1]))
    s4 = s2.map(lambda s: (s.split(",")[0], s.split(",")[1]))
    print(
        s3.cogroup(s4).collectAsMap()
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述

    四、Action 算子

    1. reduce

    对整个结果集规约, 最终生成一条数据, 是整个数据集的汇总。

    reducereduceByKey 完全不同, reduce 是一个 action, 并不是 Shuffled 操作,本质上 reduce 就是现在每个 partition 上求值, 最终把每个 partition 的结果再汇总。

    例如:

    • scala:
    var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
    println(
      p1.reduce((_+_))
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
    System.out.println(
        p1.reduce(Integer::sum)
    );
    
    • 1
    • 2
    • 3
    • 4
    • python:
     p1 = sc.parallelize((1, 2, 3, 4, 6))
     print(
         p1.reduce(lambda i1, i2: i1 + i2)
     )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    2. collect

    以数组的形式返回数据集中所有元素。
    例如:

    • scala:
    var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
    println(
      p1.collect()
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
    System.out.println(
           p1.collect()
    );
    
    • 1
    • 2
    • 3
    • 4
    • python:
    p1 = sc.parallelize((1, 2, 3, 4, 6))
    print(
        p1.collect()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    3. count

    数据元素个数:

    例如:

    • scala:
    var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
    println(
     p1.count()
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
    System.out.println(
          p1.count()
    );
    
    • 1
    • 2
    • 3
    • 4
    • python:
    p1 = sc.parallelize((1, 2, 3, 4, 6))
    print(
       p1.count()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    4. first

    返回第一个元素:

    例如:

    • scala:
    var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
    println(
      p1.first()
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
    System.out.println(
          p1.first()
    );
    
    • 1
    • 2
    • 3
    • 4
    • python:
    p1 = sc.parallelize((1, 2, 3, 4, 6))
    print(
        p1.first()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    5. countByKey

    求得整个数据集中 Key 以及对应 Key 出现的次数:

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
    println(
     s1.map((_,1)).countByKey()
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"))
    System.out.println(
          s1.mapToPair(s -> new Tuple2<>(s, 1)).countByKey()
    );
    
    • 1
    • 2
    • 3
    • 4
    • python:
    s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
    print(
        s1.map(lambda s: (s, 1)).countByKey()
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    6. take

    返回前 N 个元素:

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
    println(
      s1.take(3)
    )
    
    • 1
    • 2
    • 3
    • 4
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
    System.out.println(
           s1.take(3)
    );
    
    • 1
    • 2
    • 3
    • 4
    • python:
    s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
    print(
        s1.take(3)
    )
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    7. saveAsTextFile

    将结果存入 path 对应的目录中:

    例如:

    • scala:
     val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
     s1.saveAsTextFile("D:/test/output/text/")
    
    • 1
    • 2
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
    s1.saveAsTextFile("D:/test/output/text/");
    
    • 1
    • 2
    • python:
    s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
    s1.saveAsTextFile("D:/test/output/text/")
    
    • 1
    • 2

    在这里插入图片描述

    8. lookup

    根据 key 查询对应的 value

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
    println(
     s1.map(s=>(s.split(":")(0),s.split(":")(1).toDouble))
       .lookup("小明").toList
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"));
    System.out.println(
        s1.mapToPair(s -> new Tuple2<>(s.split(":")[0], Double.parseDouble(s.split(":")[1])))
                .lookup("小明")
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • python:
    s1 = sc.parallelize(("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
    print(
       s1.map(lambda s: (s.split(":")[0], float(s.split(":")[1])))
           .lookup("小明")
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    五、补充算子

    1. RDD 持久化

    对于需要复用的RDD,可以进行缓存,已防止重复计算,持久化主要有三个算子,cache、persist、Checkpoint,其中persist可以指定存储的类型,是硬盘还是内存,cache 底层调用的 persist 默认存储在内存中 ,Checkpoint 则可以存储在 HDFS 中:

    例如:

    • scala:
    val s1 = sc.parallelize(Seq("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
    //缓存
    s1.cache // 底层调用的 persist
    //持久化
    s1.persist(StorageLevel.MEMORY_AND_DISK) //使用内存和磁盘(内存不够时才使用磁盘)
    s1.persist(StorageLevel.MEMORY_ONLY) //持久化到内存
    // Checkpoint  应使用Checkpoint把数据发在HDFS上
    sc.setCheckpointDir("/data/spark/") //实际中写HDFS目录
    s1.checkpoint()
    //清空缓存
    s1.unpersist()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • java:
    JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"));
    //缓存
    s1.cache(); // 底层调用的 persist
    //持久化
    s1.persist(StorageLevel.MEMORY_AND_DISK()); //使用内存和磁盘(内存不够时才使用磁盘)
    s1.persist(StorageLevel.MEMORY_ONLY()); //持久化到内存
    // Checkpoint  应使用Checkpoint把数据发在HDFS上
    sc.setCheckpointDir("/data/spark/");//实际中写HDFS目录
    s1.checkpoint();
    //清空缓存
    s1.unpersist();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • python:
    s1 = sc.parallelize(("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
    # 缓存
    s1.cache() # 底层调用的persist
    # 持久化
    s1.persist(StorageLevel.MEMORY_AND_DISK) # 使用内存和磁盘(内存不够时才使用磁盘)
    s1.persist(StorageLevel.MEMORY_ONLY) # 持久化到内存
    # Checkpoint 使用Checkpoint把数据发在HDFS上
    sc.setCheckpointDir("/data/spark/") # 实际中写HDFS目录
    s1.checkpoint()
    # 清空缓存
    s1.unpersist()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2. 共享变量,累加器

    支持在所有 不同节点上进行全局累加计算:

    例如:

    • scala:
    //创建一个计数器/累加器
    var ljq = sc.longAccumulator("mycounter")
    ljq.add(2)
    println(ljq.value)
    
    • 1
    • 2
    • 3
    • 4
    • java:
    SparkContext sparkContext = JavaSparkContext.toSparkContext(sc);
    //创建一个计数器/累加器
    LongAccumulator ljq = sparkContext.longAccumulator("mycounter");
    ljq.add(2);
    System.out.println(ljq.value());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • python:
    ljq = sc.accumulator("mycounter")
    ljq.add(2)
    print(ljq.value)
    
    • 1
    • 2
    • 3

    3. 共享变量,广播变量

    支持在所有 不同节点上进行全局累加计算:

    例如:

    • scala:
    val list = Seq(1, 2, 3, 4, 6)
    val broadcast = sc.broadcast(list)
    val value = broadcast.value
    println(value.toList)
    
    • 1
    • 2
    • 3
    • 4
    • java:
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 6);
    Broadcast<List<Integer>> broadcast = sc.broadcast(list);
    List<Integer> value = broadcast.getValue();
    System.out.println(value);
    
    • 1
    • 2
    • 3
    • 4
    • python:
    list = (1, 2, 3, 4, 6)
    broadcast = sc.broadcast(list)
    value = broadcast.value
    print(value)
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    谁能想到Java技术能跟看小说一样学习,不知不觉就看完了,膜拜大老!
    Linux网络编程(四)
    机器学习——聚类算法
    [Games101] Lecture 02 Review of Linear Algebra
    计算机组成原理 | 指令系统
    Android系统编译优化:使用Ninja加快编译
    Allegro172版本Shape避让方形盘不出现弧形操作指导
    存储器相关的术语总结
    数据脱敏sensitive(前端或数据库加密,解密)
    数据结构之带头双向循环链表的实现
  • 原文地址:https://blog.csdn.net/qq_43692950/article/details/128065466