• MapReduce编程规范及示例编写


    1、Mapper类

    • 用户自定义一个Mapper类继承Hadoop的Mapper类
    • Mapper的输入数据是KV对的形式(类型可以自定义)
    • Map阶段的业务逻辑定义在map()方法中
    • Mapper的输出数据是KV对的形式(类型可以自定义)

    注意:map()方法是对输入的一个KV对调用一次!!

    2、Reducer类

    • 用户自定义Reducer类要继承Hadoop的Reducer类
    • Reducer的输入数据类型对应Mapper的输出数据类型(KV对)
    • Reducer的业务逻辑写在reduce()方法中
    • Reduce()方法是对相同K的一组KV对调用执行一次

    3、Driver阶段

            创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数入输入数据路径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作用就是提交我们MapReduce程序运行。

     

    4、WordCount代码实现

    4.1、需求

    • 在给定的文本文件中统计输出每一个单词出现的总次数
    • 输入数据:wc.txt;
    • 输出:

      apache 2
      clickhouse 2
      hadoop 1
      mapreduce 1
      spark 2
      xiaoming 1

    4.2、具体步骤

    按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。

    (1)新建maven工程

    • 导入hadoop依赖
      1. "1.0" encoding="UTF-8"?>
      2. <project xmlns="http://maven.apache.org/POM/4.0.0"
      3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      5. <modelVersion>4.0.0modelVersion>
      6. <groupId>com.lagougroupId>
      7. <artifactId>WordcountartifactId>
      8. <version>1.0-SNAPSHOTversion>
      9. <dependencies>
      10. <dependency>
      11. <groupId>org.apache.logging.log4jgroupId>
      12. <artifactId>log4j-coreartifactId>
      13. <version>2.8.2version>
      14. dependency>
      15. <dependency>
      16. <groupId>org.apache.hadoopgroupId>
      17. <artifactId>hadoop-commonartifactId>
      18. <version>2.9.2version>
      19. dependency>
      20. <dependency>
      21. <groupId>org.apache.hadoopgroupId>
      22. <artifactId>hadoop-clientartifactId>
      23. <version>2.9.2version>
      24. dependency>
      25. <dependency>
      26. <groupId>org.apache.hadoopgroupId>
      27. <artifactId>hadoop-hdfsartifactId>
      28. <version>2.9.2version>
      29. dependency>
      30. dependencies>
      31. <build>
      32. <plugins>
      33. <plugin>
      34. <groupId>org.apache.maven.pluginsgroupId>
      35. <artifactId>maven-compiler-pluginartifactId>
      36. <version>3.5.1version>
      37. <configuration>
      38. <source>1.8source>
      39. <target>1.8target>
      40. configuration>
      41. plugin>
      42. <plugin>
      43. <artifactId>maven-compiler-pluginartifactId>
      44. <version>2.3.2version>
      45. <configuration>
      46. <source>1.8source>
      47. <target>1.8target>
      48. configuration>
      49. plugin>
      50. <plugin>
      51. <artifactId>maven-assembly-pluginartifactId>
      52. <configuration>
      53. <descriptorRefs>
      54. <descriptorRef>jar-with-dependenciesdescriptorRef>
      55. descriptorRefs>
      56. configuration>
      57. <executions>
      58. <execution>
      59. <id>make-assemblyid>
      60. <phase>packagephase>
      61. <goals>
      62. <goal>singlegoal>
      63. goals>
      64. execution>
      65. executions>
      66. plugin>
      67. plugins>
      68. build>
      69. project>
    • 添加log4j.properties
      1. log4j.rootLogger=INFO, stdout
      2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
      3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
      5. log4j.appender.logfile=org.apache.log4j.FileAppender
      6. log4j.appender.logfile.File=target/spring.log
      7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
      8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

    (2)整体思路梳理(仿照源码)

    Map阶段:

    1. map()方法中把传入的数据转为String类型
    2. 根据空格切分出单词
    3. 输出<单词,1>

    Reduce阶段:

    1. 汇总各个key(单词)的个数,遍历value数据进行累加
    2. 输出key的总数

    Driver

    1. 获取配置文件对象,获取job对象实例
    2. 指定程序jar的本地路径
    3. 指定Mapper/Reducer类
    4. 指定Mapper输出的kv数据类型
    5. 指定最终输出的kv数据类型
    6. 指定job处理的原始数据路径
    7. 指定job输出结果路径
    8. 提交作业

    (3)编写Mapper类

    1. package com.lagou.mr.wc;
    2. import org.apache.hadoop.io.*;
    3. import org.apache.hadoop.mapreduce.Mapper;
    4. import java.io.IOException;
    5. // 继承Mapper类
    6. // Mapper类的泛型参数共四个,2对kv
    7. /*
    8. * 第一对kv:map输入参数类型
    9. * 第二队kv:map输出参数类型
    10. * LongWritable, Text ->文本偏移量(后面不会用到),一行文本内容
    11. * Text, IntWritable ->单词,1
    12. */
    13. public class WordCountMapper extends Mapper {
    14. // 重写Mapper类的map方法
    15. /**
    16. * 1、接收文本内容,转为String类型
    17. * 2、按照空格进行拆分
    18. * 3、输出<单词, 1>
    19. */
    20. // 提升为全局方法,避免每次执行map方法,都执行此操作
    21. Text word = new Text();
    22. IntWritable one = new IntWritable(1);
    23. // LongWritable, Text ->文本偏移量,一行文本内容,map方法的输入参数,一行文本调用一次map方法
    24. @Override
    25. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    26. // 1、接收文本内容,转为String类型
    27. String str = value.toString();
    28. // 2、按照空格进行拆分
    29. String[] words = str.split(" ");
    30. // 3、输出<单词, 1>
    31. // 遍历数据
    32. for (String s : words) {
    33. word.set(s);
    34. context.write(word, one);
    35. }
    36. }
    37. }

    继承的Mapper类型选择新版本API:

    (4)编写Reducer类

    1. package com.lagou.mr.wc;
    2. import org.apache.hadoop.mapreduce.Reducer;
    3. import org.apache.hadoop.io.*;
    4. import java.io.IOException;
    5. // 继承的Reducer类有四个泛型参数 2对kv
    6. // 第一对kv:类型要与Mapper输出类型一致:Text, IntWritable
    7. public class WordCountReducer extends Reducer {
    8. // 1、重写reduce方法
    9. // Text key:map方法输出的key,本案中就是单词,
    10. // Iterable values: 一组key相同的kv的value组成的集合
    11. /**
    12. * 假设map方法:hello 1; hello 1; hello 1
    13. * reduce的key和value是什么
    14. * key:hello
    15. * values:<1,1,1>
    16. *

    17. * 假设map方法输出:hello 1, hello 1, hadoop 1, mapreduce 1, hadoop 1
    18. * reduce的key和value是什么?
    19. * reduce方法何时调用:一组key相同的kv中的value组成集合然后调用一次reduce方法
    20. * 第一次:key:hello ,values:<1,1,1>
    21. * 第二次:key:hadoop ,values<1,1>
    22. * 第三次:key:mapreduce ,values<1>
    23. */
    24. IntWritable total = new IntWritable();
    25. @Override
    26. protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
    27. // 2、遍历key对应的values,然后累加结果
    28. int sum = 0;
    29. for (IntWritable value : values) {
    30. int i = value.get();
    31. sum += 1;
    32. }
    33. // 3、直接输出当前key对应的sum值,结果就是单词出现的总次数
    34. total.set(sum);
    35. context.write(key, total);
    36. }
    37. }

    选择继承的Reducer类

    (5) 编写Driver驱动类

    1. package com.lagou.mr.wc;
    2. import org.apache.hadoop.conf.Configuration;
    3. import org.apache.hadoop.fs.Path;
    4. import org.apache.hadoop.io.IntWritable;
    5. import org.apache.hadoop.io.Text;
    6. import org.apache.hadoop.mapreduce.Job;
    7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    9. import java.io.IOException;
    10. // 封装任务并提交运行
    11. public class WordCountDriver {
    12. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    13. /*
    14. 1. 获取配置文件对象,获取job对象实例
    15. 2. 指定程序jar的本地路径
    16. 3. 指定Mapper/Reducer类
    17. 4. 指定Mapper输出的kv数据类型
    18. 5. 指定最终输出的kv数据类型
    19. 6. 指定job处理的原始数据路径
    20. 7. 指定job输出结果路径
    21. 8. 提交作业
    22. */
    23. // 1. 获取配置文件对象,获取job对象实例
    24. Configuration conf = new Configuration();
    25. Job job = Job.getInstance(conf, "WordCountDriver");/// jobName可以自定义
    26. // 2. 指定程序jar的本地路径
    27. job.setJarByClass(WordCountDriver.class);
    28. // 3. 指定Mapper/Reducer类
    29. job.setMapperClass(WordCountMapper.class);
    30. job.setReducerClass(WordCountReducer.class);
    31. // 4. 指定Mapper输出的kv数据类型
    32. job.setMapOutputKeyClass(Text.class);
    33. job.setMapOutputValueClass(IntWritable.class);
    34. // 5. 指定最终输出的kv数据类型
    35. job.setOutputKeyClass(Text.class);
    36. job.setOutputValueClass(IntWritable.class);
    37. // 6. 指定job处理的原始数据路径
    38. FileInputFormat.setInputPaths(job, new Path(args[0])); // 指定读取数据的原始路径
    39. // 7. 指定job输出结果路径
    40. FileOutputFormat.setOutputPath(job, new Path(args[1])); // 指定结果数据输出路径
    41. // 8. 提交作业
    42. boolean flag = job.waitForCompletion(true);
    43. // jvm退出:正常退出0,非0值则是错误退出
    44. System.exit(flag ? 0 : 1);
    45. }
    46. }

     

    运行任务

    1、本地模式

    直接Idea中运行驱动类即可

    idea运行需要传入参数:

    选择editconfiguration 

    在program arguments设置参数 

    运行时报错 ---->  参见博文org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z的解决办法_zhouang770377的博客-CSDN博客

    运行结束,去到输出结果路径查看结果

            注意本地idea运行mr任务与集群没有任何关系,没有提交任务到yarn集群,是在本地使用多线程方式模拟的mr的运行。 

    2、Yarn集群模式

    把程序打成jar包,改名为wc.jar;上传到Hadoop集群

    选择合适的Jar包

    准备原始数据文件,上传到HDFS的路径,不能是本地路径,因为跨节点运行无法获取数据!! 

    启动Hadoop集群(Hdfs,Yarn)

    使用Hadoop 命令提交任务运行

    hadoop jar wc.jar com.lagou.wordcount.WordcountDriver /user/lagou/input /user/lagou/output

    Yarn集群任务运行成功展示图

  • 相关阅读:
    10 分钟彻底理解 Redis 的持久化和主从复制
    Linux读写锁的容易犯的问题
    python pip安装超时使用国内镜像
    8、Spring 源码学习 ~ 自定义标签的解析
    算法 分糖果-(贪心)
    jbase代码生成器(成型篇)
    Linux -开机、重启和用户登录注销
    2024系统架构师---论软件系统架构风格
    MySQL 锁常见知识点&面试题总结
    绿米Aqara S1【妙控开关 S1E】版本降级或升级自定义固件的方法
  • 原文地址:https://blog.csdn.net/weixin_52851967/article/details/127131833