• MapReduce 排序三种实现方式


    MapReduce 排序两种实现方式

    MapReduce 排序两种实现方式 都是借助重写 compareTo 方法 实现
    方式一:借助 MapReduce 按照 Key 排序特性,在WritableComparable实现类中 重写 compareTo 方法
    方式二:在 Reduce 阶段 cleanUp方法中将最终结果封装到实现JavaBean对象使用集合的排序方法
    方式三:在 Reduce 阶段 cleanUp方法中将最终结果封装到实现Comparable的实现类使用集合的排序方法

    方式一

    该方式需要两次 MapReduce
    第一次 MapReduce 做分类统计
    第二次 MapReduce 实现排序

    第一次 MapReduce 做分类统计

    1. 第一次 MapReduce 做分类统计 Mapper 类

      package com.lihaozhe.mapreduce.wordcount04;
      
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      
      import java.io.IOException;
      import java.util.StringTokenizer;
      
      /**
       * WordCount Map阶段
       *
       * @author 李昊哲
       * @version 1.0
       * @create 2023-11-7
       */
      public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
          Text outKey = new Text();
          IntWritable outValue = new IntWritable(1);
      
          @Override
          protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              StringTokenizer itr = new StringTokenizer(value.toString());
              while (itr.hasMoreTokens()) {
                  String word = itr.nextToken();
                  outKey.set(word);
                  context.write(outKey, outValue);
              }
      
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
    2. 第一次 MapReduce 做分类统计 Reducer 类

      package com.lihaozhe.mapreduce.wordcount04;
      
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      
      import java.io.IOException;
      
      /**
       * WordCount reduce阶段
       *
       * @author 李昊哲
       * @version 1.0
       * @create 2023-11-7
       */
      public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              int sum = 0;
              for (IntWritable value : values) {
                  int num = value.get();
                  sum += num;
              }
              context.write(key, new IntWritable(sum));
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      1. 第一次 MapReduce 做分类统计 Job 类
      package com.lihaozhe.mapreduce.wordcount04;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hdfs.DistributedFileSystem;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      import java.io.IOException;
      import java.net.URI;
      
      /**
       * WordCount 驱动类
       * 上传 jar 文件到集集群运行
       * 1、打包项目生成 jar 文件
       * 2、上传 jar 文件到集群
       * 3、在集群上运行 jar 文件
       * @author 李昊哲
       * @version 1.0
       * @create 2023-11-7
       */
      public class WordCountDriver {
          public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
              // 设置环境变量 hadoop 用户名 为 root
              System.setProperty("HADOOP_USER_NAME", "root");
      
              // 参数配置对象
              Configuration conf = new Configuration();
      
              // 跨平台提交
              conf.set("mapreduce.app-submission.cross-platform", "true");
      
              // 本地运行
              // conf.set("mapreduce.framework.name", "local");
      
              // 设置默认文件系统为 本地文件系统
              // conf.set("fs.defaultFS", "file:///");
      
              // 声明Job对象 就是一个应用
              Job job = Job.getInstance(conf, "word count");
              // 指定当前Job的驱动类
              // 本地提交 注释该行
              job.setJarByClass(WordCountDriver.class);
      
              // 本地提交启用该行
              // job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");
      
              // 指定当前Job的 Mapper
              job.setMapperClass(WordCountMapper.class);
              // 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用
              job.setCombinerClass(WordCountReduce.class);
              // 指定当前Job的 Reducer
              job.setReducerClass(WordCountReduce.class);
      
              // 设置 map 输出 key 的数据类型
              job.setMapOutputValueClass(Text.class);
              // 设置 map 输出 value 的数据类型
              job.setMapOutputValueClass(IntWritable.class);
      
              // 设置最终输出 key 的数据类型
              job.setOutputKeyClass(Text.class);
              // 设置最终输出 value 的数据类型
              job.setOutputValueClass(IntWritable.class);
      
              // 定义 map 输入的路径 注意:该路径默认为hdfs路径
              FileInputFormat.addInputPath(job, new Path("/wordcount/input/wcdata.txt"));
              // 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
              Path dst = new Path("/wordcount/result");
              // 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录
              DistributedFileSystem dfs = new DistributedFileSystem();
              String nameService = conf.get("dfs.nameservices");
              String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;
              dfs.initialize(URI.create(hdfsRPCUrl), conf);
              if (dfs.exists(dst)) {
                  dfs.delete(dst, true);
              }
      //        FileSystem fs = FileSystem.get(conf);
      //        if (fs.exists(dst)) {
      //            fs.delete(dst, true);
      //        }
              FileOutputFormat.setOutputPath(job, dst);
              // 提交 job
              // job.submit();
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91

    第二次 MapReduce 实现排序

    1. 编写JavaBean实现 WritableComparable 重写 compareTo 方法

      package com.lihaozhe.mapreduce.wordcount05;
      
      import lombok.AllArgsConstructor;
      import lombok.Getter;
      import lombok.NoArgsConstructor;
      import lombok.Setter;
      import org.apache.hadoop.io.WritableComparable;
      
      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      
      /**
       * @author 李昊哲
       * @version 1.0.0
       * @create 2023-11-7
       */
      @Getter
      @Setter
      @NoArgsConstructor
      @AllArgsConstructor
      public class Word implements WritableComparable<Word> {
          /**
           * 单词
           */
          private String name;
          /**
           * 单词数量
           */
          private long count;
      
          @Override
          public String toString() {
              return this.name + "\t" + this.count;
          }
      
          @Override
          public int compareTo(Word word) {
              // 按照单词数量降序
              int x = (int) (word.getCount() - this.count);
              if (x != 0) {
                  return x;
              } else {
                  // 如果单词数量相同 按照单词 hash 值 升序
                  return this.name.compareTo(word.getName());
              }
          }
      
          /**
           * 序列化
           *
           * @param dataOutput 输出的数据
           * @throws IOException IOException
           */
          @Override
          public void write(DataOutput dataOutput) throws IOException {
              // 输出单词
              dataOutput.writeUTF(this.name);
              // 输出单词数量
              dataOutput.writeLong(this.count);
          }
      
          /**
           * 反序列化
           * 读取输入属性顺序与输出属性顺序一直
           *
           * @param dataInput 输入的数据
           * @throws IOException IOException
           */
          @Override
          public void readFields(DataInput dataInput) throws IOException {
              // 输出单词
              this.name = dataInput.readUTF();
              // 输出单词数量
              this.count = dataInput.readLong();
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
    2. 第二次 MapReduce Mapper 类 将实现 WritableComparable 接口的类对象,作为 map 阶段输出的 key 实现排序

      package com.lihaozhe.mapreduce.wordcount05;
      
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      
      import java.io.IOException;
      
      /**
       * WordCount Map阶段
       *
       * @author 李昊哲
       * @version 1.0
       * @create 2023-11-7
       */
      public class WordCountMapper extends Mapper<LongWritable, Text, Word, NullWritable> {
      
          private final Word outKey = new Word();
          private final NullWritable outValue = NullWritable.get();
      
          @Override
          protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Word, NullWritable>.Context context) throws IOException, InterruptedException {
              String[] split = value.toString().split("\t");
              outKey.setName(split[0]);
              outKey.setCount(Long.parseLong(split[1]));
              context.write(outKey, outValue);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
    3. 第二次 MapReduce Reducer 类 接收 map 阶段输出的 key 后,按需求格式输出

      package com.lihaozhe.mapreduce.wordcount05;
      
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.mapreduce.Reducer;
      
      import java.io.IOException;
      
      /**
       * WordCount reduce阶段
       *
       * @author 李昊哲
       * @version 1.0
       * @create 2023-11-7
       */
      public class WordCountReduce extends Reducer<Word, NullWritable, Word, NullWritable> {
          private final NullWritable outValue = NullWritable.get();
      
          @Override
          protected void reduce(Word key, Iterable<NullWritable> values, Reducer<Word, NullWritable, Word, NullWritable>.Context context) throws IOException, InterruptedException {
              context.write(key, outValue);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
    4. 第二次 MapReduce job 类

      package com.lihaozhe.mapreduce.wordcount05;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hdfs.DistributedFileSystem;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      import java.io.IOException;
      import java.net.URI;
      
      /**
       * WordCount 驱动类
       * 上传 jar 文件到集集群运行
       * 1、打包项目生成 jar 文件
       * 2、上传 jar 文件到集群
       * 3、在集群上运行 jar 文件
       *
       * @author 李昊哲
       * @version 1.0
       * @create 2023-11-7
       */
      public class WordCountDriver {
          public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
              // 设置环境变量 hadoop 用户名 为 root
              System.setProperty("HADOOP_USER_NAME", "root");
      
              // 参数配置对象
              Configuration conf = new Configuration();
      
              // 跨平台提交
              conf.set("mapreduce.app-submission.cross-platform", "true");
      
              // 本地运行
              // conf.set("mapreduce.framework.name", "local");
      
              // 设置默认文件系统为 本地文件系统
              // conf.set("fs.defaultFS", "file:///");
      
              // 声明Job对象 就是一个应用
              Job job = Job.getInstance(conf, "word count sort");
              // 指定当前Job的驱动类
              // 本地提交 注释该行
              job.setJarByClass(WordCountDriver.class);
      
              // 本地提交启用该行
              // job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");
      
              // 指定当前Job的 Mapper
              job.setMapperClass(WordCountMapper.class);
              // 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用
              // job.setCombinerClass(WordCountReduce.class);
              // 指定当前Job的 Reducer
              job.setReducerClass(WordCountReduce.class);
      
              // 设置 reduce 数量 为 0
              // job.setNumReduceTasks(0);
      
              // 设置 map 输出 key 的数据类型
              job.setMapOutputValueClass(Word.class);
              // 设置 map 输出 value 的数据类型
              job.setMapOutputValueClass(NullWritable.class);
      
              // 设置最终输出 key 的数据类型
              job.setOutputKeyClass(Word.class);
              // 设置最终输出 value 的数据类型
              job.setOutputValueClass(NullWritable.class);
      
              // 定义 map 输入的路径 注意:该路径默认为hdfs路径
              FileInputFormat.addInputPath(job, new Path("/wordcount/result/part-r-00000"));
              // 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
              Path dst = new Path("/wordcount/sort");
              // 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录
              DistributedFileSystem dfs = new DistributedFileSystem();
              String nameService = conf.get("dfs.nameservices");
              String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;
              dfs.initialize(URI.create(hdfsRPCUrl), conf);
              if (dfs.exists(dst)) {
                  dfs.delete(dst, true);
              }
      //        FileSystem fs = FileSystem.get(conf);
      //        if (fs.exists(dst)) {
      //            fs.delete(dst, true);
      //        }
              FileOutputFormat.setOutputPath(job, dst);
              // 提交 job
              // job.submit();
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94

    方式二

    该方式有两次 重写 compareTo 方法

    • JavaBean 实现 Comparable 重写 compareTo 方法
    • lambada 表达式 重写 compareTo 方法
      本案例使用 lambada 表达式 重写 compareTo 方法
    1. 编写 JavaBean

      package com.lihaozhe.mapreduce.wordcount06;
      
      import lombok.AllArgsConstructor;
      import lombok.Getter;
      import lombok.NoArgsConstructor;
      import lombok.Setter;
      import org.apache.hadoop.io.Writable;
      
      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      
      /**
       * @author 李昊哲
       * @version 1.0.0
       * @create 2023-11-7
       */
      @Getter
      @Setter
      @NoArgsConstructor
      @AllArgsConstructor
      public class Word {
          /**
           * 单词
           */
          private String name;
          /**
           * 单词数量
           */
          private int count;
      
          @Override
          public String toString() {
              return this.name + "\t" + this.count;
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
    2. 编写 Mapper 类

      package com.lihaozhe.mapreduce.wordcount06;
      
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      
      import java.io.IOException;
      import java.util.StringTokenizer;
      
      /**
       * WordCount Map阶段
       *
       * @author 李昊哲
       * @version 1.0
       * @create 2023-11-7
       */
      public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
          Text outKey = new Text();
          IntWritable outValue = new IntWritable(1);
      
          @Override
          protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              StringTokenizer itr = new StringTokenizer(value.toString());
              while (itr.hasMoreTokens()) {
                  String word = itr.nextToken();
                  outKey.set(word);
                  context.write(outKey, outValue);
              }
      
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
    3. 编写 Reducer 类

      package com.lihaozhe.mapreduce.wordcount06;
      
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.stream.Collectors;
      
      /**
       * WordCount reduce阶段
       *
       * @author 李昊哲
       * @version 1.0
       * @create 2023-11-7
       */
      public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
          private final List<Word> words = new ArrayList<>();
      
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              int sum = 0;
              for (IntWritable value : values) {
                  int num = value.get();
                  sum += num;
              }
              Word word = new Word(key.toString(), sum);
              words.add(word);
          }
      
          @Override
          protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              List<Word> list = words.stream().sorted((a, b) -> {
                  int x = b.getCount() - a.getCount();
                  if (x != 0) {
                      return x;
                  } else {
                      return a.getName().compareTo(b.getName());
                  }
              }).collect(Collectors.toList());
              Text outKey = new Text();
              IntWritable outValue = new IntWritable();
              for (Word word : list) {
                  outKey.set(word.getName());
                  outValue.set(word.getCount());
                  context.write(outKey, outValue);
              }
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
    4. 编写 Job 类

      package com.lihaozhe.mapreduce.wordcount06;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hdfs.DistributedFileSystem;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      import java.io.IOException;
      import java.net.URI;
      
      /**
       * WordCount 驱动类
       * 上传 jar 文件到集集群运行
       * 1、打包项目生成 jar 文件
       * 2、上传 jar 文件到集群
       * 3、在集群上运行 jar 文件
       *
       * @author 李昊哲
       * @version 1.0
       * @create 2023-11-7
       */
      public class WordCountDriver {
          public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
              // 设置环境变量 hadoop 用户名 为 root
              System.setProperty("HADOOP_USER_NAME", "root");
      
              // 参数配置对象
              Configuration conf = new Configuration();
      
              // 跨平台提交
              conf.set("mapreduce.app-submission.cross-platform", "true");
      
              // 本地运行
              // conf.set("mapreduce.framework.name", "local");
      
              // 设置默认文件系统为 本地文件系统
              // conf.set("fs.defaultFS", "file:///");
      
              // 声明Job对象 就是一个应用
              Job job = Job.getInstance(conf, "word count sort");
              // 指定当前Job的驱动类
              // 本地提交 注释该行
              job.setJarByClass(WordCountDriver.class);
      
              // 本地提交启用该行
              // job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");
      
              // 指定当前Job的 Mapper
              job.setMapperClass(WordCountMapper.class);
              // 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用
              // job.setCombinerClass(WordCountReduce.class);
              // 指定当前Job的 Reducer
              job.setReducerClass(WordCountReduce.class);
      
              // 设置 reduce 数量 为 0
              // job.setNumReduceTasks(0);
      
              // 设置 map 输出 key 的数据类型
              job.setMapOutputValueClass(Text.class);
              // 设置 map 输出 value 的数据类型
              job.setMapOutputValueClass(IntWritable.class);
      
              // 设置最终输出 key 的数据类型
              job.setOutputKeyClass(Text.class);
              // 设置最终输出 value 的数据类型
              job.setOutputValueClass(IntWritable.class);
      
              // 定义 map 输入的路径 注意:该路径默认为hdfs路径
              FileInputFormat.addInputPath(job, new Path("/wordcount/input/wcdata.txt"));
              // 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
              Path dst = new Path("/wordcount/sort");
              // 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录
              DistributedFileSystem dfs = new DistributedFileSystem();
              String nameService = conf.get("dfs.nameservices");
              String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;
              dfs.initialize(URI.create(hdfsRPCUrl), conf);
              if (dfs.exists(dst)) {
                  dfs.delete(dst, true);
              }
      //        FileSystem fs = FileSystem.get(conf);
      //        if (fs.exists(dst)) {
      //            fs.delete(dst, true);
      //        }
              FileOutputFormat.setOutputPath(job, dst);
              // 提交 job
              // job.submit();
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96

    方式三

    该方式有两次 重写 compareTo 方法

    • JavaBean 实现 Comparable 重写 compareTo 方法
    • lambada 表达式 重写 compareTo 方法
      本案例使用 JavaBean 实现 Comparable 重写 compareTo 方法
    1. 编写 JavaBean

      package com.lihaozhe.mapreduce.wordcount07;
      
      import lombok.AllArgsConstructor;
      import lombok.Getter;
      import lombok.NoArgsConstructor;
      import lombok.Setter;
      
      /**
       * @author 李昊哲
       * @version 1.0.0
       * @create 2023-11-7
       */
      @Getter
      @Setter
      @NoArgsConstructor
      @AllArgsConstructor
      public class Word implements Comparable<Word> {
          /**
           * 单词
           */
          private String name;
          /**
           * 单词数量
           */
          private int count;
      
          @Override
          public String toString() {
              return this.name + "\t" + this.count;
          }
      
          @Override
          public int compareTo(Word word) {
              // 按照单词数量降序
              int x = word.getCount() - this.count;
              if (x != 0) {
                  return x;
              } else {
                  // 如果单词数量相同 按照单词 hash 值 升序
                  return this.name.compareTo(word.getName());
              }
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
    2. 编写 Mapper 类

      package com.lihaozhe.mapreduce.wordcount07;
      
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      
      import java.io.IOException;
      import java.util.StringTokenizer;
      
      /**
       * @author 李昊哲
       * @version 1.0.0
       * @create 2023-11-7
       */
      public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
          //    @Override
          //    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
          //        // 获取 输入内容
          //        String line = value.toString();
          //        // 将输入内容存储到字符串数组当中
          //        String[] words = line.split(" ");
          //        // 输出的 key
          //        Text text = new Text();
          //        // 输出的 value
          //        IntWritable intWritable = new IntWritable();
          //        // 遍历每一个单词 封装后输出
          //        for (String word : words) {
          //            // ELT 将数据封装为 自定义格式
          //            // context.write(new Text(word),new IntWritable(1));
          //            // 为输出的 key 赋值
          //            text.set(word);
          //            // 为输出的 value 赋值
          //            intWritable.set(1);
          //            // ELT 将数据封装为 自定义格式
          //            context.write(text, intWritable);
          //        }
          //
          //    }
          @Override
          protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              // 获取 输入内容
              String line = value.toString();
              // 输出的 key
              Text text = new Text();
              // 输出的 value
              IntWritable intWritable = new IntWritable();
      
              StringTokenizer itr = new StringTokenizer(line);
              while (itr.hasMoreTokens()) {
                  String word = itr.nextToken();
                  // ELT 将数据封装为 自定义格式
                  // context.write(new Text(word),new IntWritable(1));
                  // 为输出的 key 赋值
                  text.set(word);
                  // 为输出的 value 赋值
                  intWritable.set(1);
                  // ELT 将数据封装为 自定义格式
                  context.write(text, intWritable);
              }
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
    3. 编写 Reducer 类

      package com.lihaozhe.mapreduce.wordcount07;
      
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      
      import java.io.IOException;
      import java.util.Set;
      import java.util.TreeSet;
      
      /**
       * @author 李昊哲
       * @version 1.0.0
       * @create 2023-11-7
       */
      public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
          // 将单词 存入set中排序
          private final Set<Word> words = new TreeSet<>();
      
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              // 单词数量累加的和
              int sum = 0;
              // 获取单词标记迭代器
              for (IntWritable value : values) {
                  // 单词标记累加得到单词数量
                  sum += value.get();
              }
              // 将 单词 和 单词数量 封装到 word 对象中
              Word word = new Word(key.toString(), sum);
              // 将 word 对象存入 TreeSet 进行排序
              words.add(word);
          }
      
          @Override
          protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
              for (Word word : words) {
                  // 计算结果持久化
                  context.write(new Text(word.getName()), new IntWritable(word.getCount()));
              }
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
    4. 编写 Job 类

      package com.lihaozhe.mapreduce.wordcount07;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hdfs.DistributedFileSystem;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      import java.io.IOException;
      import java.net.URI;
      
      /**
       * @author 李昊哲
       * @version 1.0.0
       * @create 2023-11-7
       */
      public class WordCountDriver {
          public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
              // 设置环境变量 hadoop 用户名 为 root
              System.setProperty("HADOOP_USER_NAME", "root");
              // 参数配置对象
              Configuration conf = new Configuration();
              // 跨平台提交
              conf.set("mapreduce.app-submission.cross-platform", "true");
              // 声明Job对象 就是一个应用
              Job job = Job.getInstance(conf, "word count");
              // 指定当前Job的驱动类
              // 本地提交 注释该行
              job.setJarByClass(WordCountDriver.class);
              // 本地提交启用该行
              // job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");
              // 指定当前Job的 Mapper和Reducer
              job.setMapperClass(WordCountMapper.class);
              // job.setCombinerClass(WordCountReduce.class);
              job.setReducerClass(WordCountReduce.class);
              // 指定输出 key 的数据类型
              job.setOutputKeyClass(Text.class);
              // 指定输出 value 的数据类型
              job.setOutputValueClass(IntWritable.class);
      
              // FileInputFormat.addInputPath(job, new Path(args[0]));
              // FileOutputFormat.setOutputPath(job, new Path(args[1]));
              // 定义读取数据的路径 注意:该路径为hdfs路径
              FileInputFormat.addInputPath(job, new Path("/wordcount/input/wcdata.txt"));
              // 定义输出数据持久化的路径 注意:该路径为hdfs路径
              Path dst = new Path("/wordcount/sort");
              DistributedFileSystem dfs = new DistributedFileSystem();
              String nameService = conf.get("dfs.nameservices");
              String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;
              dfs.initialize(URI.create(hdfsRPCUrl), conf);
              if (dfs.exists(dst)) {
                  dfs.delete(dst, true);
              }
      //        FileSystem fs = FileSystem.get(conf);
      //        if (fs.exists(dst)) {
      //            fs.delete(dst, true);
      //        }
              FileOutputFormat.setOutputPath(job, dst);
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
  • 相关阅读:
    伪代码实现几种常见的时间复杂度算法
    如何实现矩阵的重采样问题
    ppt技能提升
    Java 并发编程解析 | 如何正确理解Java领域中的并发锁,我们应该具体掌握到什么程度?
    rabbitmq跟着b站尚硅谷老师学习
    SVM-支持向量机实验分析(软硬间隔,线性核,高斯核)
    详解:网络虚拟化卸载加速技术的演进
    WPF —— MVVM架构
    【测试总结】测试时如何定位一个bug?是前端还是后端?
    绿洲艺术学院网站
  • 原文地址:https://blog.csdn.net/qq_24330181/article/details/134320135