Add the dependency JAR files:
/root/training/hadoop-2.7.3/share/hadoop/common/*.jar
/root/training/hadoop-2.7.3/share/hadoop/common/lib/*.jar
/root/training/hadoop-2.7.3/share/hadoop/mapreduce/*.jar
/root/training/hadoop-2.7.3/share/hadoop/mapreduce/lib/*.jar
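These jars can be added to the IDE's build path, or used directly when compiling on the virtual machine. A minimal sketch (the jar name wc.jar is just an example; hadoop classpath prints the same set of jars):

javac -cp $(hadoop classpath) WordCountMap.java WordCountReduce.java WordCountMain.java
jar cf wc.jar *.class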
WordCountMap.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        /**
         * context is the Mapper's context:
         * upstream:   HDFS (the input split)
         * downstream: the Reducer
         */
        // Read one line of input, e.g. "I Love Beijing"
        String data = value1.toString();
        // Split the line into words
        String[] words = data.split(" ");
        // Emit each word: k2 is the word, v2 counts one occurrence
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
WordCountReduce.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key3, Iterable<IntWritable> values3, Context context)
            throws IOException, InterruptedException {
        /**
         * context is the Reducer's context:
         * upstream:   the Mapper
         * downstream: HDFS (the output file)
         */
        // Sum all the counts for this word
        int total = 0;
        for (IntWritable v : values3) {
            total = total + v.get();
        }
        // Emit k4 (the word) and v4 (its total count)
        context.write(key3, new IntWritable(total));
    }
}
WordCountMain.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create a job and set its entry point
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class);
        // 2. Set the mapper and its output key/value types
        job.setMapperClass(WordCountMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3. Set the reducer and its output key/value types
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. Set the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 5. Run the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
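Because the reduce logic here is a pure sum, which is associative and commutative, the same class can optionally double as a combiner that pre-aggregates counts on the map side and shrinks shuffle traffic. This single line is an optional addition to the driver, not part of the original code:

// Optional: run WordCountReduce as a map-side combiner
job.setCombinerClass(WordCountReduce.class);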
Once the Java code is written, package it into a JAR file and upload it to the virtual machine; it is run the same way as the HDFS JAR file.
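For example, a hypothetical run (wc.jar and the HDFS paths are placeholder names; the output directory must not exist beforehand):

hadoop jar wc.jar WordCountMain /input/data.txt /output/wc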
A second example: computing the total salary per department of the emp table. In SQL the target result looks like this:

SQL> select deptno,sum(sal) from emp group by deptno order by deptno;

    DEPTNO   SUM(SAL)
---------- ----------
        10       8750
        20      10875
        30       9400

The MapReduce equivalent uses deptno as the key (k2) and sal as the value (v2), so the shuffle groups each department's salaries together and the reducer adds them up.
SalaryTotalMapper.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Read one line, e.g. 7654,MARTIN,SALESMAN,7698,1998/9/29,1250,1400,30
        String data = value1.toString();
        // Split on commas: words[7] is deptno, words[5] is sal
        String[] words = data.split(",");
        // Emit k2 = deptno, v2 = sal
        context.write(new IntWritable(Integer.parseInt(words[7])),
                new IntWritable(Integer.parseInt(words[5])));
    }
}
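The mapper assumes every line has exactly eight comma-separated fields. If the input could contain blank or malformed lines, a small guard after the split (a hardening sketch, not in the original) keeps one bad row from failing the whole task:

// Hypothetical hardening: skip rows that do not have all eight fields
if (words.length < 8) {
    return;
}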
SalaryTotalReducer.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SalaryTotalReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    @Override
    protected void reduce(IntWritable key3, Iterable<IntWritable> values3, Context context)
            throws IOException, InterruptedException {
        // Sum all the salaries (v3) for this department
        int total = 0;
        for (IntWritable v : values3) {
            total += v.get();
        }
        // Emit k4 = deptno, v4 = the department's total salary
        context.write(key3, new IntWritable(total));
    }
}
SalaryTotalMain.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryTotalMain {
    public static void main(String[] args) throws Exception {
        // 1. Create a job and set its entry point
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SalaryTotalMain.class);
        // 2. Set the mapper and its output key/value types
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3. Set the reducer and its output key/value types
        job.setReducerClass(SalaryTotalReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. Set the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 5. Run the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
As before, package the Java code into a JAR file, upload it to the virtual machine, and run it the same way as the HDFS JAR file.
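For example (salary.jar and the paths are placeholders), assuming the emp data has already been uploaded to HDFS:

hadoop jar salary.jar SalaryTotalMain /input/emp.csv /output/salary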