如何定义热门商品?
简单模型:直接通过用户对商品的点击量来衡量商品热度。
复杂模型:依据各类别权重(后续补充)
如何获取区域?
通过用户点击日志,获取访问IP,进而获取区域信息。
通过数据库中的订单关联用户表,获取用户的地域信息
如何去除爬虫水军(商家为了提高自己的排名,用爬虫来频繁访问自己的店铺)?
一段时间分析用户IP的访问次数(后续补充)
数据采集(ETL)电商API接口接入
❝「因为要访问数据库,所以会对数据库造成很大的压力,而且在真实的生产环境中,一般没有权限直接访问数据库。可以把数据导出成csv文件,放到日志服务器上,再通过Flume采集到HDFS上。假如有权限访问数据库,数据库也需要设置成读写分离的模式,来缓解压力。」
❞
电商日志一般存储在日志服务器,通过 Flume 拉取到 HDFS 上,本文通过编写python程序模拟日志数据。
业务数据通过 Sqoop 从关系型数据库mysql中读取数据,然后导入到HDFS。
数据清洗
使用 MapReduce 进行数据清洗。
使用 Spark Core 进行数据清洗。
各区域热门商品计算
使用 Hive 进行数据的分析和处理。
使用 Spark SQL 进行数据的分析和处理
product(商品)表:
| 列名 | 描述 | 数据类型 | 空/非空约束 |
|---|---|---|---|
| product_id | 商品号 | varchar(18) | Not null |
| product_name | 商品名称 | varchar(20) | Not null |
| marque | 商品型号 | varchar(10) | Not null |
| barcode | 仓库条码 | varchar | Not null |
| price | 商品价格 | double | Not null |
| brand_id | 商品品牌 | varchar(8) | Not null |
| market_price | 市场价格 | double | Not null |
| stock | 库存 | int | Not null |
| status | 状态 | int | Not null |
❝补充说明: status: 下架-1,上架0,预售1
❞
area_info(地区信息)表
| 列名 | 描述 | 数据类型 | 空/非空约束 |
|---|---|---|---|
| area_id | 地区编号 | varchar(18) | Not null |
| area_name | 地区名称 | varchar(20) | Not null |
user_click_log(用户点击信息)表
| 列名 | 描述 | 数据类型 | 空/非空约束 |
|---|---|---|---|
| user_id | 用户ID | varchar(18) | Not null |
| user_ip | 用户IP | varchar(20) | Not null |
| url | 用户点击 URL | varchar(200) | |
| click_time | 用户点击时间 | varchar(40) | |
| action_type | 动作名称 | varchar(40) | |
| area_id | 地区ID | varchar(40) |
❝补充说明: action_type: 1 收藏,2 加购物车,3 购买 area_id:已经通过IP地址,解析出了区域信息
❞
area_hot_product(区域热门商品)表
| 列名 | 描述 | 数据类型 | 空/非空约束 |
|---|---|---|---|
| area_id | 地区ID | varchar(18) | Not null |
| area_name | 地区名称 | varchar(20) | Not null |
| product_id | 商品ID | varchar(200) | |
| product_name | 商品名称 | varchar(40) | |
| pv | 访问量 | BIGINT |
使用Flume采集用户点击日志
Flume配置文件(flume-areahot.conf)
2.利用python编写程序模拟日志信息,jian放入/log0208文件夹下,自定义添加不符合字段数据,要经过mr或spark进行数据清洗。
启动 Flume agent,在 Flume 的根目录下执行命令:bin/flume-ng agent -n a4 -f flume-areahot.conf -c conf -Dflume.root.logger=INFO,console
再执行python dslog.py向 /log0208 目录里放入用户日志文件(实现方法:此处
Flume 会将 /log0208 目录下的文件采集到 hdfs://master:9000/flume/ 当天日期 目录下。
「运行dslog.py程序如下:」
- #coding=utf-8
- import random
- import time
- iplist=[26,23,47,56,108,10,33,48,66,77,101,45,61,52,88,89,108,191,65,177,98,21,34,61,19,11,112,114]
-
- url = "http://mystore.jsp/?productid={query}"
- x=[1,2,3,4]
-
- def use_id():
- return random.randint(1,20)
- def get_ip():
- return '.'.join(str(x) for x in random.sample(iplist,4))
-
- def urllist():
- def sample_references():
- if random.uniform(0,1)>0.8:
- return ""
-
- query_str=random.sample(x,1)
- return url.format(query=query_str[0])
-
- def get_time():
- return time.strftime('%Y%m%d%H%M%S',time.localtime())
-
- # action: 1 收藏,2 加购物车,3 购买 area_id代表不同区域
- def action():
- return random.randint(1,4)
-
- def area_id():
- return random.randint(1,21)
-
-
- def get_log(count):
- while count>0:
- log='{},{},{},{},{},{}\n'.format(use_id(),get_ip(),urllist(),get_time(),action(),area_id())
- # with open('/usr/local/src/tmp/1.log','a+')as file:
- with open('/log0208/click.log','a+')as file:
- file.write(log)
- # print(log)
- # time.sleep(1)
- count=count-1
- if __name__ == '__main__':
- get_log(10000)
点击并拖拽以移动
生成日志结果截取:
- 5,10.26.56.45,http://mystore.jsp/?productid=1,20210222005139,1,19
- 2,10.101.98.47,http://mystore.jsp/?productid=1,20210222005139,3,8
- 17,191.88.66.108,http://mystore.jsp/?productid=3,20210222005139,2,14
- 4,89.21.33.108,,20210222005139,2,10
- 4,108.23.48.114,http://mystore.jsp/?productid=4,20210222005139,1,21
- 8,21.48.19.65,,20210222005139,1,3
- 16,61.21.89.11,http://mystore.jsp/?productid=2,20210222005139,3,11
- 6,56.47.112.88,,20210222005139,1,3
点击并拖拽以移动
flume-areahot.conf配置文件如下:
- #bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
- #定义agent名, source、channel、sink的名称
- a4.sources = r1
- a4.channels = c1
- a4.sinks = k1
-
- #具体定义source
- a4.sources.r1.type = spooldir
- a4.sources.r1.spoolDir = /log0208
-
- #具体定义channel
- a4.channels.c1.type = memory
- a4.channels.c1.capacity = 10000
- a4.channels.c1.transactionCapacity = 100
-
- #定义拦截器,为消息添加时间戳
- a4.sources.r1.interceptors = i1
- a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
-
- #具体定义sink
- a4.sinks.k1.type = hdfs
- a4.sinks.k1.hdfs.path = hdfs://master:9000/flume/%Y%m%d
- a4.sinks.k1.hdfs.filePrefix = events-
- a4.sinks.k1.hdfs.fileType = DataStream
-
- #不按照条数生成文件
- a4.sinks.k1.hdfs.rollCount = 0
- #HDFS上的文件达到128M时生成一个文件
- a4.sinks.k1.hdfs.rollSize = 134217728
- #HDFS上的文件达到60秒生成一个文件
- a4.sinks.k1.hdfs.rollInterval = 60
-
- #组装source、channel、sink
- a4.sources.r1.channels = c1
- a4.sinks.k1.channel = c1
点击并拖拽以移动
3.数据清洗
需要将用户点击日志里面对于商品的点击识别出来
过滤不满足6个字段的数据
过滤URL为空的数据,即:过滤出包含http开头的日志记录