• 大数据之Spark案例实操完整使用(第六章)


    一、案例一

    1、准备数据

    在这里插入图片描述
    上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的 4 种行为:搜索,点击,下单,支付。数据规则如下:

    ➢ 数据文件中每行数据采用下划线分隔数据

    ➢ 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种

    ➢ 如果搜索关键字为 null,表示数据不是搜索数据

    ➢ 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据

    ➢ 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示

    ➢ 支付行为和下单行为类似

    详细字段说明

    编号字段名称字段类型字段含义
    1dateString用户点击行为的日期
    2user_idLong用户的 ID
    3session_idStringSession 的 ID
    4page_idLong某个页面的 ID
    5action_timeString动作的时间点
    6search_keywordString用户搜索的关键词
    7click_category_idLong某一个商品品类的 ID
    8click_product_idLong某一个商品的 ID
    9 order_category_idsString一次订单中所有品类的 ID 集合
    10 order_product_idsString一次订单中所有商品的 ID 集合
    11pay_category_idsString一次支付中所有品类的 ID 集合
    12pay_product_idsString一次支付中所有商品的 ID 集合
    13city_idLong城市 id
    //用户访问动作表
    case class UserVisitAction(
     date: String,//用户点击行为的日期
     user_id: Long,//用户的 ID
     session_id: String,//Session 的 ID
     page_id: Long,//某个页面的 ID
     action_time: String,//动作的时间点
     search_keyword: String,//用户搜索的关键词
     click_category_id: Long,//某一个商品品类的 ID
     click_product_id: Long,//某一个商品的 ID
     order_category_ids: String,//一次订单中所有品类的 ID 集合
     order_product_ids: String,//一次订单中所有商品的 ID 集合
     pay_category_ids: String,//一次支付中所有品类的 ID 集合
     pay_product_ids: String,//一次支付中所有商品的 ID 集合
     city_id: Long
    )//城市 id
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2、需求 1:Top10 热门品类

    在这里插入图片描述

    3、需求说明

    品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的
    公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。

    鞋 点击数 下单数 支付数
    衣服 点击数 下单数 支付数
    电脑 点击数 下单数 支付数
    例如,综合排名 = 点击数20%+下单数30%+支付数*50%
    本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下 单数;下单数再相同,就比较支付数。

    方案一、

    分别统计每个品类点击的次数,下单的次数和支付的次数:
    (品类,点击总数)(品类,下单总数)(品类,支付总数)
    
    • 1
    • 2
    package com.spack.bigdata.core.req
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_Req1_HotCategoryTop10Analysis {
    
      def main(args: Array[String]): Unit = {
        /**
         *
         * TODO 热门类品类
         */
    
        val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
        val sc = new SparkContext(operator)
    
    
        //TODO 1、读取原始日志数据
        val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
    
        //TODO 2、统计品类的点击数量:(品类ID,点击数量)
        val clickActionRDD = actionRdd.filter(
          action => {
            val datas = action.split("_")
            //获取索引6的、去除不是-1的数据
            datas(6) != "-1"
    
          }
        )
    
    
        val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
          action => {
            val datas = action.split("_")
            //点击品类的ID就有了、数量就是1--(单独统计点击的品类)
            (datas(6), 1)
          }
        ).reduceByKey(_ + _)
    
        //TODO 3、统计品类的下单数量:(品类ID,下单数量) ----下单的话一定不为null
    
        val orderCountRDD = actionRdd.filter(
          action => {
            val datas = action.split("_")
            //获取索引6的、去除不是-1的数据
            datas(8) != "null"
    
          }
        )
    
        val orderCount: RDD[(String, Int)] = orderCountRDD.flatMap(
          action => {
            val datas = action.split("_")
            val cid = datas(8)
            val cids = cid.split(",")
            cids.map(id => (id, 1))
    
          }
        ).reduceByKey(_ + _)
    
        //    value".collect().foreach(println)
    
    
        //TODO 4、统计品类的支付数量:(品类ID,支付数量)
    
        val payCountRDD = actionRdd.filter(
          action => {
            val datas = action.split("_")
            //获取索引6的、去除不是-1的数据
            datas(10) != "null"
    
          }
        )
    
        val payCount: RDD[(String, Int)] = payCountRDD.flatMap(
          action => {
            val datas = action.split("_")
            val cid = datas(10)
            val cids = cid.split(",")
            cids.map(id => (id, 1))
          }
        ).reduceByKey(_ + _)
    
    
    
        //TODO 5、将品类进行排序,并且提取前十名
        //点击数量排序、下单数量排序,支付数量排序
        //元组排序:先比较第一个,在比较第二个,在比较第三个,以此类推退
        //(品类ID,(点击数量,下单数量,支付数量))
    
        //会在自己的数据源建立一个分组、跟另外一个数据源做一个链接
        //cogroup = connect + group
    
        val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(orderCount, payCount)
    
        val analysisRDD = cogroupRDD.mapValues {
          case (clickIter, orderIter, payIter) => {
            var clickCnt = 0
            val iter1 = clickIter.iterator
            if (iter1.hasNext) {
              clickCnt = iter1.next()
            }
    
            var orderCnt = 0
            val iter2 = orderIter.iterator
            if (iter2.hasNext) {
              orderCnt = iter2.next()
            }
    
            var payCnt = 0
            val iter3 = payIter.iterator
            if (iter3.hasNext) {
              payCnt = iter3.next()
            }
            (clickCnt, orderCnt, payCnt)
          }
        }
    
        val tuples = analysisRDD.sortBy(_._2, false).take(10)
        tuples.foreach(println)
        //TODO 6、将结果采集到控制台打印出来
    
        sc.stop()
    
        //TODO 7、统计品类的点击数量:(品类ID,点击数量)
    
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129

    结果:
    (15,(6120,1672,1259))
    (2,(6119,1767,1196))
    (20,(6098,1776,1244))
    (12,(6095,1740,1218))
    (11,(6093,1781,1202))
    (17,(6079,1752,1231))
    (7,(6074,1796,1252))
    (9,(6045,1736,1230))
    (19,(6044,1722,1158))
    (13,(6036,1781,1161))

    实现方案二

    一次性统计每个品类点击的次数,下单的次数和支付的次数:
    (品类,(点击总数,下单总数,支付总数))
    
    • 1
    • 2
    package com.spack.bigdata.core.req
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    /**
     * 第二种实现方式
     */
    object Spark02_Req1_HotCategoryTop10Analysis {
    
      def main(args: Array[String]): Unit = {
        /**
         *
         * TODO 热门类品类
         */
    
        val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
        val sc = new SparkContext(operator)
        //Q: actionRdd重复使用 -使用缓存
        //Q: cogroup性能可能较低
    
        //TODO 1、读取原始日志数据
        val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
        actionRdd.cache()
    
        //TODO 2、统计品类的点击数量:(品类ID,点击数量)
        val clickActionRDD = actionRdd.filter(
          action => {
            val datas = action.split("_")
            //获取索引6的、去除不是-1的数据
            datas(6) != "-1"
          }
        )
    
        val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
          action => {
            val datas = action.split("_")
            //点击品类的ID就有了、数量就是1--(单独统计点击的品类)
            (datas(6), 1)
          }
        ).reduceByKey(_ + _)
    
        //TODO 3、统计品类的下单数量:(品类ID,下单数量) ----下单的话一定不为null
    
        val orderCountRDD = actionRdd.filter(
          action => {
            val datas = action.split("_")
            //获取索引6的、去除不是-1的数据
            datas(8) != "null"
    
          }
        )
    
        val orderCount: RDD[(String, Int)] = orderCountRDD.flatMap(
          action => {
            val datas = action.split("_")
            val cid = datas(8)
            val cids = cid.split(",")
            cids.map(id => (id, 1))
    
          }
        ).reduceByKey(_ + _)
    
        //    value".collect().foreach(println)
    
        //TODO 4、统计品类的支付数量:(品类ID,支付数量)
    
        val payCountRDD = actionRdd.filter(
          action => {
            val datas = action.split("_")
            //获取索引6的、去除不是-1的数据
            datas(10) != "null"
    
          }
        )
    
        val payCount: RDD[(String, Int)] = payCountRDD.flatMap(
          action => {
            val datas = action.split("_")
            val cid = datas(10)
            val cids = cid.split(",")
            cids.map(id => (id, 1))
          }
        ).reduceByKey(_ + _)
    
        //TODO 5、将品类进行排序,并且提取前十名
        //点击数量排序、下单数量排序,支付数量排序
        //元组排序:先比较第一个,在比较第二个,在比较第三个,以此类推退
        //(品类ID,(点击数量,下单数量,支付数量))
    
        //会在自己的数据源建立一个分组、跟另外一个数据源做一个链接
        //cogroup = connect + group
    
        val rdd = clickCountRDD.map {
          case (cid, cnt) => {
            (cid, (cnt, 0, 0))
          }
        }
    
        val rdd1 = orderCount.map {
          case (cid, cnt) => {
            (cid, (0, cnt, 0))
          }
        }
        val rdd2 = payCount.map {
          case (cid, cnt) => {
            (cid, (0, 0, cnt))
          }
        }
    
        //将三个数据源合并在一起、统一进行聚合计算
        val sourceRDD: RDD[(String, (Int, Int, Int))] = rdd.union(rdd1).union(rdd2)
        val analysisRDD = sourceRDD.reduceByKey {
          (t1, t2) => {
            (t1._1+t2._1,t1._2+ t2._2,t1._3+ t2._3)
          }
        }
    //        sourceRDD.collect().foreach(println)
            val tuples = analysisRDD.sortBy(_._2, false).take(10)
    
        tuples.foreach(println)
        //TODO 6、将结果采集到控制台打印出来
    
        sc.stop()
        //TODO 7、统计品类的点击数量:(品类ID,点击数量)
    
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130

    结果:
    (15,(6120,1672,1259))
    (2,(6119,1767,1196))
    (20,(6098,1776,1244))
    (12,(6095,1740,1218))
    (11,(6093,1781,1202))
    (17,(6079,1752,1231))
    (7,(6074,1796,1252))
    (9,(6045,1736,1230))
    (19,(6044,1722,1158))
    (13,(6036,1781,1161))

    Process finished with exit code 0

    实现方案三

    使用累加器的方式聚合数据

    package com.spack.bigdata.core.req
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.AccumulatorV2
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    
    /**
     * 使用累加器的方式聚合数据
     *
     */
    object Spark04_Req1_HotCategoryTop10Analysis {
      def main(args: Array[String]): Unit = {
        /**
         *
         * TODO 热门类品类
         */
    
        val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
        val sc = new SparkContext(operator)
    
    
        //TODO 1、读取原始日志数据
        val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
        val acc = new HotCategoryAccumulator
        sc.register(acc, "HotCategory")
    
    
        //将数据转换结构
        val flatMapRDD = actionRdd.foreach(
          action => {
            val datas = action.split("_")
            if (datas(6) != "-1") {
              //点击的场合
              acc.add(datas(6), "click")
            } else if (datas(8) != "null") {
              //下单的场合
              val ids = datas(8).split(",")
              ids.foreach(
                id => {
                  acc.add(id, "order")
                }
              )
    
            } else if (datas(10) != "null") {
              //支付的场合
              val ids = datas(10).split(",")
              ids.foreach(
                id => {
                  acc.add(id, "pay")
                }
              )
            }
          }
        )
    
    
        val accVal: mutable.Map[String, HotCategory] = acc.value
        val categories: mutable.Iterable[HotCategory] = accVal.map(_._2)
    
    
        val sort = categories.toList.sortWith(
          (left, right) => {
            if (left.clickCnt > right.clickCnt) {
              true
            } else if (left.clickCnt == right.clickCnt) {
              if (left.orderCnt > right.orderCnt) {
                true
              } else if (left.orderCnt == right.orderCnt) {
                left.payCnt > right.payCnt
              } else {
                false
              }
            } else {
              false
            }
          }
        )
    
    
        sort.take(10).foreach(println)
        //TODO 6、将结果采集到控制台打印出来
    
        sc.stop()
        //TODO 7、统计品类的点击数量:(品类ID,点击数量)
      }
    
    
      case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)
    
      /**
       * 自定义累加器
       * 1、继承AccumlatorV2,定义泛型
       * IN :(品类ID,行为类型)
       * OUT: mutable.Map[String,HotCategory]
       *
       * 2、重写方法(6)
       */
      class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {
    
    
        private val hcMap = mutable.Map[String, HotCategory]()
    
        //是不是当前的初始状态
        override def isZero: Boolean = {
          hcMap.isEmpty
        }
    
    
        override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
          new HotCategoryAccumulator()
        }
    
        override def reset(): Unit = {
          hcMap.clear()
        }
    
        override def add(v: (String, String)): Unit = {
          val cid = v._1
          val actionType = v._2
          val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
          if (actionType == "click") {
            category.clickCnt += 1
          } else if (actionType == "order") {
            category.orderCnt += 1
          } else if (actionType == "pay") {
            category.payCnt += 1
          }
    
          hcMap.update(cid, category)
        }
    
        override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
          val map1 = this.hcMap
          val map2 = other.value
    
          map2.foreach {
            case (cid, hc) => {
              val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
              category.clickCnt += hc.clickCnt
              category.orderCnt += hc.orderCnt
              category.payCnt += hc.payCnt
              map1.update(cid, category)
            }
          }
    
        }
    
    
        //返回结果
        override def value: mutable.Map[String, HotCategory] = hcMap
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155

    二 、需求实现

    1、需求 2:Top10 热门品类中每个品类的 Top10 活跃 Session 统计

    需求说明:
    在需求一的基础上,增加每个品类用户 session 的点击统计

    package com.spack.bigdata.core.req
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.AccumulatorV2
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    
    /**
     * 第三种实现方式
     * 重新看- 没看懂
     *
     */
    object Spark05_Req1_HotCategoryTop10Analysis {
      def main(args: Array[String]): Unit = {
        /**
         *
         * TODO 热门类品类
         */
    
        val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
        val sc = new SparkContext(operator)
    
        val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
        actionRdd.cache()
    
        val top10Ids: Array[(String)] = top10Category(actionRdd)
    
        //1、过滤原始数据、保留点击和前10品类ID
        val filterActionRDD = actionRdd.filter(
          action => {
            val datas = action.split("_")
            //先满足这个条件
            if (datas(6) != "-1") {
              //判断datas(6) 在不在 top10Ids 里面
              top10Ids.contains(datas(6))
            } else {
              false
            }
          }
        )
    
        //根据品类ID和sessionID进行点击量的统计
        val reduceRDD = filterActionRDD.map(
          action => {
            val datas = action.split("_")
            ((datas(6), datas(2)), 1)
          }
        ).reduceByKey(_ + _)
    
    
        //3、将统计数据的结果进行结构的转换
        //((品类ID,sessionID),sum)=>(品类ID,(SessionID,sum))
        val mapRDD = reduceRDD.map {
          case ((cid, sid), sum) => {
            (cid, (sid, sum))
          }
        }
    
        //相同的品类进行分组
        val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()
    
        //5、将分组后的数据进行点击量的排序、取前10名
        val resultRDD = groupRDD.mapValues(
          iter => {
            iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
          }
        )
    
    
        resultRDD.collect().foreach(println)
        sc.stop()
      }
    
    
      def top10Category(actionRdd: RDD[String]): Array[(String)] = {
        val flatRDD: RDD[(String, (Int, Int, Int))] = actionRdd.flatMap(
          action => {
            val datas = action.split("_")
            if (datas(6) != "-1") {
              //点击的场合
              List((datas(6), (1, 0, 0)))
    
            } else if (datas(8) != "null") {
              //下单的场合
              val ids = datas(8).split(",")
              ids.map(id => (id, (0, 1, 0)))
    
            } else if (datas(10) != "null") {
              //支付的场合
              val ids = datas(10).split(",")
              ids.map(id => (id, (0, 0, 1)))
    
            } else {
              Nil
            }
          }
        )
        val analysisRDD: RDD[(String, (Int, Int, Int))] = flatRDD.reduceByKey(
          (t1, t2) => {
            (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
          }
        )
        analysisRDD.sortBy(_._2, false).take(10).map(_._1)
      }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109

    2、页面单跳转换率统计

    需求说明
    1)页面单跳转化率
    计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中
    访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,
    那么单跳转化率就是要统计页面点击的概率。
    比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV) 为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,
    那么 B/A 就是 3-5 的页面单跳转化率。

    在这里插入图片描述

    2)统计页面单跳转化率意义
    产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的
    表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。
    数据分析师,可以此数据做更深一步的计算和分析。
    企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当
    调整公司的经营战略或策略。

    package com.spack.bigdata.core.req
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    /**
     * 第三种实现方式
     * 重新看- 没看懂
     *
     */
    object Spark06_Req3_HotCategoryTop10Analysis {
      def main(args: Array[String]): Unit = {
        /**
         *
         * TODO 热门类品类
         */
    
        val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
        val sc = new SparkContext(operator)
    
        val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
    
        val actionDataRdd: RDD[UserVisitAction] = actionRdd.map(
          action => {
            val datas = action.split("_")
            UserVisitAction(
              datas(0),
              datas(1).toLong,
              datas(2),
              datas(3).toLong,
              datas(4),
              datas(5),
              datas(6).toLong,
              datas(7).toLong,
              datas(8),
              datas(9),
              datas(10),
              datas(11),
              datas(12).toLong,
            )
          }
        )
    
    
        //TODO 对指定的页面连续跳转进行统计
        //1-2,2-3,3-4,4-5,5-6,6-7
        val ids = List[Long](1, 2, 3, 4, 5, 6, 7)
        val okflowIds: List[(Long, Long)] = ids.zip(ids.tail)
    
    
        //TODO 计算分母 --求首页
        val pageidToCountMap: Map[Long, Long] = actionDataRdd.filter(
          //先过滤提高  ---init 不包含最后一个
          action => {
            ids.init.contains(action.page_id)
          }
        ).map(
          action => {
            (action.page_id, 1L)
          }
        ).reduceByKey(_ + _).collect().toMap
    
        actionDataRdd.cache()
        //TODO 计算分子
        val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRdd.groupBy(_.session_id)
    
        //分组后、根据访问时间进行排序(升序)
        val mvRdd: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
          iter => {
            val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time) //默认升序
    
            //[1,2,3,4]
            //[1,2] ,[2,3],[3,4]
            //[1-2,2-3,3-4]
    
            //Sliding:滑窗
            //【1,2,3,4】 flowIds就是【1,2,3,4】
            //【2,3,4】 就是 List[Long] 尾部信息
            //        zip 拉链
            val flowIds: List[Long] = sortList.map(_.page_id)
    
            val pageFlowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)
    
            //将不合法的页面跳转进行过滤
            pageFlowIds.filter(
              t => {
                okflowIds.contains(t)
              }
            ).map(
              t => {
                (t, 1)
              }
            )
    
          }
        )
    
        //((1,2),1) 拆开
        val flatRDD: RDD[((Long, Long), Int)] = mvRdd.map(_._2).flatMap(list => list)
        //((1,2),1)=>((1,2),SUM)
        val dataRDD = flatRDD.reduceByKey(_ + _)
    
        //TODO 计算单挑转换率
        //分子除以分母
        dataRDD.foreach {
          case ((pageid1, pageid2), sum) => {
            val lon: Long = pageidToCountMap.getOrElse(pageid1, 0L)
            println(s"页面${pageid1}挑转到页面${pageid2}单挑转化率为" + (sum.toDouble / lon))
          }
        }
    
        sc.stop()
      }
    
      //用户访问动作表
      case class UserVisitAction(
                                  date: String, //用户点击行为的日期
                                  user_id: Long, //用户的 ID
                                  session_id: String, //Session 的 ID
                                  page_id: Long, //某个页面的 ID
                                  action_time: String, //动作的时间点
                                  search_keyword: String, //用户搜索的关键词
                                  click_category_id: Long, //某一个商品品类的 ID
                                  click_product_id: Long, //某一个商品的 ID
                                  order_category_ids: String, //一次订单中所有品类的 ID 集合
                                  order_product_ids: String, //一次订单中所有商品的 ID 集合
                                  pay_category_ids: String, //一次支付中所有品类的 ID 集合
                                  pay_product_ids: String, //一次支付中所有商品的 ID 集合
                                  city_id: Long
                                ) //城市 id
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
  • 相关阅读:
    Spring
    Android12.0 app调用hal层接口功能实现系列三(frameworks层实现)
    java计算机毕业设计在线毕设选题系统源码+系统+mysql数据库+lw文档
    SpringBoot如何保证接口安全?
    华为OD 身高体重排序(100分)【java】A卷+B卷
    怎么将视频转化为gif?
    611. 有效三角形的个数 - 力扣
    【数据结构 二叉树 递归与非递归遍历】
    【大模型应用极简开发入门(2)】GPT模型简史:从GPT-1到GPT-4:从小数据量的微调到大数据量的强化学习不断优化模型
    About 8.21 This Week
  • 原文地址:https://blog.csdn.net/qq_42082701/article/details/125478165