• Spark基础【RDD转换算子】


    一 RDD单Value类型转换算子

    1 filter

    函数签名
    def filter(f: T => Boolean): RDD[T]
    
    • 1
    • 2

    将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。

    val rdd: RDD[Int] = sc.makeRDD(
      List(1, 2, 3, 4, 5, 6), 2
    )
    
    val rdd1: RDD[Int] = rdd.filter(
      num => num % 2 == 1
    )
    
    rdd1.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

    小功能:从agent.log文件中获取第二列为2的所有uid(第一列),部分数据展示如下

    1516609143867 6 7 64 16
    1516609143869 9 4 75 18
    1516609143869 1 7 87 12
    1516609143869 2 8 92 9
    1516609143869 6 7 84 24
    1516609143869 1 8 95 5
    1516609143869 8 1 90 29
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    val lines: RDD[String] = sc.textFile("data/agent.log")
    
    val filterLines: RDD[String] = lines.filter(
      line => {
        val datas = line.split(" ")
        datas(1).contains("2")
      }
    )
    
    val value: RDD[String] = filterLines.map(
      line => {
        val datas = line.split(" ")
        datas(0)
      }
    )
    
    value.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    2 sample

    函数签名
    def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T]
    
    • 1
    • 2
    • 3
    • 4
    • 5

    根据指定的规则从数据集中抽取数据

    如何解决数据倾斜:最简单的方式就是扩容,如HashMap

    在数据量非常大的情况下,有时为了分析造成数据倾斜的原因,需要从原数据中抽取出一部分进行分析,或者当需要将10000个存储在数据库中的对象存储到内存中,不确定内存是否足够,也可以抽取一部分对象,查看占用内存大小,从而推算出10000个对象所占的内存

    第一个参数表示抽取数据的方式

    • 抽取放回,true,伯努利算法

      又叫0、1分布。例如扔硬币,要么正面,要么反面

    • 抽取不放回,false,泊松算法

    第二个参数和第一个参数有关系

    • 如果是抽取不放回的场景,此参数表示每条数据被抽取的几率,并不是指被抽取的个数

      val rdd: RDD[Int] = sc.makeRDD(1 to 10)
      val rdd1: RDD[Int] = rdd.sample(false,0.5)
      
      • 1
      • 2
    • 如果是抽取放回的场景,此参数表示每条数据希望被重复抽取的次数

      val rdd1: RDD[Int] = rdd.sample(true,2)
      
      • 1

    第三个参数表示随机数种子

    随机数不随机,所谓的随机数,其实是通过随机算法获取的一个数,使用这个随机数种子可以产生其他的数据,如另随机算法为XXX

    3 = XXX(1),通过随机数种子1产生数据3

    2 = XXX(3),通过随机数种子3,产生数据2

    val rdd1: RDD[Int] = rdd.sample(false,0.5,2)
    
    • 1

    源码如下

    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3 coalesce

    函数签名
    def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T]
    
    • 1
    • 2
    • 3
    • 4
    • 5

    根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

    当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本

    先假设有三个分区,每个分区内部各有1000条数据,经过过滤操作后,分区内剩余数据分为两种情况

    • 分区一10条,分区二100条,分区三998条
    • 分区一100条,分区二300条,分区三900条

    针对第二种情况,三个分区需要申请三个资源去处理,前两个资源执行的快,因为分区三没执行完,所以前两个资源也不可以释放,造成资源浪费,为了解决这种情况,可以进行缩减分区,将一二两个分区的数据一起处理,申请一个资源

    针对第一种情况,即使将一二两个分区的数据一起处理,整体的效率也不高,所以可以使用shuffle来打乱分区

    这里需要注意,缩减并不是在原始数据上进行缩减,而是在分区后的数据进行缩减,也可以称其为合并

    默认情况下分区不会shuffle

    val rdd: RDD[Int] = sc.makeRDD(
      List(1, 2, 3, 4, 5, 6), 3
    )
    val rdd1: RDD[Int] = rdd.coalesce(2)
    
    rdd.saveAsTextFile("output")	//分区一1,2 分区二3,4 分区三5,6
    rdd1.saveAsTextFile("output1")	//分区一1,2 分区二3,4,5,6
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    shuffle表示一个分区的数据被打乱后和其他分区的数据重新组合在一起

    在合并的过程中,程序并不一定会将两个数据少的分区缩减在一起,根据首选位置(地理位置)进行缩减,尽量减少网络传输,增加效率,所以在某些情况下,以上操作无法解决数据倾斜问题

    所以还可以在缩减分区的同时,进行数据的shuffle操作,使得分区内的数据更加均匀,shuffle操作,前期慢,后期计算快

    val rdd1: RDD[Int] = rdd.coalesce(2,true)
    
    • 1

    扩大分区:数据量多,分区数少,分区内的数据很多,给计算资源造成很大压力,增加分区,减少计算压力,简单来说就是降低负载

    val rdd: RDD[Int] = sc.makeRDD(
      List(1, 2, 3, 4, 5, 6), 2
    )
    
    val rdd1: RDD[Int] = rdd.coalesce(3,true)
    
    rdd.saveAsTextFile("output")
    rdd1.saveAsTextFile("output1")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    如果不进行shuffle,coalesce扩大分区没有意义

    4 repartition

    函数签名
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    
    • 1
    • 2

    该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程

    val rdd1: RDD[Int] = rdd.repartition(3)
    
    • 1

    repartition源码

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }
    
    • 1
    • 2
    • 3

    5 distinct

    函数签名
    def distinct()(implicit ord: Ordering[T] = null): RDD[T]
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    
    • 1
    • 2
    • 3

    将数据集中重复的数据去重

    与scala集合(单点)的去重不同,scala集合直接将数据放到HashSet中,自动去重,rdd(分布式)的海量数据去重不能直接在内存中进行计算

    scala去重源码

    def distinct: Repr = {
      val isImmutable = this.isInstanceOf[immutable.Seq[_]]
      if (isImmutable && lengthCompare(1) <= 0) repr
      else {
        val b = newBuilder
        val seen = new mutable.HashSet[A]()
        var it = this.iterator
        var different = false
        while (it.hasNext) {
          val next = it.next
          if (seen.add(next)) b += next else different = true
        }
        if (different || !isImmutable) b.result() else repr
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    rdd去重部分源码

    map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    执行流程:
    元数据【1,1,1】
    【(1null),(1null),(1null)】
    【nullnullnull】
    【nullnull】
    【(1null)】
    【1
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    当发现分区有问题时,可以在distinct中传递分区数量,所以在distinct底层存在shuffle

    val rdd1: RDD[Int] = rdd.distinct(3)
    
    • 1

    6 sortBy

    函数签名
    def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程

    val rdd: RDD[Int] = sc.makeRDD(
      List(4,7,2,1,4,8), 3
    )
    
    val rdd1: RDD[Int] = rdd.sortBy(num => num)
    
    println(rdd1.collect().mkString(","))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    降序排序

    val rdd1: RDD[Int] = rdd.sortBy(num => num,false)
    
    • 1

    二 RDD双Value类型转换算子

    1 intersection

    对源RDD和参数RDD求交集后返回一个新的RDD,取两集合的交集

    函数签名
    def intersection(other: RDD[T]): RDD[T]
    
    • 1
    • 2
    val rdd: RDD[Int] = sc.makeRDD(
      List(1,2,3,4), 3
    )
    
    val rdd1: RDD[Int] = sc.makeRDD(
      List(3,4,5,6), 3
    )
    
    println(rdd.intersection(rdd1).collect().mkString(","))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2 union

    对源RDD和参数RDD求交集后返回一个新的RDD,取两集合的并集

    函数签名
    def union(other: RDD[T]): RDD[T]
    
    • 1
    • 2
    println(rdd.union(rdd1).collect().mkString(","))
    
    • 1

    3 subtract

    对源RDD和参数RDD求交集后返回一个新的RDD,取两集合的差集

    函数签名
    def subtract(other: RDD[T]): RDD[T]
    
    • 1
    • 2
    println(rdd.subtract(rdd1).collect().mkString(","))
    
    • 1

    1 2 3 如果两个RDD数据类型不一致,不能进行操作

    4 zip

    函数签名
    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
    
    • 1
    • 2

    将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素

    println(rdd.zip(rdd1).collect().mkString(","))
    
    • 1

    如果两个RDD分区数据数量不一致

    val rdd: RDD[Int] = sc.makeRDD(
      List(1,2,3,4), 2
    )
    
    val rdd1: RDD[Int] = sc.makeRDD(
      List(3,4,5,6,7,8), 2
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Can only zip RDDs with same number of elements in each partition

    使两个RDD分区内数据相同,两个RDD数据分区不一致

    val rdd: RDD[Int] = sc.makeRDD(
      List(1,2,3,4), 2
    )
    
    val rdd1: RDD[Int] = sc.makeRDD(
      List(3,4,5,6,7,8), 3
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Can’t zip RDDs with unequal numbers of partitions: List(2, 3)

    所以需要保证两个RDD分区数量要相同,每个分区内的元素数量还要相等

    两个RDD数据类型不一致,可以进行操作

    val rdd2: RDD[String] = sc.makeRDD(
      List("3","4","5","6"),2
    )
     println(rdd.zip(rdd2).collect().mkString(","))
    
    • 1
    • 2
    • 3
    • 4

    三 RDD Key -Value类型转换算子

    1 partitionBy

    函数签名
    def partitionBy(partitioner: Partitioner): RDD[(K, V)]
    
    • 1
    • 2

    将数据按照指定Partitioner对每一条数据重新进行分区。Spark默认的分区器是HashPartitioner

    与repartition不相同,repartition强调分区数量的变化,不关心数据怎么变

    partitionBy算子更关心数据的分区规则

    val rdd: RDD[Int] = sc.makeRDD(
      List(1,2,3,4), 2
    )
    
    val rdd1: RDD[(Int, Int)] = rdd.map((_,1))
    
    rdd1.partitionBy(null)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    以上调用RDD对象的partitionBy方法会报错,因为经过二次编译,隐式转换使得RDD变为了PairRDDFunctions,源码如下

    implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
      new PairRDDFunctions(rdd)
    }
    
    • 1
    • 2
    • 3
    • 4

    Spark的分区器:

    • RangePartitioner:按照一定的范围进行分区

      sortBy使用的就是RangePartitioner分区器

      def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
          : RDD[(K, V)] = self.withScope
      {
        val part = new RangePartitioner(numPartitions, self, ascending)
        new ShuffledRDD[K, V, V](self, part)
          .setKeyOrdering(if (ascending) ordering else ordering.reverse)
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    • HashPartitioner:默认shuffle分区器

      val rdd: RDD[Int] = sc.makeRDD(
        List(1,2,3,4), 2
      )
      
      val rdd1: RDD[(Int, Int)] = rdd.map((_,1))
      
      rdd1.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

      HashPartitioner源码

      def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
      }
      def nonNegativeMod(x: Int, mod: Int): Int = {
          val rawMod = x % mod
          rawMod + (if (rawMod < 0) mod else 0)
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8

    2 reduceByKey

    函数签名
    def reduceByKey(func: (V, V) => V): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
    
    • 1
    • 2
    • 3

    可以将数据按照相同的Key对Value分在一个组中,然后进行reduce操作

    val rdd: RDD[(String,Int)] = sc.makeRDD(
      List(
        ("a", 1),
        ("a", 1),
        ("a", 1),
        ("b", 1)
      )
    )
    val wordCount: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    reduceByKey可以实现WordCount(2/10)

  • 相关阅读:
    麒麟Arm64nacos打包docker镜像说明
    python环境搭建
    神经网络图像识别技术,神经网络指纹识别
    【CQF Finance Class 4 金融衍生品】
    Selenium进行无界面爬虫开发
    关于蓝牙人员定位的几个重要问题
    花呗不小心升级了信用购会影响什么
    wordpress遇到的问题
    二叉搜索树之:【中序遍历一棵二叉搜索树】【给一棵有固定形态的二叉搜索树填值】【用BST中序遍历的性质填值】【之前讲过层序遍历】
    rpc通信的实现方式(以grpc为例)
  • 原文地址:https://blog.csdn.net/weixin_43923463/article/details/126272166