每一个
给定数据,统计每个单词出现的次数
word.txt文件
单词 数量
单数 数量
…
Mapper
1.1.将MapTask传给我们的文本内容转换成String
1.2.根据空格将每一行都切分为单词
1.3将单词输出为<单词,1>Reducer
2.1.汇总各个key的个数
2.2输出该key的总次数Driver
3.1.获取配置信息,获取job对象实例
3.2.指定本程序的jar包所在的本地路径
3.3.关联Mapper/Reducer业务类
5.4.指定Mapper输出数据的key类型
5.5.指定最终输出的数据的key类型
5.6.指定job的输入原数文件所在目录
5.7.指定job的输出结果所在目录
5.8.提交作业
可以看到run方法,有一个while循环,每次读取一个数据之后执行map方法:
所以map每次处理一行数据,map的参数就是每一个数据的key,value和context
重写map方法:
public static class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private final IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
// vlaue : atguigu atguigu
String line = value.toString();
StringTokenizer tonken = new StringTokenizer(line);
while(tonken.hasMoreElements()) {
word.set(tonken.nextToken());
context.write(word, one);
}
}
}
}
同样看run方法,也是每次处理一批数据,但不是一行,是key值相同的一批数据,因为map的时候,会执行shuffle操作,key值相同的数据会挨在一起
重写reduce方法:
public static class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable v=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// add values by key
int sum = 0;
for(IntWritable value: values) {
sum+=value.get();
}
v.set(sum);
context.write(key, v);
}
}
}
直接在main方法里定义即可
public static void main(String []args ) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://hadoop1:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("/user/pp/input/word.txt"));
FileOutputFormat.setOutputPath(job, new Path("/user/pp/output/wordCount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}