• 【智能大数据分析】实验1 MapReduce实验:单词计数


    【智能大数据分析】实验1 MapReduce实验:单词计数

    在我之前的一篇博客中:云计算中的大数据处理:尝试HDFS和MapReduce的应用有过类似的操作,具体不会的可以去这篇博客中看看。

    一、实验目的

    基于MapReduce思想,编写WordCount程序。

    二、实验要求

    1.理解MapReduce编程思想;

    2.会编写MapReduce版本WordCount;

    3.会执行该程序;

    4.自行分析执行过程。

    三、实验原理

    MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE)。这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少整个操作的时间。

    适用范围:数据量大,但是数据种类小可以放入内存。

    基本原理及要点:将数据交给不同的机器去处理,数据划分,结果归约。

    理解MapReduce和Yarn:在新版Hadoop中,Yarn作为一个资源管理调度框架,是Hadoop下MapReduce程序运行的生存环境。其实MapRuduce除了可以运行Yarn框架下,也可以运行在诸如Mesos,Corona之类的调度框架上,使用不同的调度框架,需要针对Hadoop做不同的适配。

    一个完成的MapReduce程序在Yarn中执行过程如下:

    (1)ResourcManager JobClient向ResourcManager提交一个job。

    (2)ResourcManager向Scheduler请求一个供MRAppMaster运行的container,然后启动它。

    (3)MRAppMaster启动起来后向ResourcManager注册。

    (4)ResourcManagerJobClient向ResourcManager获取到MRAppMaster相关的信息,然后直接与MRAppMaster进行通信。

    (5)MRAppMaster算splits并为所有的map构造资源请求。

    (6)MRAppMaster做一些必要的MR OutputCommitter的准备工作。

    (7)MRAppMaster向RM(Scheduler)发起资源请求,得到一组供map/reduce task运行的container,然后与NodeManager一起对每一个container执行一些必要的任务,包括资源本地化等。

    (8)MRAppMaster 监视运行着的task 直到完成,当task失败时,申请新的container运行失败的task。

    (9)当每个map/reduce task完成后,MRAppMaster运行MR OutputCommitter的cleanup 代码,也就是进行一些收尾工作。

    (10)当所有的map/reduce完成后,MRAppMaster运行OutputCommitter的必要的job commit或者abort APIs。

    (11)MRAppMaster退出。

    1 MapReduce编程

    编写在Hadoop中依赖Yarn框架执行的MapReduce程序,并不需要自己开发MRAppMaster和YARNRunner,因为Hadoop已经默认提供通用的YARNRunner和MRAppMaster程序, 大部分情况下只需要编写相应的Map处理和Reduce处理过程的业务程序即可。

    编写一个MapReduce程序并不复杂,关键点在于掌握分布式的编程思想和方法,主要将计算过程分为以下五个步骤:

    (1)迭代。遍历输入数据,并将之解析成key/value对。

    (2)将输入key/value对映射(map)成另外一些key/value对。

    (3)依据key对中间数据进行分组(grouping)。

    (4)以组为单位对数据进行归约(reduce)。

    (5)迭代。将最终产生的key/value对保存到输出文件中。

    2 Java API解析

    (1)InputFormat:用于描述输入数据的格式,常用的为TextInputFormat提供如下两个功能:

    数据切分: 按照某个策略将输入数据切分成若干个split,以便确定Map Task个数以及对应的split。

    为Mapper提供数据:给定某个split,能将其解析成一个个key/value对。

    (2)OutputFormat:用于描述输出数据的格式,它能够将用户提供的key/value对写入特定格式的文件中。

    (3)Mapper/Reducer: Mapper/Reducer中封装了应用程序的数据处理逻辑。

    (4)Writable:Hadoop自定义的序列化接口。实现该类的接口可以用作MapReduce过程中的value数据使用。

    (5)WritableComparable:在Writable基础上继承了Comparable接口,实现该类的接口可以用作MapReduce过程中的key数据使用。(因为key包含了比较排序的操作)。

    四、实验步骤

    本实验主要分为,确认前期准备,编写MapReduce程序,打包提交代码。查看运行结果这几个步骤,详细如下:

    1 启动Hadoop

    在这里插入图片描述

    2 验证HDFS上没有wordcount的文件夹

    在这里插入图片描述

    此时HDFS上应该是没有wordcount文件夹。

    3 上传数据文件到HDFS

    wordcount.txt:
    在这里插入图片描述
    在这里插入图片描述

    4 编写MapReduce程序

    主要编写Map和Reduce类,其中Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法;Reduce过程需要继承org.apache.hadoop.mapreduce包中Reduce类,并重写其reduce方法。

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    
    public class WordCount {
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            //map方法,划分一行文本,读一个单词写出一个<单词,1>
            public void map(Object key, Text value, Context context)throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);//写出<单词,1>
                }
            }
        }
        //定义reduce类,对相同的单词,把它们中的VList值全部相加
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values,Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();//相当于,将两个1相加
                }
                result.set(sum);
                context.write(key, result);//写出这个单词,和这个单词出现次数<单词,单词出现次数>
            }
        }
        public static void main(String[] args) throws Exception {//主方法,函数入口
            Configuration conf = new Configuration();           //实例化配置文件类
            Job job = new Job(conf, "WordCount");             //实例化Job类
            job.setInputFormatClass(TextInputFormat.class);     //指定使用默认输入格式类
            TextInputFormat.setInputPaths(job, args[0]);      //设置待处理文件的位置
            job.setJarByClass(WordCount.class);               //设置主类名
            job.setMapperClass(TokenizerMapper.class);        //指定使用上述自定义Map类
            job.setCombinerClass(IntSumReducer.class);        //指定开启Combiner函数
            job.setMapOutputKeyClass(Text.class);            //指定Map类输出的,K类型
            job.setMapOutputValueClass(IntWritable.class);     //指定Map类输出的,V类型
            job.setPartitionerClass(HashPartitioner.class);       //指定使用默认的HashPartitioner类
            job.setReducerClass(IntSumReducer.class);         //指定使用上述自定义Reduce类
            job.setNumReduceTasks(Integer.parseInt(args[2]));  //指定Reduce个数
            job.setOutputKeyClass(Text.class);                //指定Reduce类输出的,K类型
            job.setOutputValueClass(Text.class);               //指定Reduce类输出的,V类型
            job.setOutputFormatClass(TextOutputFormat.class);  //指定使用默认输出格式类
            TextOutputFormat.setOutputPath(job, new Path(args[1]));    //设置输出结果文件位置
            System.exit(job.waitForCompletion(true) ? 0 : 1);    //提交任务并监控任务状态
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    在这里插入图片描述

    5 使用命令将代码打包

    上述代码在编译运行的时候会进行报错:
    在这里插入图片描述

    主要是在Hadoop版本3.x中,Job构造函数已过时,需要使用Job.getInstance构造函数。另外,有一个潜在的问题是设置job.setOutputValueClassText.class,但您的Reduce类输出类型是IntWritable,这两者需要匹配。

    下面是修改之后的代码:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    public class WordCount {
    
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "WordCount");
            job.setJarByClass(WordCount.class);
    
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    下面是打包过程:

    • 在我们创建的java项目根目录下创建一个名为src的文件夹。

    • 将所有的Java源代码文件(.java)移动到src文件夹中。

    • 在项目根目录中创建一个名为Manifest.txt的文件,用于指定JAR文件的入口点。

    • Manifest.txt文件中,添加以下内容:

      Main-Class: 
      
      • 1

      替换为包含main方法的主类的完整类名,例如我的是SalesDriver

    • 回到项目根目录下,使用以下命令编译Java源代码并创建一个临时目录来保存编译后的类文件:

      mkdir classes
      javac -d classes src/*.java
      
      • 1
      • 2

      如果你在使用编译命令时出现程序包×××存在的问题,这个时候我们需要将Hadoop相关的jar文件添加到编译路径中才可以解决:

      javac -classpath /usr/local/servers/hadoop/share/hadoop/common/h
      
      adoop-common-3.1.3.jar:/usr/local/servers/hadoop/share/hadoop/mapreduce/hadoop-map
      
      reduce-client-core-3.1.3.jar -d classes src/*.java
      
      • 1
      • 2
      • 3
      • 4
      • 5

      注意上面的命令是一个而不是多个。

    • 创建一个空的JAR文件,命名为WordCount.jar

      jar -cvf WordCount.jar -C classes/ .
      
      • 1
    • 将编译后的类文件和Manifest.txt添加到JAR文件中:

      jar -uf WordCount.jar -C classes/ .
      
      jar -uf WordCount.jar Mainfest.txt 
      
      • 1
      • 2
      • 3

    到现在,我们的整个java项目就打包成功了。

    6 在Hadoop集群上提交jar文件来运行MapReduce作业

    我们将打包好的WordCount.jar使用如下命令提交到集群上面:

    hadoop jar WordCount.jar WordCount /user/wordcount.txt /wordcount
    
    • 1

    顺利执行之后终端会打印如下信息:

    在这里插入图片描述

    然后我们查看我们的输出目录:

    hdfs dfs -ls /wordcount
    
    • 1

    在这里插入图片描述

    红框所示就是我们需要的结果,我们将其下载下来进行查看:

    hdfs dfs -get /wordcount1/part-r-00000 /root/WordCount
    vim part-r-00000
    
    • 1
    • 2

    在这里插入图片描述
    可以看见运行出我们想要的结果了,至此本次实验结束。

  • 相关阅读:
    java基础巩固19
    Java全栈开发第一阶段--01.Java语言概述(开发环境搭建)
    【Mock】Neo4j知识图谱数据集Mock、问答训练数据集mock
    深度学习笔记:2.Jupyter Notebook
    【C语言刷LeetCode】451. 根据字符出现频率排序(M)
    bit、bin 、mcs文件区别
    斩获 offer 的 Java 面试宝典
    Presto 聚合中groupBy分组的实现
    DataSophon——国产开源一站式运维平台
    Python进阶复习-Numpy库
  • 原文地址:https://blog.csdn.net/qq_52417436/article/details/134040681