• Hadoop Core: MapReduce Case Summary II


    1. Reduce Join Case

    • Requirement: join the two tables below so that the pid in each order record is replaced by the corresponding pname.

    Order table t_order

    id      pid    amount
    1001    01     1
    1002    02     2
    1003    03     3
    1004    01     4
    1005    02     5
    1006    03     6

    Product table:

    pid    pname
    01     小米
    02     华为
    03     格力

    After the join:

    id      pname    amount
    1001    小米     1
    1002    华为     2
    1003    格力     3
    1004    小米     4
    1005    华为     5
    1006    格力     6
    • Analysis: use the join condition (pid) as the Map output key; records from both tables that satisfy the join condition, tagged with the name of the file they come from, are sent to the same ReduceTask, where the data is stitched together in reduce(). Sample input files are sketched below.
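
    For reference, the Mapper below splits every input line on a tab character, so the two inputs are assumed to be tab-separated text files whose names contain "order" and "pd" respectively (order.txt is an assumed name; pd.txt matches the cache-file path used later in the Map Join case). They would look roughly like this:

    order.txt:
    1001	01	1
    1002	02	2
    1003	03	3
    1004	01	4
    1005	02	5
    1006	03	6

    pd.txt:
    01	小米
    02	华为
    03	格力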

    Implementation:
    TableBean.java

    public class TableBean implements Writable {
    
        private String id; // order id
        private String pid; // product id
        private int amount; // quantity
        private String pname;// product name
        private String flag; // which table the record comes from: "order" or "pd"
    
        // no-argument constructor
        public TableBean() {
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getPid() {
            return pid;
        }
    
        public void setPid(String pid) {
            this.pid = pid;
        }
    
        public int getAmount() {
            return amount;
        }
    
        public void setAmount(int amount) {
            this.amount = amount;
        }
    
        public String getPname() {
            return pname;
        }
    
        public void setPname(String pname) {
            this.pname = pname;
        }
    
        public String getFlag() {
            return flag;
        }
    
        public void setFlag(String flag) {
            this.flag = flag;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
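            // Serialization: readFields() below must read the fields back in exactly this order.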
            out.writeUTF(id);
            out.writeUTF(pid);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeUTF(flag);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
    
            this.id = in.readUTF();
            this.pid = in.readUTF();
            this.amount = in.readInt();
            this.pname = in.readUTF();
            this.flag = in.readUTF();
        }
    
        @Override
        public String toString() {
            // id	pname	amount
            return  id + "\t" +  pname + "\t" + amount ;
        }
    }
    

    TableMapper.java

    public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
    
        private String fileName;
        private Text outK  = new Text();
        private TableBean outV = new TableBean();
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // setup: get the name of the file this split belongs to (order or pd)
            FileSplit split = (FileSplit) context.getInputSplit();
    
            fileName = split.getPath().getName();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. read one line
            String line = value.toString();
    
            // 2. determine which file the line comes from
            if (fileName.contains("order")){ // order table
    
                String[] split = line.split("\t");
    
                // build the output key and value
                outK.set(split[1]);
                outV.setId(split[0]);
                outV.setPid(split[1]);
                outV.setAmount(Integer.parseInt(split[2]));
                outV.setPname("");
                outV.setFlag("order");
    
            } else { // product table
                String[] split = line.split("\t");
    
                outK.set(split[0]);
                outV.setId("");
                outV.setPid(split[0]);
                outV.setAmount(0);
                outV.setPname(split[1]);
                outV.setFlag("pd");
            }
    
            // write out
            context.write(outK, outV);
        }
    }
    

    TableReducer.java

    public class TableReducer extends Reducer<Text, TableBean,TableBean, NullWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
    //        01 	1001	1   order
    //        01 	1004	4   order
    //        01	小米   	     pd
            // initialise the collections
            ArrayList<TableBean> orderBeans = new ArrayList<>();
            TableBean pdBean = new TableBean();
    
            // iterate over all values for this key
            for (TableBean value : values) {
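                // Hadoop reuses the same TableBean object for every element of this iterator,
                // so each order record has to be copied into a brand-new bean before it is stored.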
    
                if ("order".equals(value.getFlag())){// 订单表
    
                    TableBean tmptableBean = new TableBean();
    
                    try {
                        BeanUtils.copyProperties(tmptableBean,value);
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } catch (InvocationTargetException e) {
                        e.printStackTrace();
                    }
    
                    orderBeans.add(tmptableBean);
                } else { // product record
    
                    try {
                        BeanUtils.copyProperties(pdBean,value);
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } catch (InvocationTargetException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            // loop over orderBeans, fill in the pname, and write each record out
            for (TableBean orderBean : orderBeans) {
    
                orderBean.setPname(pdBean.getPname());
    
                context.write(orderBean,NullWritable.get());
            }
        }
    }
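
    A note on the copy: BeanUtils.copyProperties(dest, src) here is the reflection-based helper from Apache Commons BeanUtils, and it relies on the getters/setters defined in TableBean. If you prefer not to pull in that dependency, a plain field-by-field copy achieves the same thing. A minimal sketch (copyOf is a hypothetical helper, not part of the original code):

    // Hypothetical replacement for BeanUtils.copyProperties(tmptableBean, value):
    // copy every field of the source bean into a brand-new TableBean.
    private static TableBean copyOf(TableBean src) {
        TableBean dst = new TableBean();
        dst.setId(src.getId());
        dst.setPid(src.getPid());
        dst.setAmount(src.getAmount());
        dst.setPname(src.getPname());
        dst.setFlag(src.getFlag());
        return dst;
    }

    With such a helper inside TableReducer, the try/catch blocks shrink to orderBeans.add(copyOf(value)) and pdBean = copyOf(value).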
    

    TableDriver.java

    public class TableDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());
    
            job.setJarByClass(TableDriver.class);
            job.setMapperClass(TableMapper.class);
            job.setReducerClass(TableReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(TableBean.class);
    
            job.setOutputKeyClass(TableBean.class);
            job.setOutputValueClass(NullWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\input\\inputtable"));
            FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output12"));
    
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    
    }
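
    When the job is run locally with the paths above, the joined result ends up in a single part-r-00000 file under D:\hadoop\output12. Note that the rows come out grouped by the join key (pid, sorted as 01, 02, 03) rather than sorted by order id, and the order of rows inside each pid group is not guaranteed; one possible output is:

    1004	小米	4
    1001	小米	1
    1002	华为	2
    1005	华为	5
    1003	格力	3
    1006	格力	6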
    

    2. Map Join Case

    • Requirement: join the two tables below so that the pid in each order record is replaced by the corresponding pname.

    Order table t_order

    id      pid    amount
    1001    01     1
    1002    02     2
    1003    03     3
    1004    01     4
    1005    02     5
    1006    03     6

    Product table:

    pid    pname
    01     小米
    02     华为
    03     格力

    After the join:

    id      pname    amount
    1001    小米     1
    1002    华为     2
    1003    格力     3
    1004    小米     4
    1005    华为     5
    1006    格力     6
    • Analysis: register the product file as a cache file in the driver; during the Mapper's setup phase, read the cache file and load the product table into a HashMap; in the map() method, look up the pname for each order's pid in that map and concatenate the output line.

    MapJoinMapper.java

    public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
        private HashMap<String, String> pdMap = new HashMap<>();
        private Text outK = new Text();
    
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    
    
            // get the cache file(s) registered in the driver
            URI[] cacheFiles = context.getCacheFiles();
    
            // get a FileSystem object and open an input stream on the cache file
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
    
            BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
    
            String line;
            // load the product records (pid -> pname) into the map
            while (StringUtils.isNotEmpty(line = reader.readLine())) {
    
                String[] split = line.split("\t");
                pdMap.put(split[0], split[1]);
    
            }
    
            IOUtils.closeStream(reader);
    
    
        }
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    
            // read one line of the order table and split it into fields
            String line = value.toString();
            String[] split = line.split("\t");
            
            // look up the pname and build the output line
            String pname = pdMap.get(split[1]);
            outK.set(split[0] + "\t" + pname + "\t" + split[2]);
    
            context.write(outK, NullWritable.get());
    
        }
    }
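
    One small caveat: if an order line references a pid that is not present in pd.txt, pdMap.get() returns null and the output line would contain the literal string "null". A defensive variant of the lookup (optional, not in the original code):

    // Hypothetical null-safe lookup: fall back to a placeholder when the pid is unknown.
    String pname = pdMap.getOrDefault(split[1], "NA");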
    

    MapJoinDriver.java

    public class MapJoinDriver {
    
        public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
    
            Job job = Job.getInstance(new Configuration());
    
            job.setJarByClass(MapJoinDriver.class);
            job.setMapperClass(MapJoinMapper.class);
    
            // set the Map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            job.addCacheFile(new URI("file:///D:/hadoop/input/tablecahe/pd.txt"));
            // a map-side join needs no Reduce phase, so set the number of ReduceTasks to 0
            job.setNumReduceTasks(0);
    
            FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\input\\inputtable"));
            FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\output2"));
    
            boolean result = job.waitForCompletion(true);
    
            System.exit(result ? 0 : 1);
        }
    }
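
    The cache file above is added with a local file:/// URI, which only works while the job runs locally in the IDE. When the jar is submitted to a cluster, the product file has to be uploaded to HDFS first and referenced by an HDFS URI instead; a sketch, assuming a hypothetical NameNode address and path:

    // Hypothetical cluster variant: the cached product file lives on HDFS.
    job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));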
    

    3. Data Cleaning (ETL)

        Before running the core business MapReduce job, the data often has to be cleaned first to remove records that do not meet the requirements. The cleaning step usually only needs a Mapper program; no Reducer is required.

    • Requirement: drop log lines that have 11 or fewer fields.

    • Analysis: filter the input data in the Map phase according to this rule.

    WebLogMapper.java

    public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    
            // 1. read one line
            String line = value.toString();
    
            // 2. ETL: apply the cleaning rule
            boolean result = parseLog(line, context);
    
            // 3. decide whether to keep this line
            if (!result) {
                return; // 11 or fewer fields: filter this line out
            }
    
            // 4. write out
            context.write(value, NullWritable.get());
        }
    
        private boolean parseLog(String line, Context context) {

            String[] fields = line.split(" ");

            // keep the line only if it has more than 11 space-separated fields
            return fields.length > 11;
        }
    }
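
    To see how many lines the cleaning step keeps and drops, the Mapper can also bump a pair of Hadoop counters, which are printed with the job statistics when the job finishes. A minimal sketch of such a variant of parseLog (the counter group and names are arbitrary, not part of the original code):

    // Hypothetical variant of parseLog that also counts kept vs. dropped lines.
    private boolean parseLog(String line, Context context) {
        String[] fields = line.split(" ");
        if (fields.length > 11) {
            context.getCounter("ETL", "kept").increment(1);    // line passes the rule
            return true;
        } else {
            context.getCounter("ETL", "dropped").increment(1); // line is filtered out
            return false;
        }
    }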
    

    WebLogDriver.java

    public class WebLogDriver {
    
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    
            Job job = Job.getInstance(new Configuration());
    
            job.setJarByClass(WebLogDriver.class);
            job.setMapperClass(WebLogMapper.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            job.setNumReduceTasks(0);
    
            FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\input\\inputlog"));
            FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\output5"));
    
            boolean result = job.waitForCompletion(true);
    
            System.exit(result ? 0 : 1);
        }
    }
    
  • Original article: https://blog.csdn.net/weixin_44606952/article/details/127704507