Hadoop+Hive+Spark+HBase 开发环境。
本数据集包含了2017-09-11至2017-12-03之间有行为的约5458位随机用户的所有行为(行为包括点击、购买、加购、喜欢)。数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下具体字段说明如下:
列名称 | 列中文名称 | 说明 |
user_id | 用户 ID | 整数类型,序列化后的用户 ID |
item_id | 商品 ID | 整数类型,序列化后的商品 ID |
category_id | 商品类目 ID | 整数类型,序列化后的商品所属类目 ID |
behavior_type | 行为类型 | 字符串,枚举类型,包括 ('pv', 'buy', 'cart', 'fav') |
time | 时间戳 | 行为发生的时间戳 |
用户行为类型共有四种,它们分别是:
行为类型 | 说明 |
pv | 商品详情页 pv,等价于点击 |
buy | 商品购买 |
cart | 将商品加入购物车 |
fav | 收藏商品 |
(1)在 HDFS 中创建目录/data/userbehavior,并将 UserBehavior.csv 文件传到该目录。
- [root@kb135 examdata]# hdfs dfs -mkdir -p /data/userbehavior
- [root@kb135 examdata]# hdfs dfs -put ./UserBehavior.csv /data/userbehavior
(2)通过 HDFS 命令查询出文档有多少行数据。
[root@kb135 examdata]# hdfs dfs -cat /data/userbehavior/UserBehavior.csv | wc -l
(1)在 Hive 中创建数据库 exam
- hive (default)> create database exam;
- hive (default)> use exam;
(2)在 exam 数据库中创建外部表 userbehavior,并将 HDFS 数据映射到表中
- create external table userbehavior(
- user_id int,
- item_id int,
- category_id int,
- behavior_type string,
- `time` bigint
- )
- row format delimited fields terminated by ","
- stored as textfile location '/data/userbehavior';
(3)在 HBase 中创建命名空间 exam,并在命名空间 exam 创建 userbehavior 表,包
含一个列簇 info
- hbase(main):002:0> create_namespace 'exam202010'
- hbase(main):003:0> create 'exam202010:userbehavior','info'
(4)在 Hive 中创建外部表 userbehavior_hbase,并映射到 HBase 中,并将数
据加载到 HBase 中
- create external table userbehavior_hbase(
- user_id int,
- item_id int,
- category_id int,
- behavior_type string,
- `time` bigint
- )
- stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with
- serdeproperties("hbase.columns.mapping"=":key,info:item_id,info:category_id,info:behavior_type,info:time")
- tblproperties ("hbase.table.name"="exam202010:userbehavior");
insert into userbehavior_hbase select * from userbehavior;
(5)在 exam 数据库中创建内部分区表 userbehavior_partitioned(按照日期进行分区),
并通过查询 userbehavior 表将时间戳格式化为”年-月-日时:分:秒”格式,将数据插入至 userbehavior_partitioned 表中,例如下图:
- --创建分区表
- create table userbehavior_partitioned(
- user_id int,
- item_id int,
- category_id int,
- behavior_type string,
- `time` string
- ) partitioned by (dt string)
- stored as orc;
- set hive.exec.dynamic.partition=true;
- set hive.exec.dynamic.partition.mode=nonstrict;
- insert into table userbehavior_partition partition (dt)
- select user_id,item_id,category_id,behavior_type,
- from_unixtime(`time`) as `time`,
- from_unixtime(`time`,'yyyy-MM-dd') dt
- from userbehavior;
使用 Spark,加载 HDFS 文件系统 UserBehavior.csv 文件,并分别使用 RDD 完成以下分析。
加载文件:
scala> val fileRdd = sc.textFile("hdfs://kb135:9000/data/userbehavior")
(1)统计 uv 值(一共有多少用户访问淘宝)
- scala> fileRdd.map(x=>x.split(","))
- .filter(_.length==5)
- .map(x=>x(0))
- .distinct().count
- res1: Long = 5458
-
- scala> fileRdd.map(x=>x.split(","))
- .filter(_.length==5)
- .groupBy(x=>x(0)).count
- res2: Long = 5458
(2)分别统计浏览行为为点击,收藏,加入购物车,购买的总数量
- scala> fileRdd.map(x=>x.split(","))
- .filter(_.length==5)
- .map(x=>(x(3),1))
- .reduceByKey(_+_)
- .collect.foreach(println)
- (cart,30888)
- (buy,11508)
- (pv,503881)
- (fav,15017)
-
- scala> fileRdd.map(x=>x.split(","))
- .filter(_.length==5)
- .map(x=>(x(3),1))
- .groupByKey()
- .map(x=>(x._1,x._2.toList.size))
- .collect.foreach(println)
- (cart,30888)
- (buy,11508)
- (pv,503881)
- (fav,15017)
(1)使用 SparkSQL 统计用户最近购买时间。以 2017-12-03 为当前日期,计算时间范围为一个月,计算用户最近购买时间,时间的区间为 0-30 天,将其分为 5 档,0-6 天,7-12 天,13-18 天,19-24 天,25-30 天分别对应评分 4 到 0
- with
- tb as
- (select user_id,
- datediff('2017-12-03',max(dt)) as diff,
- max(dt)
- from userbehavior_partition
- where dt>'2017-11-03' and behavior_type='buy'
- group by user_id),
- tb2 as
- (select user_id,
- (case when diff between 0 and 6 then 4
- when diff between 7 and 12 then 3
- when diff between 13 and 18 then 2
- when diff between 19 and 24 then 1
- when diff between 25 and 30 then 0
- else null end ) tag
- from tb)
- select * from tb2 where tag=3;
(2)使用 SparkSQL 统计用户的消费频率。以 2017-12-03 为当前日期,计算时间范围为一个月,计算用户的消费次数,用户中消费次数从低到高为 1-161 次,将其分为 5 档,1-32,33-64,65-96,97-128,129-161 分别对应评分 0 到 4
- with
- tb as
- (select user_id,
- count(user_id) as num from userbehavior_partition
- where dt between '2017-11-03' and '2017-12-03' and behavior_type='buy'
- group by user_id)
- select
- user_id,
- (case when num between 129 and 161 then 4
- when num between 97 and 128 then 3
- when num between 65 and 96 then 2
- when num between 33 and 64 then 1
- when num between 1 and 32 then 0
- else null end) tag
- from tb;