• MapReduce项目案例1


    统计单词数

    1.输入内容

    hello world i am teacher
    hello world i am teacher
    hello world i am teacher
    hello world i am teacher
    hello world
    hello world
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.输出内容

    am	4
    hello	6
    i	4
    teacher	4
    world	6
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.代码

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    public class WordCount {
    
        /**
         * map映射器:将输入<字节偏移量,一行文本>切分成<单词,次数>
         * map之前有一步split,将文本切分成<字节偏移量,一行文本>
         */
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {//<前两位表示输入类型<偏移量,一行文本>,后两位表示输出类型<单词,数字>>
    
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            /**
             * 前两个参是输入类型
             *
             * @param key     偏移量
             * @param value   一行文本,Text类型可序列化,可比较(WritableComparable接口)
             * @param context hadoop运行容器,可取出运行时的环境变量
             * @throws IOException
             * @throws InterruptedException
             */
            @Override
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
                StringTokenizer itr = new StringTokenizer(value.toString());//根据自然分隔符分割
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());//写入文本对象
                    context.write(word, one);//保存出去(单词,数字)
                }
            }
        }
    
        /**
         * combiner(单节点合并)和reduce(多节点数据合并)都是对相同键的数据进行规约
         * <前两个泛型表示规约的输入数据来源于map的输出,后两个是规约后的单词与数字>
         */
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            /**
             * @param key     单词
             * @param values  相同单词对应出现次数的集合
             *                类中泛型约束是IntWritable,为什么方法上是Iterable<IntWritable>?
             *                因为在统计之前会相同的键做成列表word [1,1,1],然后在规约word 3
             * @param context
             * @throws IOException
             * @throws InterruptedException
             */
            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {//对一个键
                    sum += val.get();//求和规约
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();//配置参数(map数,reduce数...对于配置配置文件中的API配置)
            Job job = Job.getInstance(conf, "word count");//名字
            job.setJarByClass(WordCount.class);//上线的jar
            job.setMapperClass(TokenizerMapper.class);//mapper
            job.setCombinerClass(IntSumReducer.class);//combine:合并一个节点
            job.setReducerClass(IntSumReducer.class);//reduce:合并不同节点
            job.setOutputKeyClass(Text.class);//输出键的类型,与上面一致
            job.setOutputValueClass(IntWritable.class);//输出值的类型,与上面一致
            FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));//输入目录
            FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));//输出目录,这个目录要不存在运行时创建,特别注意要设置到空目录,应为执行之前会删一次****
           /* FileInputFormat.addInputPath(job, new Path(args[0]));//命令行运行时传入
            FileOutputFormat.setOutputPath(job, new Path(args[1]));*/
            System.exit(job.waitForCompletion(true) ? 0 : 1);//启动,0表示正常退出
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    去除重复项

    1.输入内容

    11
    11
    11
    12
    13
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.输出内容

    11	
    12	
    13	
    
    • 1
    • 2
    • 3

    3.代码

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class RemoveRepeat {
    
        /**
         * map映射器:将输入<字节偏移量,一行文本>切分成<单词,次数>
         * map之前有一步split,将文本切分成<字节偏移量,一行文本>
         */
        public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
                context.write(new Text(value), new Text(""));
            }
    
        }
    
        /**
         * map后的数据
         * 11 ""
         * 11 ""
         * 11 ""
         * 12 ""
         * 底层有一次合并,传给reduce的数据为
         * 11 ["","",""]
         * 12 [""]
         */
    
        public static class MyReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                context.write(key, new Text(""));
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();//配置参数(map数,reduce数...对于配置配置文件中的API配置)
            Job job = Job.getInstance(conf, "remove repeat");//名字
            job.setJarByClass(RemoveRepeat.class);//上线的jar
            job.setMapperClass(MyMapper.class);//mapper
            job.setCombinerClass(MyReducer.class);//combine:合并一个节点
            job.setReducerClass(MyReducer.class);//reduce:合并不同节点
            job.setOutputKeyClass(Text.class);//输出键的类型,与上面一致
            job.setOutputValueClass(Text.class);//输出值的类型,与上面一致
            FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));//输入目录
            FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));//输出目录,这个目录要不存在运行时创建,特别注意要设置到空目录,应为执行之前会删一次****
            /*FileInputFormat.addInputPath(job, new Path(args[0]));//命令行运行时传入
            FileOutputFormat.setOutputPath(job, new Path(args[1]));*/
            System.exit(job.waitForCompletion(true) ? 0 : 1);//启动,0表示正常退出
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    去重并两种方式输出序号

    1.输入内容

    6
    16
    8
    12
    5
    7
    6
    6
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2.输出内容

    1	1	5
    2	2	6
    3	5	7
    4	6	8
    5	7	12
    6	8	16
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.代码

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class Order {
    
        public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
                int va = Integer.parseInt(value.toString());
                context.write(new IntWritable(va), new Text("1"));
            }
        }
    
        /**
         * map后的数据,根据key会自动排号序
         * 5    ""
         * 6    ""
         * 6    ""
         * 6    ""
         * 7    ""
         * 8    ""
         * 12   ""
         * 16   ""
         * 底层有一次合并,传给reduce的数据为
         * 5    [""]
         * 6    ["","",""]
         * 7    [""]
         * 8    [""]
         * 12   [""]
         * 16   [""]
         */
    
        public static class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
            private int num1 = 1;//负责连续序号
            private int num2 = 1;//负责跳跃序号
    
            @Override
            protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                context.write(new IntWritable(num1), new Text(num2 + "\t" + key.get()));
                for (Text val : values) {
                    num2++;
                }
                num1++;
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();//配置参数(map数,reduce数...对于配置配置文件中的API配置)
            Job job = Job.getInstance(conf, "order");//名字
            job.setJarByClass(Order.class);//上线的jar
            job.setMapperClass(MyMapper.class);//mapper
    
    
            /**
             * 单节点不能合并,若合并会变成  如下
             * combine完了之后的数据,也就是reduce接收的数据
             * 1    1   5
             * 2    2   6
             * 3    5   7
             * 4    6   8
             * 5    7   12
             * 6    8   16
             * 然乎reduce又会执行一次MyReduce,但是传入的结果如上,此时key已经发生了变化,按照上边算法不会再合并了******
             * 1    1   1
             * 2    2   2
             * 3    3   3
             * 4    4   4
             * 5    5   5
             * 6    6   6
             */
            //job.setCombinerClass(MyReducer.class);//combine:合并一个节点
    
    
            job.setReducerClass(MyReducer.class);//reduce:合并不同节点
            job.setOutputKeyClass(IntWritable.class);//输出键的类型,与上面一致
            job.setOutputValueClass(Text.class);//输出值的类型,与上面一致
            FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));//输入目录
            FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));//输出目录,这个目录要不存在运行时创建,特别注意要设置到空目录,应为执行之前会删一次****
    
            /*FileInputFormat.addInputPath(job, new Path(args[0]));//命令行运行时传入
            FileOutputFormat.setOutputPath(job, new Path(args[1]));*/
            System.exit(job.waitForCompletion(true) ? 0 : 1);//启动,0表示正常退出
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95

    同一个学生的平均值

    1.输入内容

    张三	98
    李四	94
    王五	89
    张三	86
    李四	92
    王五	86
    张三	82
    李四	90
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2.输出内容

    张三	88.66666666666667
    李四	89.5
    王五	89.54166666666667
    
    • 1
    • 2
    • 3

    3.代码

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    public class Average {
    
        public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
                StringTokenizer tokenizer = new StringTokenizer(value.toString());//根据自然分割符分割
                while (tokenizer.hasMoreTokens()) {
                    context.write(new Text(tokenizer.nextToken()), new Text(tokenizer.nextToken()));
                }
            }
        }
    
        /**
         * map后的数据,根据key会自动排号序
         * 张三	98
         * 李四	94
         * 王五	89
         * 张三	86
         * 李四	92
         * 王五	86
         * 张三	82
         * 李四	90
         * 底层有一次合并,传给reduce的数据为
         * 张三	[98,86,82]
         * 李四	[94,92,90]
         * 王五	[89,86]
         */
    
        public static class MyReducer extends Reducer<Text, Text, Text, Text> {
            private int count = 0;//记录该人有多少成绩
            private double sum = 0;//记录成绩总和
    
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                for (Text val : values) {
                    sum += Double.parseDouble((val.toString()));
                    count++;
                }
                context.write(key, new Text(String.valueOf(sum / count)));
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();//配置参数(map数,reduce数...对于配置配置文件中的API配置)
            Job job = Job.getInstance(conf, "average");//名字
            job.setJarByClass(Average.class);//上线的jar
            job.setMapperClass(MyMapper.class);//mapper
            job.setCombinerClass(MyReducer.class);//combine:合并一个节点
            job.setReducerClass(MyReducer.class);//reduce:合并不同节点
            job.setOutputKeyClass(Text.class);//输出键的类型,与上面一致
            job.setOutputValueClass(Text.class);//输出值的类型,与上面一致
            FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));//输入目录
            FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));//输出目录,这个目录要不存在运行时创建,特别注意要设置到空目录,应为执行之前会删一次****
    
            /*FileInputFormat.addInputPath(job, new Path(args[0]));//命令行运行时传入
            FileOutputFormat.setOutputPath(job, new Path(args[1]));*/
            System.exit(job.waitForCompletion(true) ? 0 : 1);//启动,0表示正常退出
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
  • 相关阅读:
    Swift 5.9 有哪些新特性(一)
    优秀的前端开发框架
    Java Spring Boot: 极简配置与快速开发的利器
    基于智能分析网关与EasyCVR技术的考场智能化视频监管方案
    Oracle Linux 9.2 发布 - Oracle 提供支持 RHEL 兼容发行版
    MindSpore:环境问题案例
    大一新生HTML期末作业 学生个人网页设计作业 HTML5响应式个人简历网站模板 web前端网页制作课作业
    MATLAB----矩阵求逆的123!
    使用shell脚本编写监控系统资源(CPU,内存,磁盘)使用情况
    【java计算机毕设】博客管理系统 javaweb springboot vue mysql
  • 原文地址:https://blog.csdn.net/weixin_51699336/article/details/125352483