• spark案例分析-搜索引擎日志分析案例


    1.业务分析

    2.数据截图

    3.代码实现

            main.py:

    1. #cording:utf8
    2. from pyspark import SparkConf, SparkContext
    3. from pyspark.storagelevel import StorageLevel
    4. from defs import content_jieba, filter_word, append_word, extract_user_and_word
    5. from operator import add
    6. if __name__ == '__main__':
    7. conf = SparkConf().setAppName('test')
    8. sc = SparkContext(conf=conf)
    9. # 读取数据
    10. file_rdd = sc.textFile('hdfs://pyspark01:8020/input/SogouQ.txt')
    11. # 对数据进行划分
    12. split_rdd = file_rdd.map(lambda x: x.split('\t'))
    13. # 因为要做多个需求,split_RDD 作为基础的RDD会被多次使用
    14. split_rdd.persist(StorageLevel.DISK_ONLY)
    15. # TODO: 需求1:用户搜素的关键‘词’分析
    16. # 主要分析热点词
    17. # 将所有的搜索内容取出
    18. # print(split_rdd.takeSample(True, 3))
    19. context_rdd = split_rdd.map(lambda x: x[2])
    20. # 对搜索的内容进行分词分析
    21. word_rdd = context_rdd.flatMap(content_jieba)
    22. # print(word_rdd.collect())
    23. # 对数据进行过滤,补全
    24. # 博学 谷 -> 博学谷
    25. # 传智播 客 -> 传智播客
    26. # 院校 帮 -> 院校帮
    27. filtered_rdd = word_rdd.filter(filter_word)
    28. # 数据补全
    29. append_rdd = filtered_rdd.map(append_word)
    30. # 对单词进行分组 聚合 排序 求出前五名
    31. result_1 = append_rdd.reduceByKey(lambda a, b: a + b).\
    32. sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
    33. print('需求1结果:', result_1)
    34. # TODO:需求2:用户和关键词组合分析
    35. # 1,我喜欢传智播客
    36. # 1+我 1+喜欢 1+传智播客
    37. user_content_rdd = split_rdd.map(lambda x: (x[1],x[2]))
    38. # 对用户的搜索内容进行分词,分词和用户的ID再次组合
    39. user_word_rdd = user_content_rdd.flatMap(extract_user_and_word)
    40. # 对单词进行分组 聚合 排序 求出前五名
    41. result_2 = user_word_rdd.reduceByKey(lambda a, b: a + b).\
    42. sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
    43. print("需求2的结果:", result_2)
    44. # TODO:需求3:热门搜索时间段分析
    45. # 取出来所有时间
    46. time_rdd = split_rdd.map(lambda x: x[0])
    47. # 对事件进行处理,只保留小时精度即可
    48. hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0],1))
    49. # 分组 聚合 排序
    50. result_3 = hour_with_one_rdd.reduceByKey(add).\
    51. sortBy(lambda x: x[1],ascending=False,numPartitions=1).collect()
    52. print("需求3的结果:", result_3)

            defs.py:

    1. import jieba
    2. def content_jieba(data):
    3. '''使用jieba分词库 对数据分词处理'''
    4. seg = jieba.cut_for_search(data)
    5. l = list()
    6. for word in seg:
    7. l.append(word)
    8. return l
    9. def filter_word(data):
    10. return data not in ['谷', "帮", '客']
    11. def append_word(data):
    12. if data == '院校' : data = '院校帮'
    13. if data == '博学' : data = '博学谷'
    14. if data == '传智播' : data = '传智播客'
    15. return (data, 1)
    16. def extract_user_and_word(data):
    17. '''传入数据 例(1,我喜欢传智播客)'''
    18. user_id = data[0]
    19. content = data[1]
    20. # 对content进行访问
    21. words = content_jieba(content)
    22. return_list = list()
    23. for word in words:
    24. # 过滤 谷、帮、客
    25. if filter_word(word):
    26. return_list.append((user_id + '_' + append_word(word)[0],1))
    27. return return_list

  • 相关阅读:
    【二、http】go的http基本请求设置(设置查询参数、定制请求头)get和post类似
    【luogu P6779】rla1rmdq(分块)(树链剖分)
    Python数据分析实战-表连接-merge四种连接方式用法(附源码和实现效果)
    第二证券:科创板创业板涨跌幅限制?
    【经验分享】PS如何设置像素
    二维数组的最小路径和问题
    各大电商平台关于预制菜品种酸菜鱼销售量
    AI大模型引领未来智慧科研暨ChatGPT自然科学高级应用
    Linux编程 文件操作 close read write
    Oracle 11g+PLSQL Developer安装及环境配置
  • 原文地址:https://blog.csdn.net/2202_75347029/article/details/133933262