• 云计算 - 3 - 使用MapReduce处理数据



    目标

    1.使用 MapReduce 实现对多个文本文件单词总数的统计(WordCount)。
    2.使用 MapReduce 实现社交网站好友的推荐。

    1、使用Mapreduce实现对多个文本文件单词总数的统计(WordCount)。

    1.1 启动Hadoop

    使用start-dfs.sh指令启动 hadoop。

    1.2 在 HDFS 文件系统 创建文件夹来作为单词统计的输入

    在这里插入图片描述

    1.3 将用来统计的文件上传到刚建立的文件夹中。

    这里选取 hadoop 本身的两个txt文件。
    在这里插入图片描述
    使用 hadoop fs -put 文件名 -input 将两个文件都上传。
    在这里插入图片描述

    1.4 使用 hadoop jar 命令,调用 jar 包,对 /input 文件夹进行单词统计

    hadoop jar /home/lucky/hadoop/hadoop-2.6.0/share/hadoop/mapreduce/hadoop-mapreduce -examples-2.6.0.jar  wordcount  /input  /output
    
    • 1

    在这里插入图片描述

    1.5 查看输出结果,实现了对多文件的字词统计

    查看指令如图中所示。
    运行结果有多页,这里展示一页:
    在这里插入图片描述

    2、使用 MapReduce 实现社交网站好友的推荐。

    2.1 问题分析

    好友推荐功能简单的说是这样一个需求:
    预测某两个人是否认识,并推荐为好友,并且某两个非好友的用户,他们的共同好友越多,那么他们越可能认识。
    在这里插入图片描述
    以 QQ 好友举例,顶点 A、B、C 到 G 分别是 QQ 用户,两顶点之间的边表示两顶点代表的用户之间相互关注。比如,B、G 有共同好友 A,应该推荐 B、G 认识,而 D、F 有两个共同好友 C、E,那么更加应该推荐 D、F 认识。

    因此也可得到输入如下:
    在这里插入图片描述

    2.2 编写推荐代码

    使用命令vi FriendRecommenda.scala开始编写好友推荐程序代码如下:

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    object FriendRecommendation {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("FriendRecommendation").setMaster("local")
        val sc = new SparkContext(sparkConf)
        val input = "hdfs://NameNode-1:9000//LiveJournal.txt"
        val output = "hdfs://NameNode-1:9000//friendsRecommend"
        val records = sc.textFile(input)
        val pairs = records.flatMap(line => {
          val tokens = line.split(" ") 
          val person = tokens(0).toLong
          val friends = tokens(1).split(",").map(_.toLong).toList
          val mapperOutput = friends.map(directFriend => (person, (directFriend, -1.toLong)))
          val result = for {
            fi <- friends
            fj <- friends
            possibleFriend1 = (fj, person)
            possibleFriend2 = (fi, person)
            if (fi != fj)
          } yield {
            (fi, possibleFriend1) :: (fj, possibleFriend2) :: List()
          }
          mapperOutput ::: result.flatten
        })
    
        val grouped = pairs.groupByKey()
    
        val result = grouped.mapValues(values => {
          val mutualFriends = new collection.mutable.HashMap[Long, List[Long]].empty
          values.foreach(t2 => {
            val toUser = t2._1
            val mutualFriend = t2._2
            val alreadyFriend = (mutualFriend == -1)
            if (mutualFriends.contains(toUser)) {
              if (alreadyFriend) {
                mutualFriends.put(toUser, List.empty)
              } else if (mutualFriends.get(toUser).isDefined && mutualFriends.get(toUser).get.size > 0 && !mutualFriends.get(toUser).get.contains(mutualFriend)) {
                val existingList = mutualFriends.get(toUser).get
                mutualFriends.put(toUser, (mutualFriend :: existingList))
              }
            } else {
              if (alreadyFriend) {
                mutualFriends.put(toUser, List.empty)
              } else {
                mutualFriends.put(toUser, List(mutualFriend))
              }
            }
          })
          mutualFriends.filter(!_._2.isEmpty).toMap
        })
    
        result.saveAsTextFile(output)
    
          result.foreach(f => {
          val friends = if (f._2.isEmpty) "" else {
             val items = f._2.map(tuple => (tuple._1,  ":" + tuple._2.size)).toSeq.sortBy(_._2).reverse.map(g => "" + g._1 + " " + g._2)
             items.toList.mkString(", ")
          }
    
           println(s"${f._1}: ${friends}")
        })
    
        // done
        sc.stop();
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    在这里插入图片描述

    2.3 程序运行

    2.3.1 首先将上面写的 .scala 文件编译
    scalac -cp /home/spark/spark-1.6/lib/spark-assembly-1.6.1-hadoop2.6.0. jar ../FriendRecommendation.scala
    
    • 1

    在这里插入图片描述

    2.3.2 打包 jar 包
    echo Main-class: FriendRecommendation > manifest.txt
    jar cvfm FriendRecommendation.jar manifest.txt *
    
    • 1
    • 2

    在这里插入图片描述

    2.3.3 将 jar 包提交运行
    /home/spark/spark-1.6/bin/spark-submit --class "FriendRecommendation" --master local[4] FriendRecommendation.jar
    
    • 1

    在这里插入图片描述
    得到好友推荐结果如下:
    在这里插入图片描述

    遇到的问题

    实验中存在的问题在于,当我使用一个较短的文本进行好友推荐时,可以正常输出结果,而当我使用一个 LiveJournal.txt 文件作为输入时,
    在这里插入图片描述

    出现如下所示的错误:
    在这里插入图片描述

    即文本太长导致了数组越界的问题。

  • 相关阅读:
    java CPU 或者内存 异常排查
    PreScan快速入门到精通第二十六讲PreScan中常用传感器之Beacon和OBU
    机器学习-学习率:从理论到实战,探索学习率的调整策略
    hi3861A上手记录
    RabbitMQ学习笔记之交换机
    【Hello Algorithm】暴力递归到动态规划(三)
    Linux指令(ls、pwd、cd、touch、mkdir、rm)
    luffy配置相关
    为初学者介绍轻量级目录访问协议——LDAP
    es6-promise对象详解
  • 原文地址:https://blog.csdn.net/qq_45438600/article/details/127826070