1.业务分析

2.数据截图

3.代码实现:
main.py:
- #cording:utf8
- from pyspark import SparkConf, SparkContext
- from pyspark.storagelevel import StorageLevel
- from defs import content_jieba, filter_word, append_word, extract_user_and_word
- from operator import add
- if __name__ == '__main__':
- conf = SparkConf().setAppName('test')
- sc = SparkContext(conf=conf)
-
- # 读取数据
- file_rdd = sc.textFile('hdfs://pyspark01:8020/input/SogouQ.txt')
- # 对数据进行划分
- split_rdd = file_rdd.map(lambda x: x.split('\t'))
- # 因为要做多个需求,split_RDD 作为基础的RDD会被多次使用
- split_rdd.persist(StorageLevel.DISK_ONLY)
-
- # TODO: 需求1:用户搜素的关键‘词’分析
- # 主要分析热点词
- # 将所有的搜索内容取出
- # print(split_rdd.takeSample(True, 3))
- context_rdd = split_rdd.map(lambda x: x[2])
-
- # 对搜索的内容进行分词分析
- word_rdd = context_rdd.flatMap(content_jieba)
- # print(word_rdd.collect())
-
- # 对数据进行过滤,补全
- # 博学 谷 -> 博学谷
- # 传智播 客 -> 传智播客
- # 院校 帮 -> 院校帮
- filtered_rdd = word_rdd.filter(filter_word)
- # 数据补全
- append_rdd = filtered_rdd.map(append_word)
-
- # 对单词进行分组 聚合 排序 求出前五名
- result_1 = append_rdd.reduceByKey(lambda a, b: a + b).\
- sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
- print('需求1结果:', result_1)
-
- # TODO:需求2:用户和关键词组合分析
- # 1,我喜欢传智播客
- # 1+我 1+喜欢 1+传智播客
- user_content_rdd = split_rdd.map(lambda x: (x[1],x[2]))
- # 对用户的搜索内容进行分词,分词和用户的ID再次组合
- user_word_rdd = user_content_rdd.flatMap(extract_user_and_word)
- # 对单词进行分组 聚合 排序 求出前五名
- result_2 = user_word_rdd.reduceByKey(lambda a, b: a + b).\
- sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
- print("需求2的结果:", result_2)
-
- # TODO:需求3:热门搜索时间段分析
- # 取出来所有时间
- time_rdd = split_rdd.map(lambda x: x[0])
- # 对事件进行处理,只保留小时精度即可
- hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0],1))
- # 分组 聚合 排序
- result_3 = hour_with_one_rdd.reduceByKey(add).\
- sortBy(lambda x: x[1],ascending=False,numPartitions=1).collect()
- print("需求3的结果:", result_3)
defs.py:
- import jieba
- def content_jieba(data):
- '''使用jieba分词库 对数据分词处理'''
- seg = jieba.cut_for_search(data)
- l = list()
- for word in seg:
- l.append(word)
- return l
-
- def filter_word(data):
- return data not in ['谷', "帮", '客']
-
- def append_word(data):
- if data == '院校' : data = '院校帮'
- if data == '博学' : data = '博学谷'
- if data == '传智播' : data = '传智播客'
- return (data, 1)
-
- def extract_user_and_word(data):
- '''传入数据 例(1,我喜欢传智播客)'''
- user_id = data[0]
- content = data[1]
- # 对content进行访问
- words = content_jieba(content)
- return_list = list()
- for word in words:
- # 过滤 谷、帮、客
- if filter_word(word):
- return_list.append((user_id + '_' + append_word(word)[0],1))
- return return_list
-