• 大数据随记 —— WordCount 案例


    大数据系列文章👉 目录 👈

    在这里插入图片描述

    一、WordCount 案例简介

    MapReduce 的命名可以看出,MapReduce 主要由两个阶段组成:Map 与 Reduce。通过编写 map() 函数与 reduce() 函数,就可以使用 MapReduce 完成分布式程序的设计。

    M a p R e d u c e { M a p ⇒ m a p ( ) 函数 R e d u c e ⇒ r e d u c e ( ) 函数 MapReduce \left\{ Mapmap()Reducereduce() \right . MapReduce MapReducemap()函数reduce()函数

    最简单的 MapReduce 程序应该包括 Map 函数、Reduce 函数来实现 MapReduce 的两个阶段,并用一个 main 函数(有的地方也被称为 driver)对以上两个函数的输入输出进行操作。

    而这里所介绍的 WordCount 案例就类似于 MapReduce 中的 “Hello World”,通过分析大量的文本,来统计文本中所出现的单词的个数。

    二、WordCount 实现

    假设我们有一个文本文件,并需要统计其中的锁板喊的单词的个数。

    Hello World Hadoop
    Hello Hadoop World Hello
    Hello World Hadoop Bye
    Bye Hadoop World
    
    • 1
    • 2
    • 3
    • 4

    请添加图片描述

    1、WordCount 实现分析

    Map 阶段主要是通过读取文本,并将其中的单词组成 map,变成 形式。

    Ⅰ、Map 阶段

    在这里,一共读取了以下四行:

    ① 读取第一行 Hello World Hadoop,并分割该行单词形成 map:

    ② 读取第二行 Hello Hadoop World Hello,并分割该行单词形成 map:

    ③ 读取第三行 Hello World Hadoop Bye,并分割该行单词形成 map:

    ④ 读取第四行 Bye Hadoop World,并分割该行单词形成 map:

    Ⅱ、Reduce 阶段

    ① 根据 Map 阶段的结果将相同 key 组合成 形式的数组。

    ② 遍历 key 中的 value[] 数组,分别统计每个单词出现的次数,合并成

    2、WordCount 代码实现

    Ⅰ、Maven 依赖配置

    新建 Maven 工程,并配置以下内容到 pom.xml。

    <dependency>  
        <groupId>org.apache.hadoopgroupId>  
        <artifactId>hadoop-commonartifactId>  
        <version>2.7.2version>  
    dependency>  
    
    <dependency>  
        <groupId>org.apache.hadoopgroupId>  
        <artifactId>hadoop-clientartifactId>  
        <version>2.7.2version>  
    dependency>  
    
      
    <dependency>  
        <groupId>org.apache.hadoopgroupId>  
        <artifactId>hadoop-hdfsartifactId>  
        <version>2.7.2version>  
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    Ⅱ、WordCount 代码实现
    a、map 函数
    public void map(Object key, Text value, Context context)
    
    • 1

    创建 Map 类继承 Mapper 类,并实现 map 方法。map 方法里有三个参数,前两个参数 (Object key, Text value) 即是输入所需的 key 与 value,第三个参数(Context context)则记录了 map 执行的上下文,用来记录 job 运行时的一些信息。

    b、reduce 函数
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
    
    • 1

    创建 Reduce 类继承 Reducer 类,并实现 reduce 方法。reduce 方法中也有三个参数,前两个参数 (Text key, Iterable values) 对应着 map 方法对 key 与 value,第三个参数 (Context context) 与 map 的 context 的作用相同。

    c、main 函数

    大部分 MapReduce 程序的 main 函数都是差不多的,主要都包含以下部分:

    ① Configuration 类
    Configuration conf = new Configuration();
    
    • 1

    MapReduce 程序都需要初始化 Configuration,以此来读取 MapReduce 的系统配置信息。

    ② Job 类
    Job job = Job.getInstance(conf,"WordCount");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    
    • 1
    • 2
    • 3
    • 4

    通过 Job 类中的 getInstance() 函数,可以得到系统当前已经实例化的该类对象。getInstance() 的第一个参数是 conf,第二个参数是 job 的名称。第三行与第四行用来指定 map 函数与 reduce 函数的实现类。

    ③ 设置 map/reduce 的 key/value
    job.setOutputKeyClass(Text.class);  
    job.setOutputValueClass(LongWritable.class);
    
    • 1
    • 2

    这里用来指定 key 与 value 的类型。

    ④ 设置 Job 的输入输出路径
    // String InputPATH = "hdfs://master:8020/Input";  
    // String OutputPATH = "hdfs://master:8020/Output";
    String InputPATH = args[0];  
    String OutputPATH = args[1];  
    
    
    FileInputFormat.addInputPath(job,inputPath);
    FileOutputFormat.setOutputPath(job,outputPath);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这里用来设置 Job 的输入输出路径,可以直接在代码中指定输入输出路径,也可以使用命令行传输传入参数。

    ⑤ 提交代码
    job.waitForCompletion(true);
    
    • 1

    最后这行代码用来程序结束后提交代码。

    Ⅲ、所有代码
    package mapreduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.StringTokenizer;
    
    public class WordCount {
    
        public static class Map extends Mapper<LongWritable, Text,Text,LongWritable>{
    
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String string = value.toString();
    
                String[] spilts = string.split(" ");
    
                for(String spilt:spilts){
                    context.write(new Text(spilt),new LongWritable(1L));
                }
            }
    
        }
    
        public static class Reduce extends Reducer<Text,LongWritable,Text,LongWritable>{
    
            @Override
            public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    
                int sum = 0;
                for(LongWritable val:values){
                    sum += val.get();
                }
    
                context.write(key,new LongWritable(sum));
            }
        }
    
        public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
            //String InputPATH = args[0];
            //String OutputPATH = args[1];
            String InputPATH = "hdfs://master:8020/Input";
            String OutputPATH = "hdfs://master:8020/Output";
    
            Configuration conf = new Configuration();
    
            final FileSystem fileSystem = FileSystem.get(new URI(InputPATH),conf);
            if(fileSystem.exists(new Path(OutputPATH))){
                fileSystem.delete(new Path(OutputPATH),true);
            }
    
            Job job = Job.getInstance(conf,"WordCount");
    
            // 指定 Jar 文件
            job.setJarByClass(WordCount.class);
    
            // 设置 map
            job.setMapperClass(Map.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
    
    
            // 设置 reduce
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            // 设置 input format
            job.setInputFormatClass(TextInputFormat.class);
            Path inputPath = new Path(InputPATH);
            FileInputFormat.addInputPath(job,inputPath);
    
            // 设置 output format
            job.setOutputFormatClass(TextOutputFormat.class);
            Path outputPath = new Path(OutputPATH);
            FileOutputFormat.setOutputPath(job,outputPath);
    
            // 提交 job
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    Ⅳ、打包运行
    ① 打包

    使用 IDEA 的 Maven 打包,并通过 XShell 放到相应文件夹下

    请添加图片描述

    请添加图片描述

    ② 运行
    hadoop jar Hadoop_Demo-1.0-SNAPSHOT-jar-with-dependencies.jar mapreduce.WordCount /input /output
    
    • 1

    注意:我这里 input 目录是由数据的

    请添加图片描述

    可以看到内容输出。

    在这里插入图片描述

  • 相关阅读:
    Spring学习第6篇: 基于注解使用IOC
    金仓数据库KingbaseES客户端应用参考手册--3. createdb
    EXCEL——计算数据分散程度的相关函数
    APT组织最喜欢的工具 Cobalt Strike (CS) 实战
    Python基础入门篇【34】--面向对象:类的多重继承与类的常用高级函数
    攻防世界-WEB-fileinclude
    代码随想录算法训练营第七天|454. 四数相加 II
    C#泛型的逆变协变(个人理解)
    北漂七年拿过阿里、腾讯、华为offer的资深架构师,分享经验总结
    手撕二叉搜索树(Binary Search Tree)
  • 原文地址:https://blog.csdn.net/qq_21484461/article/details/126670750