Hadoop Core: A Summary of MapReduce Examples


    1. WordCount Example

    Requirement: count the total number of occurrences of each word in a given text file.
    (1) Input data

    (figure omitted)

    (2) Expected output data

    (figure omitted)

    (3) Requirements analysis
    Following the MapReduce programming conventions, write a Mapper, a Reducer, and a Driver.

    Code:
    Mapper:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text outK = new Text();
        private IntWritable outV = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // 1. Read one line of input
            String line = value.toString();

            // 2. Split the line into individual words
            String[] words = line.split(" ");

            // 3. Emit each word
            for (String word : words) {

                // set the word as the output key
                outK.set(word);
                // write out (word, 1)
                context.write(outK, outV);
            }
        }
    }
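    The split-and-sum flow above can be checked without a cluster. Below is a minimal plain-Java sketch of the same logic (the class name and sample input are illustrative, not part of the original code):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {

    // Same split the Mapper uses, followed by the summation the Reducer performs
    public static Map<String, Integer> count(String line) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : line.split(" ")) {
            counts.merge(word, 1, Integer::sum); // accumulate the (word, 1) pairs
        }
        return counts;
    }
}
```

    Each `merge` call plays the role of one reduce-side addition; in the real job the grouping of identical words happens during the shuffle between the map and reduce phases.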
    

    Reducer:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outV = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int sum = 0; // total number of occurrences of this word
            // e.g. for one key the values arrive as (1, 1, ...)
            for (IntWritable value : values) {
                sum += value.get(); // convert IntWritable to int and accumulate
            }

            outV.set(sum);
            // write out (word, total)
            context.write(key, outV);
        }
    }
    

    Driver:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

            // 1. Get the Job
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);

            // 2. Set the jar by locating the driver class
            job.setJarByClass(WordCountDriver.class);

            // 3. Associate the Mapper and Reducer
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReduce.class);

            // 4. Set the Mapper output key and value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // 5. Set the final output key and value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // 6. Set the input and output paths (local paths for testing;
            //    on a cluster, use the commented args version instead)
            FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\input\\input1"));
            FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output222"));
    //        FileInputFormat.setInputPaths(job, new Path(args[0]));
    //        FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 7. Submit the job and wait for completion
            boolean result = job.waitForCompletion(true);

            System.exit(result ? 0 : 1);
        }
    }
    

    2. Partitioner Example

    Requirement: write the traffic statistics to different output files according to the province a phone number belongs to (partitioning).
    (1) Input data
    (figure omitted)

    (2) Expected output data
    Phone numbers starting with 136, 137, 138, and 139 each go to their own file (4 files in total), and numbers with any other prefix go to a fifth file.

    FlowBean
    (Because the upstream, downstream, and total traffic must be emitted together alongside the phone number, they are wrapped in a custom Writable bean.)

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class FlowBean implements Writable {

        // upstream traffic, downstream traffic, total traffic
        private long upFlow;
        private long downFlow;
        private long sumFlow;

        // no-arg constructor (required so Hadoop can instantiate the bean by reflection)
        public FlowBean() {
        }

        public long getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public long getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }

        // convenience overload: total = upstream + downstream
        public void setSumFlow() {
            this.sumFlow = this.upFlow + this.downFlow;
        }

        // serialization: turn the in-memory object into a byte stream
        // so it can be sent between machines
        @Override
        public void write(DataOutput out) throws IOException {

            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        // deserialization: rebuild the object from bytes received from another
        // machine; fields must be read in the same order they were written
        @Override
        public void readFields(DataInput in) throws IOException {

            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }

        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    }
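    The write/readFields pair can be verified locally with a byte-array round trip. The helper below is an illustrative sketch, not part of the original code; it mirrors the three writeLong/readLong calls above:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class FlowBeanRoundTrip {

    // Serialize three longs the way FlowBean.write does, then read them back
    // in the same order, the way FlowBean.readFields does
    public static long[] roundTrip(long up, long down) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(up);
            out.writeLong(down);
            out.writeLong(up + down); // sumFlow
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return new long[]{in.readLong(), in.readLong(), in.readLong()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    If readFields read the fields in a different order than write wrote them, this round trip would scramble the values, which is exactly the kind of bug such a check catches early.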
    

    Mapper:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        private Text outK = new Text();
        private FlowBean outV = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // 1. Read one line, e.g.:
            // 1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
            String line = value.toString();

            // 2. Split on tabs
            String[] split = line.split("\t");

            // 3. Grab the fields we need
            // phone number: 13736230513
            // upstream and downstream traffic: 2481	24681
            String phone = split[1];
            String up = split[split.length - 3];
            String down = split[split.length - 2];

            // 4. Fill in the outputs (the key is already a String, so Text.set is
            //    enough; the FlowBean fields are long, so the strings must be parsed)
            outK.set(phone);
            outV.setUpFlow(Long.parseLong(up));
            outV.setDownFlow(Long.parseLong(down));
            outV.setSumFlow();

            // 5. Write out (phone, flowBean)
            context.write(outK, outV);
        }
    }
    

    Reducer:

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        private FlowBean outV = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

            // 1. Accumulate the upstream and downstream traffic from all values
            long totalUp = 0;
            long totalDown = 0;

            for (FlowBean flowBean : values) {
                totalUp += flowBean.getUpFlow();
                totalDown += flowBean.getDownFlow();
            }

            // 2. Fill in the output value (the output value is a FlowBean,
            //    so the reused bean is populated with the totals)
            outV.setUpFlow(totalUp);
            outV.setDownFlow(totalDown);
            outV.setSumFlow();

            // 3. Write out the result
            context.write(key, outV);
        }
    }
    

    ProvincePartitioner (the partitioner):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {

            // partition by the first three digits of the phone number
            String phone = text.toString();

            String prePhone = phone.substring(0, 3);

            int partition;

            if ("136".equals(prePhone)) {
                partition = 0;
            } else if ("137".equals(prePhone)) {
                partition = 1;
            } else if ("138".equals(prePhone)) {
                partition = 2;
            } else if ("139".equals(prePhone)) {
                partition = 3;
            } else {
                partition = 4;
            }

            return partition;
        }
    }
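    The prefix-to-partition mapping can be exercised on its own. A small sketch equivalent to getPartition (the class name is illustrative):

```java
public class PrefixPartitionSketch {

    // Same mapping as ProvincePartitioner.getPartition: partitions 0-3 for
    // prefixes 136-139, partition 4 for everything else
    public static int partitionFor(String phone) {
        switch (phone.substring(0, 3)) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default:    return 4;
        }
    }
}
```

    Because the mapping produces values 0 through 4, the driver must call setNumReduceTasks(5); with 2 to 4 reducers the out-of-range partition indices would fail the job (a single reducer is the special case where the partitioner is bypassed).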
    

    Driver:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class FlowDriver {

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

            // 1. Get the Job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            // 2. Set the jar by locating the driver class
            job.setJarByClass(FlowDriver.class);

            // 3. Associate the Mapper and Reducer
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);

            // 4. Set the Mapper output key and value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);

            // 5. Set the final output key and value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            // register the custom partitioner and match the reducer count
            // to its 5 partitions
            job.setPartitionerClass(ProvincePartitioner.class);
            job.setNumReduceTasks(5);

            // 6. Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\input\\input2"));
            FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output7"));

            // 7. Submit the job and wait for completion
            boolean result = job.waitForCompletion(true);

            System.exit(result ? 0 : 1);
        }
    }
    

    3. Total-Order Sort Example

    Requirement
    Take the results produced by the serialization example and sort them again by total traffic in descending order.
    (1) Input data
    Original data
    (figure omitted)
    Data after the first pass (sorted by phone number; this pass works exactly like the previous example, just without the custom partitioner)
    (figure omitted)

    (2) Expected output data
    13509468723 7335 110349 117684
    13736230513 2481 24681 27162
    13956435636 132 1512 1644
    13846544121 264 0 264

    Compared with the previous program, only the bean code differs:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    /**
     * 1. Implement the WritableComparable interface
     * 2. Override the serialization and deserialization methods
     * 3. Provide a no-arg constructor
     * 4. Override toString()
     * 5. If the custom bean is transmitted as a key, it must also implement compareTo()
     */
    public class FlowBean implements WritableComparable<FlowBean> {

        // upstream traffic, downstream traffic, total traffic
        private long upFlow;
        private long downFlow;
        private long sumFlow;

        // no-arg constructor (required so Hadoop can instantiate the bean by reflection)
        public FlowBean() {
        }

        public long getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public long getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }

        // convenience overload: total = upstream + downstream
        public void setSumFlow() {
            this.sumFlow = this.upFlow + this.downFlow;
        }

        // serialization: turn the in-memory object into a byte stream
        // so it can be sent between machines
        @Override
        public void write(DataOutput out) throws IOException {

            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        // deserialization: rebuild the object from bytes received from another
        // machine; fields must be read in the same order they were written
        @Override
        public void readFields(DataInput in) throws IOException {

            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }

        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }

        @Override
        public int compareTo(FlowBean o) {

            // sort by total traffic, descending
            if (this.sumFlow > o.sumFlow) {
                return -1;
            } else if (this.sumFlow < o.sumFlow) {
                return 1;
            } else {
                // on ties, sort by upstream traffic, ascending
                if (this.upFlow > o.upFlow) {
                    return 1;
                } else if (this.upFlow < o.upFlow) {
                    return -1;
                } else {
                    return 0;
                }
            }
        }
    }
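    The compareTo above defines a two-level order: total traffic descending, then upstream traffic ascending on ties. The same order can be expressed with Comparator chaining; here is a sketch over plain {upFlow, sumFlow} pairs (the helper class and array layout are illustrative):

```java
import java.util.Comparator;
import java.util.List;

public class FlowSortSketch {

    // Each row is {upFlow, sumFlow}; sort by sumFlow descending,
    // then by upFlow ascending -- the same order as FlowBean.compareTo
    public static void sort(List<long[]> rows) {
        rows.sort(Comparator.comparingLong((long[] r) -> r[1]).reversed()
                            .thenComparingLong(r -> r[0]));
    }
}
```

    In the actual job the framework calls compareTo during the sort phase, so emitting FlowBean as the map output key is all that is needed to get the sorted result.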
    
  Original post: https://blog.csdn.net/weixin_44606952/article/details/127678430