MapReduce is a distributed computing model, originally developed for the search domain, for computing over massive data sets. A job consists of two stages: Map and Reduce. The Map stage applies a user-defined operation to each independent element of the data set and emits intermediate results as key-value pairs; the Reduce stage then aggregates all intermediate values that share the same key to produce the final result.
Goal: count the occurrences of each word in the file hello.txt.
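Before the code, it helps to trace how a tiny, hypothetical two-line hello.txt would flow through the two stages:

    Input lines:    "hello you"  /  "hello me"
    Map output:     <hello,1> <you,1> <hello,1> <me,1>
    After shuffle:  <hello,[1,1]> <you,[1]> <me,[1]>
    Reduce output:  <hello,2> <you,1> <me,1>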
package hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Receive one line of the file, convert it to a String,
            // and split it on the delimiter (a space)
            String[] words = v1.toString().split(" ");
            // For every word in the line, emit a pair <word, 1>
            for (String word : words) {
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                // Hand the map output to the reduce stage via the context
                context.write(k2, v2);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Counter: iterate over the group and accumulate the 1s
            // to get the total number of occurrences of the word
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            context.write(k3, v3);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            // Exactly two arguments are required: the input path and the output path
            System.exit(100);
        }
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // The main class that runs in the job jar
            job.setJarByClass(WordCountJob.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // The mapper and reducer classes used by this MR job
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            // The output key and value types of the mapper
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            // The output key and value types of the reducer
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
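The jar name in the submit command below suggests the project is packaged with the Maven assembly plugin (the -jar-with-dependencies suffix is that plugin's convention); assuming a standard Maven setup, the jar would be built with:

    mvn clean package -DskipTests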
The MapReduce job can now be submitted to the cluster. The hadoop jar command takes the jar, the fully qualified main class, the HDFS input file, and the output directory:
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop.WordCountJob /test/hello.txt /out
Once the job has been submitted to the cluster, log messages like the ones below appear in the shell; when both map and reduce reach 100%, the job has completed successfully.
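The tail of a successful run looks roughly like this (timestamps and the job id omitted):

    INFO mapreduce.Job:  map 0% reduce 0%
    INFO mapreduce.Job:  map 100% reduce 0%
    INFO mapreduce.Job:  map 100% reduce 100%
    INFO mapreduce.Job: Job job_... completed successfully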
Alternatively, the job's progress can be monitored in the YARN web UI.
Final result:
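The counts are written to the /out directory on HDFS; assuming the default single reducer, the output file can be printed with:

    hdfs dfs -cat /out/part-r-00000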
Troubleshooting: if the job fails because YARN cannot locate the MapReduce classes, the solution is to add the yarn.application.classpath property to yarn-site.xml:

    <property>
        <name>yarn.application.classpath</name>
        <value>
            /data/soft/hadoop-3.2.0/etc/hadoop,
            /data/soft/hadoop-3.2.0/share/hadoop/common/*,
            /data/soft/hadoop-3.2.0/share/hadoop/common/lib/*,
            /data/soft/hadoop-3.2.0/share/hadoop/hdfs/*,
            /data/soft/hadoop-3.2.0/share/hadoop/hdfs/lib/*,
            /data/soft/hadoop-3.2.0/share/hadoop/mapreduce/*,
            /data/soft/hadoop-3.2.0/share/hadoop/mapreduce/lib/*,
            /data/soft/hadoop-3.2.0/share/hadoop/yarn/*,
            /data/soft/hadoop-3.2.0/share/hadoop/yarn/lib/*
        </value>
    </property>
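yarn-site.xml is only read when the daemons start, so sync the updated file to every node and restart YARN before resubmitting, for example:

    sbin/stop-yarn.sh
    sbin/start-yarn.sh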
Cause of another common failure: the line should have been job.setMapOutputValueClass(LongWritable.class); but was mistakenly written as job.setMapOutputKeyClass(LongWritable.class); be careful with this.
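That is, the buggy job setup declared the map output key type twice and never declared the value type, which produces a type-mismatch error at runtime:

    // Wrong: the map output key is Text, not LongWritable; this call
    // overwrote the key type, and the value type was never set
    job.setMapOutputKeyClass(LongWritable.class);
    // Right: LongWritable is the map output *value* type
    job.setMapOutputValueClass(LongWritable.class);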