• Spark基础【RDD累加器、广播变量、一个小案例】


    累加器

    1 问题引入

    Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

    • RDD : 弹性分布式数据集
    • 累加器:分布式共享只写变量
    • 广播变量:分布式共享只读变量
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val rdd: RDD[Int] = sc.makeRDD(
        List(1, 2, 3, 4),2
      )
      var sum = 0
      rdd.foreach(
        num => num + sum
      )
    
      println(sum)
      
      sc.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    以上代码中,list和sum分别在Driver端声明,而foreach是一个行动算子,在Executor端执行,执行时在Executor端没有发现sum,因闭包,会将sum传给Executor端,计算完成之后,分区一中的sum = 3,分区二中的sum = 7,而println在Driver端执行,Driver中的sum仍然等于0,所以最终返回0,关键在于Executor端执行完成的sum没有传回到Driver端,在以上代码的实现方式中,无法将计算结果返回到Driver端,因为sum能够传到Executor的原因是闭包,存在闭包,所以Spark知道在以后的执行中可能会用到sum,但是它并不知道最终的计算结果也要返回到Driver端

    list与rdd有关,所以可以通过rdd.collect采集回去,但sum只是一个变量,与rdd没有任何的关系,所以没有办法告诉Spark将sum也采集回去

    2 实现原理

    累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge,将多个分区的sum在Driver端进行合并

    在kafka中Producer的主线程将生产的数据放到缓冲区中,sender线程将数据从缓冲区取出放到kafka集群中,这样就可以让生产的数据均匀一些,这个缓冲区叫做RecordAccmulator,kafka使用双端队列实现生产有序,这个双端队列(DQ)就在缓冲区中

    Sqark中的其中一个累加器叫做longAccmulator

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val rdd: RDD[Int] = sc.makeRDD(
        List(1, 2, 3, 4),2
      )
      // 创建累加器
      val sum: LongAccumulator = sc.longAccumulator("sum")
      rdd.foreach(
        num => {
          // 使用累加器
          sum.add(num)
        }
      )
      // 获取累加器的结果
      println(sum.value)
    
      sc.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    3 自定义累加器

    使用累加器实现wordcount

    累加器源码解析:

    def longAccumulator(name: String): LongAccumulator = {
      // 构建一个对象
      val acc = new LongAccumulator
      register(acc, name)
      acc
    }
    
    class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {}
    
    abstract class AccumulatorV2[IN, OUT] extends Serializable {}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    根据源码来看,自定义累加器的流程

    • 继承AccumulatorV2类

    • 定义泛型

      • IN :输入数据的类型 String
      • OUT 输出数据的类型 Map[K, V]
    • 重写方法,6个(3【与计算相关】 + 3【与状态相关】)

      isZero:判断累加器是否为初始状态
      copy:  复制累加器
      reset: 重置累加器,原来的map中有数据,现在需要将其清空
      add:   从外部向累加器中添加数据
      merge: 合并多个累加器的结果
      value: 将结果返回到外部
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val rdd: RDD[String] = sc.makeRDD(
        List(
          "spark",
          "spark",
          "spark",
          "spark",
          "spark",
          "scala",
          "scala"
        )
      )
      // 创建累加器
      val accu = new WordCountAccu()
      // 向spark进行注册
      sc.register(accu,"wordCount")
      rdd.foreach(
        word => {
          // 将单词加入到累加器
          accu.add(word)
        }
      )
      // 输出结果
      println(accu.value)
    
      sc.stop()
    }
    
    class WordCountAccu extends AccumulatorV2[String,mutable.Map[String,Int]]{
    
      // 将结果存储到map
      private val wcMap: mutable.Map[String, Int] = mutable.Map[String, Int]()
    
      // 判断累加器是否为初始状态
      override def isZero: Boolean = {
        wcMap.isEmpty
      }
    
      override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
        new WordCountAccu()
      }
    
      // 重置累加器,原来的map中有数据,现在需要将其清空
      override def reset(): Unit = {
        wcMap.clear()
      }
    
      // 从外部向累加器中添加数据
      override def add(word: String): Unit = {
        val oldCnt: Int = wcMap.getOrElse(word,0)
        wcMap.update(word,oldCnt + 1)
      }
    
      // 合并多个累加器的结果
      override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
        other.value.foreach{
          case (word,cnt) => {
            val oldCnt: Int = wcMap.getOrElse(word,0)
            wcMap.update(word,oldCnt + cnt)
          }
        }
      }
    
      // 将结果返回到外部
      override def value: mutable.Map[String, Int] = wcMap
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    4 问题深入

    为什么进行以下操作

    • copy:将累加器从Driver端复制到Executor端

    • reset:分布式计算中,如果原始累加器因网络原因发送分区一成功,分区二失败,当分区一计算完成,累加器返回到Driver端,如果将计算完成的累加器传给分区二,若不重置,会造成结果错误,累加器是一个分布式的只写变量,所以这就要求它累加器之间不能互相读取,使用重置实现

    • isZero:先执行copy,再执行reset,最后执行isZero,如果isZero返回false会报错assertion failed: copyAndReset must return a zero value copy

      源码如下:

      final def assert(assertion: Boolean, message: => Any) {
        if (!assertion)
          throw new java.lang.AssertionError("assertion failed: "+ message)
      }
      
      
      val copyAcc = copyAndReset()
      assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
      
      
      def copyAndReset(): AccumulatorV2[IN, OUT] = {
          val copyAcc = copy()
          copyAcc.reset()
          copyAcc
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15

    5 存在问题

    累加器重复计算的问题:将累加器放在转换算子当中,转换算子如果重复使用的场景下,数据会重复计算

    累加器没有计算问的题:将累加器放在转换算子当中,没有执行行动算子,数据不会计算

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val rdd: RDD[Int] = sc.makeRDD(
        List(1, 2, 3, 4),2
      )
      // 创建累加器
      val sum: LongAccumulator = sc.longAccumulator("sum")
      val rdd1: RDD[Int] = rdd.map {
        num => {
          sum.add(num)
          num * 2
        }
      }
      rdd1.collect()
      rdd1.foreach(println)
      println("************")
      
      // 获取累加器的结果
      println(sum.value)
    
      sc.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    二 广播变量

    1 问题引入

    实现两个rdd的join操作

    val rdd1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 1), ("b", 2))
    )
    val rdd2: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 3), ("b", 4))
    )
    val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    rdd3.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    join操作的缺点:可能会产生笛卡尔乘积,存在shuffle

    使用map提升性能,没有笛卡尔,没有shuffle

    val rdd1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 1), ("b", 2))
    )
    
    val map: mutable.Map[String, Int] = mutable.Map[String, Int](
      ("a", 3), ("b", 4)
    )
    val rdd2: RDD[(String, (Int, Int))] = rdd1.map {
      case (word, cnt) => {
        val cnt1: Int = map.getOrElse(word, 0)
        (word, (cnt, cnt1))
      }
    }
    rdd2.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    以上代码存在一个问题,map是在Driver端声明,而map算子在Executor端执行,如果map中存在大量数据,每一个task都会有一个存放map的内存,造成内存中存在大量的数据冗余,如果将map中的数据放到Executor中,可解决数据冗余的问题

    2 实现原理

    广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送

    val rdd1: RDD[(String, Int)] = sc.makeRDD(
      List(("a", 1), ("b", 2))
    )
    val map: mutable.Map[String, Int] = mutable.Map[String, Int](
      ("a", 3), ("b", 4)
    )
    val bcMap: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
    val rdd2: RDD[(String, (Int, Int))] = rdd1.map {
      case (word, cnt) => {
        val cnt1: Int = bcMap.value.getOrElse(word, 0)
        (word, (cnt, cnt1))
      }
    }
    rdd2.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    三 案例实操

    0 数据准备

    日期 用户ID Session ID 页面ID 动作时间 搜索关键字 点击数据 下单行为 支付行为

    在这里插入图片描述

    上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的4种行为:搜索,点击,下单,支付。数据规则如下:

    • 数据文件中每行数据采用下划线分隔数据

    • 每一行数据表示用户的一次行为,这个行为只能是4种行为的一种

    • 如果搜索关键字为null,表示数据不是搜索数据

    • 如果点击的品类ID和产品ID为-1,表示数据不是点击数据

    • 针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,id之间采用逗号分隔,如果本次不是下单行为,则数据采用null表示

    • 支付行为和下单行为类似

    详细字段说明:

    编号字段名称字段类型字段含义
    1dateString用户点击行为的日期
    2user_idLong用户的ID
    3session_idStringSession的ID
    4page_idLong某个页面的ID
    5action_timeString动作的时间点
    6search_keywordString用户搜索的关键词
    7click_category_idLong某一个商品品类的ID
    8click_product_idLong某一个商品的ID
    9order_category_idsString一次订单中所有品类的ID集合
    10order_product_idsString一次订单中所有商品的ID集合
    11pay_category_idsString一次支付中所有品类的ID集合
    12pay_product_idsString一次支付中所有商品的ID集合
    13city_idLong城市 id

    注:编号代表其在数据中的位置,数组从0开始

    样例类:

    //用户访问动作表
    case class UserVisitAction(
        date: String,//用户点击行为的日期
        user_id: Long,//用户的ID
        session_id: String,//Session的ID
        page_id: Long,//某个页面的ID
        action_time: String,//动作的时间点
        search_keyword: String,//用户搜索的关键词
        click_category_id: Long,//某一个商品品类的ID
        click_product_id: Long,//某一个商品的ID
        order_category_ids: String,//一次订单中所有品类的ID集合
        order_product_ids: String,//一次订单中所有商品的ID集合
        pay_category_ids: String,//一次支付中所有品类的ID集合
        pay_product_ids: String,//一次支付中所有商品的ID集合
        city_id: Long
    )//城市 id
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    1 需求1:Top10热门品类

    (1)需求说明

    品类是指产品的分类,大型电商网站品类分多级,此项目中品类只有一级,不同的公司可能对热门的定义不一样。在这里按照每个品类的点击、下单、支付的量来统计热门品类。

    鞋 点击数 下单数 支付数

    衣服 点击数 下单数 支付数

    电脑 点击数 下单数 支付数

    例如,综合排名 = 点击数20%+下单数30%+支付数*50%

    本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。

    (2)实现流程

    1 读取文件,获取原始数据
    
    2 统计品类的点击数量
      	统计分区前需要将不需要的数据过滤掉(数据清洗)
      	保留所有的点击数据
     	 对点击数据进行统计
     	 
    3 统计品类的下单数量
     	 注意:一次可以下单多个类别的商品,(1,2,3,4) => (1,1)(2,1)(3,1)(4,1)
     	 先扁平化,再map
     	 
    4 统计品类的支付数量,下单行为与支付行为类似
    
    5 对统计结果进行排序,先点击,再下单,最后支付 => Tuple(点击,下单,支付)
      	(品类ID,点击)Data
      	(品类ID,下单)Data
      	(品类ID,支付)Data
      	最终合成(品类ID,(点击,下单,支付))
        	join,leftOuterJoin,rightOuterJoin:当左或右无数据时,存在缺陷
        	fullOuterJoin可行,但连接三个data,数据结构复杂(RDD[(String, (Option[(Option[Int], Option[Int])], Option[Int]))])
        	使用
        	
    6 将结果采集后打印在控制台上
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    (3)代码实现

    def main(args: Array[String]): Unit = {
    
      val conf = new SparkConf().setMaster("local").setAppName("HotCategoryTop10")
      val sc = new SparkContext(conf)
    
      // TODO  1 读取文件,获取原始数据
      val fileDatas: RDD[String] = sc.textFile("data/user_visit_action.txt")
    
      // TODO 2 统计品类的点击数量
      //  过滤数据
      val clickDatas: RDD[String] = fileDatas.filter {
        data => {
          val datas: Array[String] = data.split("_")
          val ccid: String = datas(6)
          ccid != "-1"
        }
      }
    
      val clickCntDatas: RDD[(String, Int)] = clickDatas.map {
        data => {
          val datas: Array[String] = data.split("_")
          val ccid: String = datas(6)
          (ccid, 1)
        }
      }.reduceByKey(_ + _)
    
      // TODO 3 统计品类的下单数量
      // 注意:一次可以下单多个类别的商品,(1,2,3,4) => (1,1)(2,1)(3,1)(4,1)
      val orderDatas: RDD[String] = fileDatas.filter {
        data => {
          val datas: Array[String] = data.split("_")
          val ocid: String = datas(8)
          ocid != "null"
        }
      }
    
      val orderCntDatas: RDD[(String, Int)] = orderDatas.flatMap(
        data => {
          val datas: ArrayOps.ofRef[String] = data.split("_")
          val ocid: StringOps = datas(8)
          val ocids: ArrayOps.ofRef[String] = ocid.split(",")
          ocids.map((_, 1))
        }
      ).reduceByKey(_ + _)
    
      // TODO 4 统计品类的支付数量,支付行为与下单行为类似
      val payDatas: RDD[String] = fileDatas.filter(
        data => {
          val datas: Array[String] = data.split("_")
          val pcid: String = datas(10)
          pcid != "null"
        }
      )
    
      val payCntDatas: RDD[(String, Int)] = payDatas.flatMap(
        data => {
          val datas: ArrayOps.ofRef[String] = data.split("_")
          val pcid: StringOps = datas(10)
          val pcids: ArrayOps.ofRef[String] = pcid.split(",")
          pcids.map((_, 1))
        }
      ).reduceByKey(_ + _)
    
      // TODO 5 对统计结果进行排序,先点击,再下单,最后支付
      val ccidCntsDatas: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
        clickCntDatas.cogroup(orderCntDatas,payCntDatas)
      
      val mapDatas: RDD[(String, (Int, Int, Int))] = ccidCntsDatas.map {
        case (cid, (clickIter, orderIter, payIter)) => {
          var clickCnt = 0
          var orderCnt = 0
          var payCnt = 0
    
          val iterator1: Iterator[Int] = clickIter.iterator
          if (iterator1.hasNext) {
            clickCnt = iterator1.next()
          }
    
          val iterator2: Iterator[Int] = orderIter.iterator
          if (iterator2.hasNext) {
            orderCnt = iterator2.next()
          }
    
          val iterator3: Iterator[Int] = payIter.iterator
          if (iterator3.hasNext) {
            payCnt = iterator3.next()
          }
    
          (cid, (clickCnt, orderCnt, payCnt))
        }
      }
        
      val top10: Array[(String, (Int, Int, Int))] = mapDatas.sortBy(_._2,false).take(10)
    
      // TODO 6 将结果采集后打印在控制台上
      top10.foreach(println)
    
      sc.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
  • 相关阅读:
    【JavaScript】一文搞懂JavaScript当中晦涩难懂的类型转换
    springboot实现支付宝沙箱支付功能
    [学习笔记]CS224W
    从汇编看语法糖(C++引用)和指针的区别
    Yolov5进阶之八 高低版本格式转换问题
    zookeeper的安装与配置和启动闪退问题的解决(win)
    极智AI | 昇腾 CANN ATC 模型转换
    洛谷 P2812 校园网络(强连通分量, 缩点)
    程序员为什么会成为工具人——及其一些破局的思考
    I.MX6ull UART
  • 原文地址:https://blog.csdn.net/weixin_43923463/article/details/126352302