• 大数据从入门到实战 - MapReduce基础实战


    什么是MapReduce

    MapReduce是一种可用于数据处理的编程模型,我们现在设想一个场景,你接到一个任务,任务是:挖掘分析我国气象中心近年来的数据日志,该数据日志大小有3T,让你分析计算出每一年的最高气温,如果你现在只有一台计算机,如何处理呢?我想你应该会读取这些数据,并且将读取到的数据与目前的最大气温值进行比较。比较完所有的数据之后就可以得出最高气温了。不过以我们的经验都知道要处理这么多数据肯定是非常耗时的。

    如果我现在给你三台机器,你会如何处理呢?看到下图你应该想到了:最好的处理方式是将这些数据切分成三块,然后分别计算处理这些数据(Map),处理完毕之后发送到一台机器上进行合并(merge),再计算合并之后的数据,归纳(reduce)并输出。

    这就是一个比较完整的MapReduce的过程了。

     

    如何使用MapReduce进行运算

    我们通过一个示例,来体验Map/Reduce的使用。

    我们从一个问题入手:目前我们想统计两个文本文件中,每个单词出现的次数。

    首先我们在当前目录下创建两个文件:

    创建file01输入内容:

     
    
    1. Hello World Bye World

    创建file02输入内容:

     
    
    1. Hello Hadoop Goodbye Hadoop

    将文件上传到HDFS/usr/input/目录下:

    不要忘了启动DFSstart-dfs.sh

    然后创建文件夹并上传:

     

    在右侧代码区域编写,文件WordCount.java,添加如下内容:

    1. public class WordCount {
    2. //Mapper类
    3. /*LongWritable表示每一行起始偏移量
    4. 第一个Text是用来接受文件中的内容,
    5. 第二个Text是用来输出给Reduce类的key,
    6. IntWritable是用来输出给Reduce类的value*/
    7. public static class TokenizerMapper
    8. extends Mapper{
    9. private final static IntWritable one = new IntWritable(1);
    10. private Text word = new Text();
    11. public void map(LongWritable key, Text value, Context context
    12. ) throws IOException, InterruptedException {
    13. StringTokenizer itr = new StringTokenizer(value.toString());
    14. while (itr.hasMoreTokens()) {
    15. word.set(itr.nextToken());
    16. context.write(word, one);
    17. }
    18. }
    19. }
    20. public static class IntSumReducer
    21. extends Reducer {
    22. private IntWritable result = new IntWritable();
    23. public void reduce(Text key, Iterable values,
    24. Context context
    25. ) throws IOException, InterruptedException {
    26. int sum = 0;
    27. for (IntWritable val : values) {
    28. sum += val.get();
    29. }
    30. result.set(sum);
    31. context.write(key, result);
    32. }
    33. }
    34. public static void main(String[] args) throws Exception {
    35. //创建配置对象
    36. Configuration conf = new Configuration();
    37. //创建job对象
    38. Job job = new Job(conf, "word count");
    39. //设置运行job的类
    40. job.setJarByClass(WordCount.class);
    41. //设置Mapper的类
    42. job.setMapperClass(TokenizerMapper.class);
    43. //设置Reduce的类
    44. job.setReducerClass(IntSumReducer.class);
    45. //设置输出的key value格式
    46. job.setOutputKeyClass(Text.class);
    47. job.setOutputValueClass(IntWritable.class);
    48. //设置输入路径
    49. String inputfile = "/usr/input";
    50. //设置输出路径
    51. String outputFile = "/usr/output";
    52. //执行输入
    53. FileInputFormat.addInputPath(job, new Path(inputfile));
    54. //执行输出
    55. FileOutputFormat.setOutputPath(job, new Path(outputFile));
    56. //是否运行成功,true输出0,false输出1
    57. System.exit(job.waitForCompletion(true) ? 0 : 1);
    58. }
    59. }

    点击评测,运行代码,可以看到/usr/output目录下已经生成了文件。

     

    我们来查看part--r-00000文件的内容:

     

    可以看到统计的数据已经生成在文件中了。

    如果你还想要运行一次,那么你需要删除输出路径的文件夹和文件。

    代码解释

    示例中,Map/Reduce程序总共分为三块即:Map,Recude,JobMap负责处理输入文件的内容。

     

    TokenizerMappermap方法,它通过StringTokenizer 以空格为分隔符将一行切分为若干tokens,之后,输出< , 1> 形式的键值对。

    对于示例中的第一个输入,map输出是:
    < Hello, 1>
    < World, 1>
    < Bye, 1>
    < World, 1>

    第二个输入,map输出是:
    < Hello, 1>
    < Hadoop, 1>
    < Goodbye, 1>
    < Hadoop, 1>

    WordCount还指定了一个combiner。因此,每次map运行之后,会对输出按照key进行排序,然后把输出传递给本地的combiner(按照作业的配置与Reducer一样),进行本地聚合

     

    第一个map的输出是:
    < Bye, 1>
    < Hello, 1>
    < World, 2>

    第二个map的输出是:
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 1>

    reduce的数据是这样的:

    < Bye , [1]>
    < GoodBye , [1]>
    < Hadoop , [2]>
    < Hello , [1,1]>
    < World , [2]>

    Reducer中的reduce方法 仅是将每个key(本例中就是单词)出现的次数求和。

     

    因此这个作业的输出就是:
    < Bye, 1>
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 2>
    < World, 2>

    在之后的实训中我们还会学习到JobConf, JobClient,Tool这些对象。

    编程要求

    使用MapReduce计算班级每个学生的最好成绩,输入文件路径为/user/test/input,请将计算后的结果输出到/user/test/output/目录下。

    代码文件

    1. import java.io.IOException;
    2. import java.util.StringTokenizer;
    3. import java.io.IOException;
    4. import java.util.StringTokenizer;
    5. import org.apache.hadoop.conf.Configuration;
    6. import org.apache.hadoop.fs.Path;
    7. import org.apache.hadoop.io.*;
    8. import org.apache.hadoop.io.Text;
    9. import org.apache.hadoop.mapreduce.Job;
    10. import org.apache.hadoop.mapreduce.Mapper;
    11. import org.apache.hadoop.mapreduce.Reducer;
    12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    14. import org.apache.hadoop.util.GenericOptionsParser;
    15. public class WordCount {
    16. /********** Begin **********/
    17. //Mapper函数
    18. public static class TokenizerMapper extends Mapper {
    19. private final static IntWritable one = new IntWritable(1);
    20. private Text word = new Text();
    21. private int maxValue = 0;
    22. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    23. StringTokenizer itr = new StringTokenizer(value.toString(),"\n");
    24. while (itr.hasMoreTokens()) {
    25. String[] str = itr.nextToken().split(" ");
    26. String name = str[0];
    27. one.set(Integer.parseInt(str[1]));
    28. word.set(name);
    29. context.write(word,one);
    30. }
    31. //context.write(word,one);
    32. }
    33. }
    34. public static class IntSumReducer extends Reducer {
    35. private IntWritable result = new IntWritable();
    36. public void reduce(Text key, Iterable values, Context context)
    37. throws IOException, InterruptedException {
    38. int maxAge = 0;
    39. int age = 0;
    40. for (IntWritable intWritable : values) {
    41. maxAge = Math.max(maxAge, intWritable.get());
    42. }
    43. result.set(maxAge);
    44. context.write(key, result);
    45. }
    46. }
    47. public static void main(String[] args) throws Exception {
    48. Configuration conf = new Configuration();
    49. Job job = new Job(conf, "word count");
    50. job.setJarByClass(WordCount.class);
    51. job.setMapperClass(TokenizerMapper.class);
    52. job.setCombinerClass(IntSumReducer.class);
    53. job.setReducerClass(IntSumReducer.class);
    54. job.setOutputKeyClass(Text.class);
    55. job.setOutputValueClass(IntWritable.class);
    56. String inputfile = "/user/test/input";
    57. String outputFile = "/user/test/output/";
    58. FileInputFormat.addInputPath(job, new Path(inputfile));
    59. FileOutputFormat.setOutputPath(job, new Path(outputFile));
    60. job.waitForCompletion(true);
    61. /********** End **********/
    62. }
    63. }

    命令行

    1. root@evassh-12041368:~# touch file01
    2. root@evassh-12041368:~# echo Hello World Bye World
    3. Hello World Bye World
    4. root@evassh-12041368:~# cat file01
    5. Hello World Bye World
    6. root@evassh-12041368:~# echo Hello World Bye World >file01
    7. root@evassh-12041368:~# cat file01
    8. Hello World Bye World
    9. root@evassh-12041368:~# touch file02
    10. root@evassh-12041368:~# echo Hello Hadoop Goodbye Hadoop >file02
    11. root@evassh-12041368:~# cat file02
    12. Hello Hadoop Goodbye Hadoop
    13. root@evassh-12041368:~# start-dfs.sh
    14. Starting namenodes on [localhost]
    15. localhost: Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
    16. localhost: starting namenode, logging to /usr/local/hadoop/logs/hadoop-root-namenode-evassh-12041368.out
    17. localhost: Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
    18. localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-root-datanode-evassh-12041368.out
    19. Starting secondary namenodes [0.0.0.0]
    20. 0.0.0.0: Warning: Permanently added '0.0.0.0' (ECDSA) to the list of known hosts.
    21. 0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-root-secondarynamenode-evassh-12041368.out
    22. root@evassh-12041368:~# hadoop fs -mkdir /usr
    23. mkdir: Cannot create directory /usr. Name node is in safe mode.
    24. root@evassh-12041368:~# hadoop fs -mkdir /usr/
    25. root@evassh-12041368:~# hadoop fs -mkdir /usr/input
    26. root@evassh-12041368:~# hadoop fs -ls /usr/output
    27. ls: `/usr/output': No such file or directory
    28. root@evassh-12041368:~# hadoop fs -mkdir /usr/output
    29. root@evassh-12041368:~# hadoop fs -ls /usr/output
    30. root@evassh-12041368:~# hadoop fs -ls /
    31. Found 3 items
    32. drwxr-xr-x - root supergroup 0 2022-07-27 06:44 /uer
    33. drwxr-xr-x - root supergroup 0 2022-07-27 06:46 /user
    34. drwxr-xr-x - root supergroup 0 2022-07-27 07:02 /usr
    35. root@evassh-12041368:~# hadoop fs -ls /usr
    36. Found 2 items
    37. drwxr-xr-x - root supergroup 0 2022-07-27 07:00 /usr/input
    38. drwxr-xr-x - root supergroup 0 2022-07-27 07:02 /usr/output
    39. root@evassh-12041368:~# hadoop fs -put file01 /usr/input
    40. root@evassh-12041368:~# hadoop fs -put file02 /usr/input
    41. root@evassh-12041368:~# hadoop fs -ls /usr/input
    42. Found 2 items
    43. -rw-r--r-- 1 root supergroup 22 2022-07-27 07:03 /usr/input/file01
    44. -rw-r--r-- 1 root supergroup 28 2022-07-27 07:05 /usr/input/file02
    45. root@evassh-12041368:~#

    结果

     

  • 相关阅读:
    java中string长度有限制吗,最大是多少?
    CSS3 弹性盒子(Flex Box)
    Android:自定义原生TimePickerDialog样式
    C++11 lambda表达式 可调用对象包装器function bind
    Spring Cloud 部署时如何使用 Kubernetes 作为注册中心和配置中心
    Android系统_MSM8953_android10_adb连接adbd加入密码检测
    硅树脂油漆申请美国标准UL 790 Class A 合适吗?
    TCP重头戏来!了!(3)—— 小林图解学习摘记
    docker镜像打包上传阿里云镜像仓库
    JMeter入门教程(9) --参数化
  • 原文地址:https://blog.csdn.net/qq_61604164/article/details/126015302