Compared with the wordcount example, the new material here is writing and using a Student entity class.
Input data: stuscore.csv, where each line is a comma-separated record of the form `studentId,studentName,score` (as parsed by the mapper below).

1. The Student entity class
```java
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Student implements WritableComparable<Student> {
    private long stuid;
    private String stuName;
    private int score;

    public Student(long stuid, String stuName, int score) {
        this.stuid = stuid;
        this.stuName = stuName;
        this.score = score;
    }

    @Override
    public String toString() {
        return "Student{" +
                "stuid=" + stuid +
                ", stuName='" + stuName + '\'' +
                ", score=" + score +
                '}';
    }

    // No-arg constructor, required so Hadoop can instantiate the class reflectively.
    public Student() {
    }

    public long getStuid() {
        return stuid;
    }

    public void setStuid(long stuid) {
        this.stuid = stuid;
    }

    public String getStuName() {
        return stuName;
    }

    public void setStuName(String stuName) {
        this.stuName = stuName;
    }

    public int getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }

    // IDE tip: auto-format the file with Ctrl + Shift + F (input method in English mode).
    @Override
    public int compareTo(Student o) {
        // compareTo must return a negative, zero, or positive value; the original
        // `this.score > o.score ? 1 : 0` never returns a negative and breaks the contract.
        return Integer.compare(this.score, o.score);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(stuid);
        dataOutput.writeUTF(stuName);
        dataOutput.writeInt(score);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.stuid = dataInput.readLong();
        this.stuName = dataInput.readUTF();
        this.score = dataInput.readInt();
    }
}
```
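Because Student implements WritableComparable, Hadoop serializes instances between the map and reduce phases with write() and rebuilds them with readFields(). A minimal round-trip sketch using plain JDK streams can sanity-check the pair without a cluster (the class name and sample values here are made up for illustration):

```java
import java.io.*;

public class StudentSerializationCheck {
    public static void main(String[] args) throws IOException {
        Student in = new Student(1L, "zhangsan", 90);

        // Serialize with write(), as Hadoop does when shuffling map output.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance with readFields().
        Student out = new Student();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out); // Student{stuid=1, stuName='zhangsan', score=90}
    }
}
```

If the fields print unchanged after the round trip, write() and readFields() read and write the fields in the same order, which is the invariant Hadoop relies on.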
2. Map phase: the StudentMapper class
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Output key: the student id; output value: the Student object.
 */
public class StudentMapper extends Mapper<LongWritable, Text, LongWritable, Student> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        LongWritable stuidKey = new LongWritable(Long.parseLong(split[0]));
        Student stuValue = new Student(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2]));
        context.write(stuidKey, stuValue);
    }
}
```
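For each input line, the mapper splits on commas and emits the student id as the key and the populated Student as the value. A small standalone sketch of that parsing step (the class name and sample line are hypothetical, in the `id,name,score` format assumed above):

```java
public class MapperLogicCheck {
    public static void main(String[] args) {
        // Hypothetical sample line in the stuscore.csv format: id,name,score
        String line = "1,zhangsan,90";
        String[] split = line.split(",");
        Student stu = new Student(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2]));
        System.out.println(split[0] + " -> " + stu);
        // StudentMapper does the same, then wraps the id in a LongWritable
        // and calls context.write(stuidKey, stuValue).
    }
}
```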
3. Reduce phase: the StudentReducer class
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class StudentReducer extends Reducer<LongWritable, Student, Student, NullWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Student> values, Context context)
            throws IOException, InterruptedException {
        Student stuOut = new Student();
        int sumScore = 0;
        String stuName = "";
        for (Student stu : values) {
            sumScore += stu.getScore();
            stuName = stu.getStuName();
        }
        stuOut.setScore(sumScore);
        stuOut.setStuid(key.get());
        stuOut.setStuName(stuName);
        System.out.println(stuOut);
        context.write(stuOut, NullWritable.get());
    }
}
```
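Note that Hadoop reuses a single Student instance while iterating over `values`, so the loop correctly copies the score and name out of each record rather than caching references to the objects. The accumulation logic itself can be checked outside the cluster; a minimal sketch with made-up records for one student id (class name hypothetical):

```java
import java.util.Arrays;
import java.util.List;

public class ReducerLogicCheck {
    public static void main(String[] args) {
        // Hypothetical records for one student id, as the reducer sees them
        // after the shuffle groups values by key.
        List<Student> values = Arrays.asList(
                new Student(1L, "zhangsan", 90),
                new Student(1L, "zhangsan", 85));

        int sumScore = 0;
        String stuName = "";
        for (Student stu : values) {
            sumScore += stu.getScore();  // accumulate this student's scores
            stuName = stu.getStuName();  // same name on every record for this key
        }

        Student stuOut = new Student();
        stuOut.setStuid(1L);
        stuOut.setStuName(stuName);
        stuOut.setScore(sumScore);
        System.out.println(stuOut); // Student{stuid=1, stuName='zhangsan', score=175}
    }
}
```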
4. Driver: the StudentDriver class
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class StudentDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(StudentDriver.class);

        // Configure the map-phase class and the map output types.
        job.setMapperClass(StudentMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Student.class);

        // Configure the reduce-phase class and the reduce output types.
        job.setReducerClass(StudentReducer.class);
        job.setOutputKeyClass(Student.class);
        job.setOutputValueClass(NullWritable.class);

        // Input path, e.g. "hdfs://kb131:9000/kb23/hadoopstu/stuscore.csv"
        Path inpath = new Path(args[0]); // input path supplied on the command line
        FileInputFormat.setInputPaths(job, inpath);

        // Output path, e.g. "hdfs://kb131:9000/kb23/hadoopstu/out2"
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        // Delete the output directory if it already exists, otherwise the job fails.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.waitForCompletion(true);
    }
}
```
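Once the four classes are packaged into a jar (the jar name here is illustrative), the job can be submitted with something like `hadoop jar studentscore.jar StudentDriver hdfs://kb131:9000/kb23/hadoopstu/stuscore.csv hdfs://kb131:9000/kb23/hadoopstu/out2`; the two trailing arguments become args[0] and args[1], the input and output paths handled in main().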