• 35 个 Spark 常用算子总结


    35个 Spark 常用算子总结

    RDD 分为转换算子和行动算子。

    RDD 根据数据处理方式的不同,分为 value 类型、双 value 类型、key-value 类型。

    1. value 类型

    1.1 map

    函数签名:

    def map[U: ClassTag](f: T => U): RDD[U]
    
    • 1

    函数说明:

    Return a new RDD by applying a function to all elements of this RDD.

    val data: RDD[Long] = context.range(1, 11)
    data.map(i => i * 2)
    
    • 1
    • 2

    1.2 mapPartitions

    函数签名:

    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U]
    
    • 1
    • 2
    • 3

    函数说明:

    Return a new RDD by applying a function to each partition of this RDD.

    val data: RDD[Long] = context.range(1, 11)
    data.mapPartitions(iterator => iterator.map(i => i * 2))
    
    • 1
    • 2

    1.3 mapPartitionsWithIndex

    函数签名:

    def mapPartitionsWithIndex[U: ClassTag](
          f: (Int, Iterator[T]) => Iterator[U],
          preservesPartitioning: Boolean = false): RDD[U]
    
    • 1
    • 2
    • 3

    函数说明:

    Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

    val data: RDD[Long] = context.range(1, 11)
    data.mapPartitionsWithIndex((index, iterator) => iterator.map(num => (index, num)))
    
    • 1
    • 2

    1.4 flatMap

    函数签名:

    def filter(f: T => Boolean): RDD[T]
    
    • 1

    函数说明:

    Return a new RDD containing only the elements that satisfy a predicate.

    val data: RDD[Long] = context.range(1, 11)
    data.filter(num => num % 2 == 0)
    
    • 1
    • 2

    1.5 glom

    函数签名:

    def glom(): RDD[Array[T]]
    
    • 1

    函数说明:

    Return an RDD created by coalescing all elements within each partition into an array.

    val data: RDD[Long] = context.range(1, 11)
    val rddArr: RDD[Array[Long]] = data.glom()
    
    • 1
    • 2

    1.6 groupBy

    函数签名:

    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
    
    • 1

    函数说明:

    Return an RDD of grouped items. Each group consists of a key and a sequence of elements mapping to that key. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.

    val data: RDD[Long] = context.range(1, 11)
    val groupByData: RDD[(Boolean, Iterable[Long])] = data.groupBy(num => num % 2 == 0)
    
    • 1
    • 2

    1.7 filter

    函数签名:

    def filter(f: T => Boolean): RDD[T]
    
    • 1

    函数说明:

    Return a new RDD containing only the elements that satisfy a predicate.

    val data: RDD[Long] = context.range(1, 11)
    val filterData: RDD[Long] = data.filter(num => num % 2 == 0)
    
    • 1
    • 2

    1.8 sample

    函数签名:

    def sample(
          withReplacement: Boolean,
          fraction: Double,
          seed: Long = Utils.random.nextLong): RDD[T]
    
    • 1
    • 2
    • 3
    • 4

    函数说明:

    Return a sampled subset of this RDD.

    Params:
    withReplacement – can elements be sampled multiple times (replaced when sampled out)
    fraction – expected size of the sample as a fraction of this RDD’s size without replacement: probability that each element is chosen; fraction must be [0, 1] with replacement: expected number of times each element is chosen; fraction must be greater than or equal to 0
    seed – seed for the random number generator

    val data: RDD[Long] = context.range(1, 101)
    val sampleData: RDD[Long] = data.sample(true, 0.2, 807)
    
    • 1
    • 2

    1.9 distinct

    函数签名:

    def distinct(): RDD[T]
    
    • 1

    函数说明:

    Return a new RDD containing the distinct elements in this RDD.

    val distinctRdd: RDD[Long] = data.distinct()
    
    • 1

    1.10 coalesce

    函数签名:

    def coalesce(numPartitions: Int, shuffle: Boolean = false,
                   partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                  (implicit ord: Ordering[T] = null)
          : RDD[T]
    
    • 1
    • 2
    • 3
    • 4

    函数说明:

    Return a new RDD that is reduced into numPartitions partitions.

    val lessPartition: RDD[Long] = data.coalesce(4)
    
    • 1

    1.11 repartition

    函数签名:

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
        coalesce(numPartitions, shuffle = true)
      }
    
    • 1
    • 2
    • 3

    函数说明:

    Return a new RDD that has exactly numPartitions partitions.

    val morePartition: RDD[Long] = data.repartition(16)
    
    • 1

    1.12 sortBy

    函数签名:

    def sortBy[K](
          f: (T) => K,
          ascending: Boolean = true,
          numPartitions: Int = this.partitions.length)
          (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
    
    • 1
    • 2
    • 3
    • 4
    • 5

    函数说明:

    Return this RDD sorted by the given key function.

    val data: RDD[Long] = context.range(1, 101)
    val sortRDD: RDD[Long] = data.sortBy(num => num, false)
    
    • 1
    • 2

    2. 双 value 类型

    2.1 intersection

    函数签名:

    def intersection(other: RDD[T]): RDD[T]
    
    • 1

    函数说明:

    Return the intersection of this RDD and another one.

    val data1: RDD[Long] = context.range(1, 11)
    val data2: RDD[Long] = context.range(5, 16)
    val intersectionRDD: RDD[Long] = data1.intersection(data2)
    
    • 1
    • 2
    • 3

    2.2 union

    函数签名:

    def union(other: RDD[T]): RDD[T]
    
    • 1

    函数说明:

    Return the union of this RDD and another one.

    val data1: RDD[Long] = context.range(1, 11)
    val data2: RDD[Long] = context.range(5, 16)
    val unionRDD: RDD[Long] = data1.union(data2)
    
    • 1
    • 2
    • 3

    2.3 subtract

    函数签名:

    def subtract(other: RDD[T]): RDD[T]
    
    • 1

    函数说明:

    Return an RDD with the elements from this that are not in other.

    val data1: RDD[Long] = context.range(1, 11)
    val data2: RDD[Long] = context.range(5, 16)
    val subtractRDD: RDD[Long] = data1.subtract(data2)
    
    • 1
    • 2
    • 3

    2.4 zip

    函数签名:

    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
    
    • 1

    函数说明:

    Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the *same number of partitions and the same number of elements in each partition.

    val data1: RDD[Long] = context.range(1, 5, 1, 1)
    val data2: RDD[String] = context.makeRDD(List("zhangsan", "lisi", "wangwu", "zhaoliu"), 1)
    val zipRDD: RDD[(Long, String)] = data1.zip(data2)
    
    • 1
    • 2
    • 3

    3. key-value 类型

    3.1 partitionBy

    函数签名:

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]
    
    • 1

    函数说明:

    Return a copy of the RDD partitioned using the specified partitioner.

    val data: RDD[(Int, String)] = context.makeRDD(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu"), (4, "zhaoliu")))
    import org.apache.spark.HashPartitioner
    val hashPartitionerRDD: RDD[(Int, String)] = data.partitionBy(new HashPartitioner(1))
    
    • 1
    • 2
    • 3

    3.2 reduceByKey

    函数签名:

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
    
    • 1

    函数说明:

    Merge the values for each key using an associative and commutative reduce function.

    val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
    val reduceByKeyRDD = data.reduceByKey((value1, value2) => value1 + value2)
    
    • 1
    • 2

    3.3 groupByKey

    函数签名:

    def groupByKey(): RDD[(K, Iterable[V])]
    
    • 1

    函数说明:

    Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with the existing partitioner/parallelism level. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.

    val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = data.groupByKey()
    
    • 1
    • 2

    3.4 aggregateByKey

    函数签名:

    def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
          combOp: (U, U) => U): RDD[(K, U)]
    
    • 1
    • 2

    函数说明:

    Aggregate the values of each key, using given combine functions and a neutral “zero value”.

    val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
    val aggregateByKeyRDD: RDD[(String, Int)] = data.aggregateByKey(0)((num1, num2) => num1 + num2, (num1, num2) => num1 + num2)
    
    • 1
    • 2

    3.5 foldByKey

    函数签名:

    def foldByKey(
          zeroValue: V,
          partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
    
    • 1
    • 2
    • 3

    函数说明:

    Merge the values for each key using an associative function and a neutral “zero value” which may be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).

    val data: RDD[(String, Int)] = context.makeRDD(Array(("zhangsan", 1), ("lisi", 1), ("zhangsan", 1), ("lisi", 1)))
    val foldByKeyByKeyRDD: RDD[(String, Int)] = data.foldByKey(3)((num1, num2) => num1 + num2)
    
    • 1
    • 2

    3.6 sortByKey

    函数签名:

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
          : RDD[(K, V)]
    
    • 1
    • 2

    函数说明:

    Sort the RDD by key, so that each partition contains a sorted range of the elements.

    val data: RDD[(Int, String)] = context.makeRDD(Array((3, "zhangsan"), (2, "lisi"), (4, "wangwu"), (0, "laoliu"), (1, "xiaoqi")))
    val sortByKeyRDD: RDD[(Int, String)] = data.sortByKey(false, 1)
    
    • 1
    • 2

    3.7 join

    函数签名:

    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
    
    • 1

    函数说明:

    Return an RDD containing all pairs of elements with matching keys in this and other.

    val data1: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21), ("zhaoliu", 20)))
    val data2: RDD[(String, String)] = context.makeRDD(List(("zhangsan", "male"), ("lisi", "famle"), ("wangwu", "male"), ("laoliu", "male")))
    val joinRDD: RDD[(String, (Int, String))] = data1.join(data2)
    
    • 1
    • 2
    • 3

    3.8 leftOuterJoin

    函数签名:

    def leftOuterJoin[W](
          other: RDD[(K, W)],
          partitioner: Partitioner): RDD[(K, (V, Option[W]))]
    
    • 1
    • 2
    • 3

    函数说明:
    Perform a left outer join of this and other.

    val data1: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21), ("zhaoliu", 20)))
    val data2: RDD[(String, String)] = context.makeRDD(List(("zhangsan", "male"), ("lisi", "famle"), ("wangwu", "male"), ("laoliu", "male")))
    val leftOuterJoinnRDD: RDD[(String, (Int, Option[String]))] = data1.leftOuterJoin(data2)
    
    • 1
    • 2
    • 3

    3.9 cogroup

    函数签名:

    def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
          other2: RDD[(K, W2)],
          other3: RDD[(K, W3)],
          partitioner: Partitioner)
          : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
    
    • 1
    • 2
    • 3
    • 4
    • 5

    函数说明:

    For each key k in this or other1 or other2 or other3, return a resulting RDD that contains a tuple with the list of values for that key in this, other1, other2 and other3.

    val data1: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21), ("zhaoliu", 20)))
    val data2: RDD[(String, String)] = context.makeRDD(List(("zhangsan", "male"), ("lisi", "famle"), ("wangwu", "male"), ("laoliu", "male")))
    val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = data1.cogroup(data2)
    
    • 1
    • 2
    • 3

    4. action 算子

    4.1 reduce

    函数签名:

    def reduce(f: (T, T) => T): T
    
    • 1

    函数说明:

    Reduces the elements of this RDD using the specified commutative and associative binary operator.

    val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
    val result: (String, Int) = data.reduce((data1, data2) => ("sum", data1._2 + data2._2))
    
    • 1
    • 2

    4.2 collect

    函数签名:

    def collect(): Array[T]
    
    • 1

    函数说明:

    Return an array that contains all of the elements in this RDD.

    val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
    val result: Array[(String, Int)] = data.collect()
    
    • 1
    • 2

    4.3 count

    函数签名:

    def count(): Long
    
    • 1

    函数说明:

    Return the number of elements in the RDD.

    val result: Long = data.count()
    
    • 1

    4.4 first

    函数签名:

    def first(): T
    
    • 1

    函数说明:

    Return the first element in this RDD.

    val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
    val result: (String, Int) = data.first()
    
    • 1
    • 2

    4.5 take

    函数签名:

    def take(num: Int): Array[T]
    
    • 1

    函数说明:

    Take the first num elements of the RDD. It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

    val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20)))
    val result: Array[(String, Int)] = data.take(1)
    
    • 1
    • 2

    4.6 aggregate

    函数签名:

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    
    • 1

    函数说明:

    Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U’s, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
    Params:
    zeroValue – the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
    seqOp – an operator used to accumulate results within a partition
    combOp – an associative operator used to combine results from different partitions

    val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
    val result: String = data.aggregate("A")((A, data1) => A + data1._1, (str1, str2) => str1 + str2)
    
    • 1
    • 2

    4.7 fold

    函数签名:

    def fold(zeroValue: T)(op: (T, T) => T): T
    
    • 1

    函数说明:

    Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”. The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
    This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
    Params:
    zeroValue – the initial value for the accumulated result of each partition for the op operator, and also the initial value for the combine results from different partitions for the op operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
    op – an operator used to both accumulate results within a partition and combine results from different partitions

    val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
    val result: (String, Int) = data.fold("A", 65)((data1, data2) => ("sum", data1._2 + data2._2))
    
    • 1
    • 2

    4.8 countByKey

    函数签名:

    def countByKey(): Map[K, Long]
    
    • 1

    函数说明:

    Count the number of elements for each key, collecting the results to a local Map.

    val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
    val result: collection.Map[String, Long] = data.countByKey()
    
    • 1
    • 2

    4.9 saveAsTextFile

    函数签名:

    def saveAsTextFile(path: String): Unit
    
    • 1

    函数说明:

    Save this RDD as a text file, using string representations of elements.

    data.saveAsTextFile("./test")
    
    • 1

    4.10 foreach

    函数签名:

    def foreach(f: T => Unit): Unit
    
    • 1

    函数说明:

    Applies a function f to all elements of this RDD.

    val data: RDD[(String, Int)] = context.makeRDD(List(("zhangsan", 18), ("lisi", 20), ("wangwu", 21)), 3)
    data.foreach(data1 => println(data1._1))
    
    • 1
    • 2
  • 相关阅读:
    Springboot中使用@JsonProperty和@JSONField
    【SA8295P 源码分析 (二)】37 - OpenWFD Server 启动流程 之 openwfd_server.c main 函数源码分析
    机器学习实战应用案例100篇(十九)-鲸鱼算法从原理到实战应用
    手写 Promise(2)实例方法与静态方法的实现
    Flink系列文档-(YY04)-Flink编程基础API-Transformation算子
    boost 库记录
    考研复习408计算机网络——物理层
    【Java 进阶篇】深入了解 Bootstrap 按钮和图标
    带你熟悉NLP预训练模型:BERT
    [carla入门教程]-2 pythonAPI的使用
  • 原文地址:https://blog.csdn.net/weixin_46376562/article/details/126215241