• Hadoop3教程(十七):MapReduce之ReduceJoin案例分析


    (113)ReduceJoin案例需求分析

    现在有两个文件:

    • orders.txt,存放的是订单ID、产品ID、产品数量
    • pd.txt,这是一个产品码表,存放的是产品ID、产品中文名;

    现在是想通过join,来实现这么一个预期输出,即订单ID、产品中文名、产品数量。

    以上是本次案例需求。

    简单思考一下思路。我们需要将关联条件作为Map输出的key,将两表满足Join条件的数据以及数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。

    具体该怎么做呢?

    Map中在处理的时候,需要获取输入的文件内容和文件名(这个是可以在切片的时候获取的),然后不同文件分别做不同处理,处理完成后封装bean对象输出。

    注意,Map在输出的时候,需要以产品ID作为key,只有这样做,才能将相同产品ID的orders.txt记录和pd.txt记录,放在同一个reduceTask里,进而实现最终的替换。value的话,选择订单ID、订单数量、文件名。这里传入文件名的原因是Reduce阶段需要根据不同文件名实现不同处理,所以一定得需要传一个文件名进来。

    另外提一句,封装bean对象的时候,需要把两个文件里的所有字段合起来作为一个bean对象,这样子,orders文件的数据可以用这个bean对象,pd.txt里的数据也可以用这个bean对象。相当于做一个大宽表。

    reduce阶段就很简单了,相同产品ID的orders.txt记录和pd.txt记录,被放在同一个reduceTask里,可以把来自orders的bean放在一个集合里,来自pd的bean放在一个集合里,然后遍历set覆盖就可以。

    (114)ReduceJoin案例代码实操 - TableBean

    首先需要定义一个Bean对象,用来序列化两个输入文件的数据,我们命名为TableBean。

    package com.atguigu.mapreduce.reducejoin;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class TableBean implements Writable {
    
        private String id; //订单id
        private String pid; //产品id
        private int amount; //产品数量
        private String pname; //产品名称
        private String flag; //判断是order表还是pd表的标志字段
    
        public TableBean() {
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getPid() {
            return pid;
        }
    
        public void setPid(String pid) {
            this.pid = pid;
        }
    
        public int getAmount() {
            return amount;
        }
    
        public void setAmount(int amount) {
            this.amount = amount;
        }
    
        public String getPname() {
            return pname;
        }
    
        public void setPname(String pname) {
            this.pname = pname;
        }
    
        public String getFlag() {
            return flag;
        }
    
        public void setFlag(String flag) {
            this.flag = flag;
        }
    
        @Override
        public String toString() {
            return id + "\t" + pname + "\t" + amount;
        }
    
        // 序列化方法
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(id);
            out.writeUTF(pid);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeUTF(flag);
        }
    
        // 反序列化方法
        // 注意,序列化的顺序必须要跟反序列化的顺序一致
        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readUTF();
            this.pid = in.readUTF();
            this.amount = in.readInt();
            this.pname = in.readUTF();
            this.flag = in.readUTF();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    注意,序列化的顺序必须要跟反序列化的顺序一致。

    (115)ReduceJoin案例代码实操 - TableMapper

    TableMapper的主要作用,就是将输入的数据,划分成指定的KV对,以供Reduce阶段使用。

    命名为TableMapper,获取文件名称的代码也包含在这里。

    package com.atguigu.mapreduce.reducejoin;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> {
    
        private String filename;
        private Text outK = new Text();
        private TableBean outV = new TableBean();
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //获取对应文件名称
            InputSplit split = context.getInputSplit();
            FileSplit fileSplit = (FileSplit) split;
            filename = fileSplit.getPath().getName();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            //获取一行
            String line = value.toString();
    
            //判断是哪个文件,然后针对文件进行不同的操作
            if(filename.contains("order")){  //订单表的处理
                String[] split = line.split("\t");
                //封装outK
                outK.set(split[1]);
                //封装outV
                outV.setId(split[0]);
                outV.setPid(split[1]);
                outV.setAmount(Integer.parseInt(split[2]));
                outV.setPname("");
                outV.setFlag("order");
            }else {                             //商品表的处理
                String[] split = line.split("\t");
                //封装outK
                outK.set(split[0]);
                //封装outV
                outV.setId("");
                outV.setPid(split[0]);
                outV.setAmount(0);
                outV.setPname(split[1]);
                outV.setFlag("pd");
            }
    
            //写出KV
            context.write(outK,outV);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    (116)ReduceJoin案例代码实操 - Reducer及Driver

    主要是编写Reduce部分。Mapper之后,一组相同的key的数据会进入一个ReduceTask,接下来需要编写自定义逻辑,让Reduce可以实现关联后输出。

    需要创建两个集合,每个集合接收不同文件,一个接收order文件数据,另一个接收码表数据。然后循环遍历order集合,把码表集合里的值set进去。

    新建TableReducer:

    package com.atguigu.mapreduce.reducejoin;
    
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.util.ArrayList;
    
    public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
    
            ArrayList<TableBean> orderBeans = new ArrayList<>();
            TableBean pdBean = new TableBean();
    
            for (TableBean value : values) {
    
                //判断数据来自哪个表
                if("order".equals(value.getFlag())){   //订单表
    
    			  //创建一个临时TableBean对象接收value
                    TableBean tmpOrderBean = new TableBean();
    
                    try {
                        BeanUtils.copyProperties(tmpOrderBean,value);
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } catch (InvocationTargetException e) {
                        e.printStackTrace();
                    }
    
    			  //将临时TableBean对象添加到集合orderBeans
                    orderBeans.add(tmpOrderBean);
                }else {                                    //商品表
                    try {
                        BeanUtils.copyProperties(pdBean,value);
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } catch (InvocationTargetException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            //遍历集合orderBeans,替换掉每个orderBean的pid为pname,然后写出
            for (TableBean orderBean : orderBeans) {
    
                orderBean.setPname(pdBean.getPname());
    
    		   //写出修改后的orderBean对象
                context.write(orderBean,NullWritable.get());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    根据教程上说的,这里只有一个地方需要注意,但是用处也不是很大,就是 集合在add value的时候,不能直接add 传进来的value,而是需要重新new一个TableBean,将value值赋值给这个新的TableBean,最后add这个新的TableBean。

    这么做的原因是,传进来的values,其实是一个Iterable ,不是传统意义上的迭代器,可以简单理解成,Iterable 里的每个value用的是同一个内存地址,每次读取出value就总是赋给那个内存地址,所以不能直接add value,否则add 一百次,也只会记住最后一次add的那个value。

    这似乎是Hadoop为了避免因创建过多实例引起资源浪费,而做的优化。

    没有测过,做简单了解吧。

    最后在驱动类里注册:

    package com.atguigu.mapreduce.reducejoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class TableDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());
    
            job.setJarByClass(TableDriver.class);
            job.setMapperClass(TableMapper.class);
            job.setReducerClass(TableReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(TableBean.class);
    
            job.setOutputKeyClass(TableBean.class);
            job.setOutputValueClass(NullWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:\\input"));
            FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
    
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    大功告成

    参考文献

    1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】
  • 相关阅读:
    重新认识mysql
    EasyExcel入门使用教程
    虹科新闻 | 虹科电子与 Mend 正式建立合作伙伴关系
    debian 已安装命令找不到 解决方法
    OPUS 中DTX的作用
    Mybatis Plus 详解 IService、BaseMapper、自动填充、分页查询功能
    JavaSE - 深度探讨继承与多态,私有成员是否被继承问题
    基于react18+arco+zustand通用后台管理系统React18Admin
    Compose也能跨平台?Compose Multiplatform是啥?KMM又是什么?
    Babeljs简介与使用
  • 原文地址:https://blog.csdn.net/wlh2220133699/article/details/133872104