注意:map()方法是对输入的一个KV对调用一次!!
创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数入输入数据路径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作用就是提交我们MapReduce程序运行。
apache 2
clickhouse 2
hadoop 1
mapreduce 1
spark 2
xiaoming 1
按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。
(1)新建maven工程
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
-
- <groupId>com.lagougroupId>
- <artifactId>WordcountartifactId>
- <version>1.0-SNAPSHOTversion>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.logging.log4jgroupId>
- <artifactId>log4j-coreartifactId>
- <version>2.8.2version>
- dependency>
- <dependency>
- <groupId>org.apache.hadoopgroupId>
- <artifactId>hadoop-commonartifactId>
- <version>2.9.2version>
- dependency>
- <dependency>
- <groupId>org.apache.hadoopgroupId>
- <artifactId>hadoop-clientartifactId>
- <version>2.9.2version>
- dependency>
- <dependency>
- <groupId>org.apache.hadoopgroupId>
- <artifactId>hadoop-hdfsartifactId>
- <version>2.9.2version>
- dependency>
- dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.pluginsgroupId>
- <artifactId>maven-compiler-pluginartifactId>
- <version>3.5.1version>
- <configuration>
- <source>1.8source>
- <target>1.8target>
- configuration>
- plugin>
-
-
-
- <plugin>
- <artifactId>maven-compiler-pluginartifactId>
- <version>2.3.2version>
- <configuration>
- <source>1.8source>
- <target>1.8target>
- configuration>
- plugin>
- <plugin>
- <artifactId>maven-assembly-pluginartifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependenciesdescriptorRef>
- descriptorRefs>
- configuration>
- <executions>
- <execution>
- <id>make-assemblyid>
- <phase>packagephase>
- <goals>
- <goal>singlegoal>
- goals>
- execution>
- executions>
- plugin>
- plugins>
- build>
-
- project>
- log4j.rootLogger=INFO, stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
- log4j.appender.logfile=org.apache.log4j.FileAppender
- log4j.appender.logfile.File=target/spring.log
- log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
- log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
(2)整体思路梳理(仿照源码)
Map阶段:
Reduce阶段:
Driver
(3)编写Mapper类
- package com.lagou.mr.wc;
-
- import org.apache.hadoop.io.*;
- import org.apache.hadoop.mapreduce.Mapper;
-
- import java.io.IOException;
-
- // 继承Mapper类
- // Mapper类的泛型参数共四个,2对kv
- /*
- * 第一对kv:map输入参数类型
- * 第二队kv:map输出参数类型
- * LongWritable, Text ->文本偏移量(后面不会用到),一行文本内容
- * Text, IntWritable ->单词,1
- */
- public class WordCountMapper extends Mapper
{ - // 重写Mapper类的map方法
- /**
- * 1、接收文本内容,转为String类型
- * 2、按照空格进行拆分
- * 3、输出<单词, 1>
- */
-
- // 提升为全局方法,避免每次执行map方法,都执行此操作
- Text word = new Text();
- IntWritable one = new IntWritable(1);
-
- // LongWritable, Text ->文本偏移量,一行文本内容,map方法的输入参数,一行文本调用一次map方法
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- // 1、接收文本内容,转为String类型
- String str = value.toString();
- // 2、按照空格进行拆分
- String[] words = str.split(" ");
- // 3、输出<单词, 1>
- // 遍历数据
- for (String s : words) {
- word.set(s);
- context.write(word, one);
- }
-
- }
- }
继承的Mapper类型选择新版本API:
(4)编写Reducer类
- package com.lagou.mr.wc;
-
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.io.*;
-
- import java.io.IOException;
-
- // 继承的Reducer类有四个泛型参数 2对kv
- // 第一对kv:类型要与Mapper输出类型一致:Text, IntWritable
- public class WordCountReducer extends Reducer
{ - // 1、重写reduce方法
-
- // Text key:map方法输出的key,本案中就是单词,
- // Iterable
values: 一组key相同的kv的value组成的集合 -
- /**
- * 假设map方法:hello 1; hello 1; hello 1
- * reduce的key和value是什么
- * key:hello
- * values:<1,1,1>
- *
- * 假设map方法输出:hello 1, hello 1, hadoop 1, mapreduce 1, hadoop 1
- * reduce的key和value是什么?
- * reduce方法何时调用:一组key相同的kv中的value组成集合然后调用一次reduce方法
- * 第一次:key:hello ,values:<1,1,1>
- * 第二次:key:hadoop ,values<1,1>
- * 第三次:key:mapreduce ,values<1>
- */
- IntWritable total = new IntWritable();
-
- @Override
- protected void reduce(Text key, Iterable
values, Reducer.Context context) throws IOException, InterruptedException { - // 2、遍历key对应的values,然后累加结果
- int sum = 0;
- for (IntWritable value : values) {
- int i = value.get();
- sum += 1;
- }
- // 3、直接输出当前key对应的sum值,结果就是单词出现的总次数
- total.set(sum);
- context.write(key, total);
- }
- }
选择继承的Reducer类
(5) 编写Driver驱动类
- package com.lagou.mr.wc;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- import java.io.IOException;
-
- // 封装任务并提交运行
- public class WordCountDriver {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- /*
- 1. 获取配置文件对象,获取job对象实例
- 2. 指定程序jar的本地路径
- 3. 指定Mapper/Reducer类
- 4. 指定Mapper输出的kv数据类型
- 5. 指定最终输出的kv数据类型
- 6. 指定job处理的原始数据路径
- 7. 指定job输出结果路径
- 8. 提交作业
- */
-
- // 1. 获取配置文件对象,获取job对象实例
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf, "WordCountDriver");/// jobName可以自定义
- // 2. 指定程序jar的本地路径
- job.setJarByClass(WordCountDriver.class);
-
- // 3. 指定Mapper/Reducer类
- job.setMapperClass(WordCountMapper.class);
- job.setReducerClass(WordCountReducer.class);
-
- // 4. 指定Mapper输出的kv数据类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
-
- // 5. 指定最终输出的kv数据类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- // 6. 指定job处理的原始数据路径
- FileInputFormat.setInputPaths(job, new Path(args[0])); // 指定读取数据的原始路径
-
- // 7. 指定job输出结果路径
- FileOutputFormat.setOutputPath(job, new Path(args[1])); // 指定结果数据输出路径
-
- // 8. 提交作业
- boolean flag = job.waitForCompletion(true);
- // jvm退出:正常退出0,非0值则是错误退出
- System.exit(flag ? 0 : 1);
-
- }
- }
直接Idea中运行驱动类即可
idea运行需要传入参数:
选择editconfiguration
在program arguments设置参数
运行时报错 ----> 参见博文org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z的解决办法_zhouang770377的博客-CSDN博客
运行结束,去到输出结果路径查看结果
注意本地idea运行mr任务与集群没有任何关系,没有提交任务到yarn集群,是在本地使用多线程方式模拟的mr的运行。
把程序打成jar包,改名为wc.jar;上传到Hadoop集群
选择合适的Jar包
准备原始数据文件,上传到HDFS的路径,不能是本地路径,因为跨节点运行无法获取数据!!
启动Hadoop集群(Hdfs,Yarn)
使用Hadoop 命令提交任务运行
hadoop jar wc.jar com.lagou.wordcount.WordcountDriver /user/lagou/input /user/lagou/output
Yarn集群任务运行成功展示图