目录
- package com.sanqian.mr;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- import java.io.IOException;
-
- /**
- * 需求: 读取HDFS上hello.txt为念,计算文件中每个单词出现的次数
- */
- public class WorldCountJob {
- /**
- * Map阶段
- */
- public static class MyMapper extends Mapper
{ - /**
- * 需要实现map函数
- * 这个map函数可以接收
,产生 - * @param k1
- * @param v1
- * @param context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
- //k1 代表的是每一行数据的行首偏移量,va1代表的是每一行内容
- //对每一行的内容进行切分,把单词切出来
- String[] words = v1.toString().split(" ");
- //迭代切割出来的单词数据
- for (String word: words){
- // 把迭代出来的单词封装称
的形式 - Text k2 = new Text(word);
- LongWritable v2 = new LongWritable(1L);
- // 把
写出去 - context.write(k2, v2);
- }
-
- }
- }
-
- /**
- * Reduce阶段
- */
- public static class MyReduce extends Reducer
{ - /**
- * 针对
的数据进行累加求和,并且把数据转换成k3,v3写出去 - * @param k2
- * @param v2s
- * @param context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- protected void reduce(Text k2, Iterable
v2s, Context context) throws IOException, InterruptedException { - // 创建sum变量,保存v2s的和
- long sum = 0L;
- // 对v2s中的数据进行累加求和
- for (LongWritable v2: v2s){
- sum += v2.get();
- }
-
- // 组装k3, v3
- Text k3 = k2;
- LongWritable v3 = new LongWritable(sum);
- //把结果写出去
- context.write(k3, v3);
- }
- }
-
- /**
- * 组装job : map + reduce
- */
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- /**
- * args[0]: 全类名, args[1]: HDFS 输入路径, args[2]: HDFS 输出路径
- */
- if (args.length != 3){
- //如果传递的参数不够,直接退出
- System.out.println("参数的长度为:" + args.length);
- System.exit(100);
- }
- // 创建一个job
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
-
- // 注意了:这一行必须设置,否则在集群中执行的时候找不到WordCountJob这个类
- job.setJarByClass(WorldCountJob.class);
-
- // 指定输入路径(可以是文件,也可以是目录)
- FileInputFormat.setInputPaths(job, new Path(args[1]));
- // 指定输出路径(只能指定一个不存在的目录)
- FileOutputFormat.setOutputPath(job, new Path(args[2]));
-
- //指定map相关代码
- job.setMapperClass(MyMapper.class);
- // 指定k2的类型
- job.setMapOutputKeyClass(Text.class);
- // 指定v2的类型
- job.setMapOutputValueClass(LongWritable.class);
-
- //指定reduce相关的代码
- job.setReducerClass(MyReduce.class);
- //指定k3的类型
- job.setOutputKeyClass(Text.class);
- //指定v3的类型
- job.setOutputValueClass(LongWritable.class);
-
- //提交job
- job.waitForCompletion(true);
- }
- }
说明:
1.
provided 表示这个依赖只在编译的时候使用,在执行或打jar包的时候不适用;集群已有的jar包,这里可以不用再次打包
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
-
- <groupId>org.examplegroupId>
- <artifactId>db_bigdataartifactId>
- <version>1.0-SNAPSHOTversion>
-
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.hadoopgroupId>
- <artifactId>hadoop-clientartifactId>
- <version>2.7.2version>
-
- <scope>providedscope>
- dependency>
-
- <dependency>
- <groupId>org.apache.hadoopgroupId>
- <artifactId>hadoop-commonartifactId>
- <version>2.7.2version>
- <scope>providedscope>
- dependency>
- <dependency>
- <groupId>org.apache.hadoopgroupId>
- <artifactId>hadoop-hdfsartifactId>
- <version>2.7.2version>
- <scope>providedscope>
- dependency>
- dependencies>
-
- project>
说明:
1. 在pom.xml中 需要添加的打包插件依赖, 包括两个:编译插件和打包插件
2. mainClass 全类名可以置空,提交任务时动态指定
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-pluginartifactId>
- <version>2.3.2version>
- <configuration>
- <source>1.8source>
- <target>1.8target>
- <encoding>UTF-8encoding>
- configuration>
- plugin>
- <plugin>
- <artifactId>maven-assembly-plugin artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependenciesdescriptorRef>
- descriptorRefs>
- <archive>
- <manifest>
-
- <mainClass>com.sanqian.mr.WorldCountJobmainClass>
- manifest>
- archive>
- configuration>
- <executions>
- <execution>
- <id>make-assemblyid>
- <phase>packagephase>
- <goals>
- <goal>singlegoal>
- goals>
- execution>
- executions>
- plugin>
- plugins>
- build>
使用maven进行打包会生成两个jar包,一个不包含依赖,一个包含依赖
mvn clean package -DskipTests
参数说明:
a. clean package: 表示打包之前清除掉target 目录下的jar包
b. -DskipTest : 忽略测试代码
打包之前先使用clean一下清除旧的jar包,然后再使用package进行打包
将jar包拷贝到集群中, 提交命令
hadoop jar db_bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar com.sanqian.mr.WorldCountJob /data/xxx/words.txt /data/xxx/wc_output3
参数说明:
第一个参数: 全类名,程序入口
第二个参数:HDFS输入路径
第三个参数:HDFS输出路径
解决参考:maven项目编译时提示“编码GBK的不可映射字符”_冰块的旅行的博客-CSDN博客_maven 编码gbk的不可映射字符