Before running the program, start Hadoop with the following commands:
cd /usr/local/hadoop
./sbin/start-dfs.sh
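To verify that the HDFS daemons are running before continuing, the jps command can be used; in a pseudo-distributed setup, processes named NameNode, DataNode, and SecondaryNameNode should appear in its listing:
jps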
After Hadoop has started, first delete the input and output directories in HDFS that correspond to the current Linux user hadoop (that is, the "/user/hadoop/input" and "/user/hadoop/output" directories in HDFS), so that leftovers from earlier runs cannot interfere with the steps below. The commands are as follows:
cd /usr/local/hadoop
./bin/hdfs dfs -rm -r input
./bin/hdfs dfs -rm -r output
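If one of the two directories does not exist yet, the corresponding command prints an error message, which can be safely ignored. Alternatively, the -f option of hdfs dfs -rm suppresses the diagnostic for paths that do not exist, for example:
./bin/hdfs dfs -rm -r -f input output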
Then create a new input directory in HDFS for the current Linux user hadoop, that is, the "/user/hadoop/input" directory, with the following commands:
cd /usr/local/hadoop
./bin/hdfs dfs -mkdir input
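Because a relative path is used, the directory is created under the HDFS home directory of the current user, /user/hadoop. If that home directory itself has not been created yet, the -p option can be added so that any missing parent directories are created as well:
./bin/hdfs dfs -mkdir -p /user/hadoop/input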
Generating the score files
Next, use the following Python script to generate five score files, 1.txt through 5.txt; each file contains one line per student with a random score between 60 and 100:
import random
dic = ['Alice', 'Bob', 'Charile', 'Delta', 'Firefox', 'Golf']
for i in range(1, 6):  # create files 1.txt through 5.txt
    f = open(str(i) + '.txt', 'w')
    for j in range(5):  # only the first five names in dic are used
        f.write(dic[j] + " " + str(random.randint(60, 100)) + '\n')
    f.close()
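The script writes the files into the current working directory, so it should be run from /usr/local/hadoop, where the upload commands below expect them. Assuming Python 3 is installed and the script has been saved as gen_scores.py (a file name chosen here for illustration), it can be run and spot-checked as follows:
cd /usr/local/hadoop
python3 gen_scores.py
cat 1.txt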
Then upload the score files to the "/user/hadoop/input" directory in HDFS. Note that the target directory input must be given explicitly; otherwise the files would land directly in the HDFS home directory /user/hadoop. The commands are as follows:
./bin/hdfs dfs -put /usr/local/hadoop/1.txt input
./bin/hdfs dfs -put /usr/local/hadoop/2.txt input
./bin/hdfs dfs -put /usr/local/hadoop/3.txt input
./bin/hdfs dfs -put /usr/local/hadoop/4.txt input
./bin/hdfs dfs -put /usr/local/hadoop/5.txt input
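The upload can be verified by listing the directory; all five files should appear:
./bin/hdfs dfs -ls input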
If the "/user/hadoop/output" directory already exists in HDFS, delete it with the following commands:
cd /usr/local/hadoop
./bin/hdfs dfs -rm -r /user/hadoop/output
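To make this cleanup step safe to repeat, the deletion can also be guarded with an existence test, for example:
if ./bin/hdfs dfs -test -d /user/hadoop/output; then ./bin/hdfs dfs -rm -r /user/hadoop/output; fi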
Packaging the JAR file
The complete source code of the program, WordCount.java, is as follows:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
    public WordCount() {
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        // Note: reusing the averaging reducer as a combiner is safe here only because
        // each student name occurs at most once per input file; averaging is not
        // associative in general.
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String line = value.toString(); // convert the input Text value to a String
            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
            // process each line separately
            while (tokenizerArticle.hasMoreTokens()) {
                // split the line on whitespace
                StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
                String strName = tokenizerLine.nextToken(); // the student name field
                String strScore = tokenizerLine.nextToken(); // the score field
                Text name = new Text(strName); // student name as the output key
                int scoreInt = Integer.parseInt(strScore); // student score
                context.write(name, new IntWritable(scoreInt)); // emit (name, score)
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            Iterator<IntWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                sum += iterator.next().get(); // accumulate the total score
                count++; // count the number of scores
            }
            int average = sum / count; // integer average score
            context.write(key, new IntWritable(average));
        }
    }
}
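The tutorial assumes the compiled classes are packaged into ./myapp/WordCount.jar. One way to do this from the command line (assuming a JDK is installed and WordCount.java has been saved under /usr/local/hadoop; an IDE such as Eclipse can be used instead) is to compile against the Hadoop classpath and then archive the resulting class files, including the inner classes:
cd /usr/local/hadoop
mkdir -p myapp
javac -classpath "$(./bin/hadoop classpath)" WordCount.java
jar -cf myapp/WordCount.jar WordCount*.class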
Now the program can be run on the Linux system with the hadoop jar command, as follows:
cd /usr/local/hadoop
./bin/hadoop jar ./myapp/WordCount.jar input output
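Because main() treats every argument except the last one as an input path, the same JAR can also read several input directories in one run, for example (with hypothetical directory names):
./bin/hadoop jar ./myapp/WordCount.jar input1 input2 output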
After the above command runs to completion, the screen displays information similar to the following:
…… // a number of screen lines omitted here
2020-01-27 10:10:55,157 INFO mapreduce.Job: map 100% reduce 100%
2020-01-27 10:10:55,159 INFO mapreduce.Job: Job job_local457272252_0001 completed successfully
2020-01-27 10:10:55,174 INFO mapreduce.Job: Counters: 35
    File System Counters
        FILE: Number of bytes read=115463648
        FILE: Number of bytes written=117867638
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=283
        HDFS: Number of bytes written=40
        HDFS: Number of read operations=24
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=5
    Map-Reduce Framework
        Map input records=9
        Map output records=24
        Map output bytes=208
        Map output materialized bytes=140
        Input split bytes=236
        Combine input records=24
        Combine output records=12
        Reduce input groups=6
        Reduce shuffle bytes=140
        Reduce input records=12
        Reduce output records=6
        Spilled Records=24
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=1291321344
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=113
    File Output Format Counters
        Bytes Written=40
The average-score results have now been written to the "/user/hadoop/output" directory in HDFS and can be viewed with the following commands:
cd /usr/local/hadoop
./bin/hdfs dfs -cat output/*
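If a local copy of the results is wanted, the output directory can also be downloaded from HDFS, for example (the local target path is arbitrary):
./bin/hdfs dfs -get output /usr/local/hadoop/local-output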
After the above command executes, the screen displays the average score of each student:
Alice 75
Bob 79
Charile 73
Delta 81
Firefox 83
At this point, the average-score program has run to completion. Note that before WordCount.jar can be run again, the output directory in HDFS must first be deleted; otherwise the job will fail with an error.
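For repeated experiments, the cleanup and the rerun can be combined on one command line (the -f option keeps the deletion from failing when output does not exist), for example:
cd /usr/local/hadoop
./bin/hdfs dfs -rm -r -f output && ./bin/hadoop jar ./myapp/WordCount.jar input output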