• MR源码解析和join案例


    MR源码解析

    1. new Job(): 读取本地文件, xml配置
    2. job.start(): 启动线程
    3. job的run():线程方法
      • runTasks(): 传入对应的接口,启动map或者reduce
      • MapTask类的run(): 设置map阶段的参数,初始化任务,创建上下文对象
        • 创建读取器LineRecordReader
        • 判断是否压缩 compressFactory
        • 如果没有压缩,使用seek方法
        • mapTask的write(),进行溢写
        • mapper类的init()方法,设置溢写百分比和缓冲区大小
        • collector收集器:进行map阶段数据类型检查和分数数量检查
        • keySerializer: 进行数据的序列化,调用自己写的bean对象
        • kvmeta.put(): 写入环形缓冲区
        • mapPhase结束
      • 数据量达到缓冲区的80%,对索引进行快速排序
      • input.close():关闭输入
      • 关闭输出并同时将缓冲区数据按照分区写入磁盘。
        • 如果开启了combine,进行数据合并
      • mergePart:归并分区
      • combine第二次合并,如果溢写次数小于3就不合并了
      • collector.close():关闭环形缓冲区
    4. reduceTask的run方法
      • submit: 5个reduce并行提交
      • cLeanTask:初始化
      • shuffle类:map的排序,recuce中的归并排序
      • Merger合并器:两次归并排序,先内存归并,后磁盘归并
      • 抓取数据:可以从本地或者网络中抓取
      • sort :归并排序
      • reduce阶段:
        • 创建上下文对象
        • 调用reducer的run方法
        • real.write(): LineRecordWrite写入HDFS

    使用MR来进行拷贝去重

    1. 拷贝:values写入上下文时需要迭代遍历
    2. 去重:values写入上下文时不遍历

    使用MR来实现join操作

    在这里插入图片描述
    在这里插入图片描述

    1. 实现TableBean类,四个属性,空参构造器,get-set方法
      • write():序列化
        • out.writeUTF():该方法有换行,不会连在一起
      • readFields(): 反序列化
    2. 实现mapper类
      • setup()
        • 使用context上下文对象获取InputSplit类
        • 强制类型转换为FileSplit类
        • getPath().getName()获取文件名称
      • map()
        • 切分split
        • 封装
        • context写出
    public class TableMapper extends Mapper<LongWritable, Text, Text,TableBean> {
    
        private String filename;
    
        private Text outK;
    
        private TableBean outV;
    
        //初始化,每个文件开始一次maptask,并进行一次初始化
        //获取到文件的名称
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
            //拿到切片信息
            FileSplit split = (FileSplit) context.getInputSplit();
    
            filename = split.getPath().getName();
    
            outK = new Text();
            outV = new TableBean();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
            //1. 获取一行
            String line = value.toString();
    
    
            //2.判断是哪个文件的
            if(filename.contains("order")){//处理的是订单表
                String[] split = line.split("\t");
    
                //封装
                outK.set(split[1]);//pid作为key
                outV.setId(split[0]);
                outV.setPid(split[1]);
                outV.setAmount(Integer.parseInt(split[2]));
                outV.setTableName("order");
                outV.setPname("");
            }else{//处理的是商品表
                String[] split = line.split(" ");
    //            System.out.println("=========> " + Arrays.toString(split)+" <=========");
    //            System.out.println("=========> " + split[1] +" <=========");
                //封装
                outK.set(split[0]);//pid作为key
                outV.setId("");
                outV.setPid(split[0]);
                outV.setAmount(0);
                outV.setTableName("pd");
                outV.setPname(split[1]);
            }
    
            //写出
            context.write(outK, outV);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    1. 实现reduce类
      • 为了分辨map传递过来的数据是哪个表,给bean对象添加一个表名属性
      • 在mapper类中给对应表的抓取过程中添加标记
      • 在获取到value时不能直接使用等于号进行赋值,values是Iterable集合,比较特殊
      • 属性赋值工具类BeanUtils.copyProperties(dest, src);
    public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    
        private ArrayList<TableBean> orderBeans;
        private TableBean pdBean;
    
        @Override
        protected void setup(Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {
            //1.创建集合
            orderBeans = new ArrayList<>();
            pdBean = new TableBean();
        }
    
        @Override
        protected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {
    
            orderBeans.clear();//清空集合
    
            //2.遍历赋值
            for (TableBean value : values) {
                if ("order".equals(value.getTableName())) {
                    TableBean temp = new TableBean();
                    try {
                        BeanUtils.copyProperties(temp,value);
                    } catch (IllegalAccessException e) {
                        throw new RuntimeException(e);
                    } catch (InvocationTargetException e) {
                        throw new RuntimeException(e);
                    }
                    orderBeans.add(temp);
                } else {//商品表
                    try {
                        BeanUtils.copyProperties(pdBean, value);
                    } catch (IllegalAccessException e) {
                        throw new RuntimeException(e);
                    } catch (InvocationTargetException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
    
            //循环遍历orderBeans,赋值pdname
            for (TableBean orderBean : orderBeans) {
                orderBean.setPname(pdBean.getPname());
                context.write(orderBean,NullWritable.get());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    总结:这种写法,在reduce阶段创建了对象和集合,这些方式都是比较消耗资源的,容易造成数据倾斜问题

    MR在环形缓冲区快排时倒排索引,反向溢写,会导致数据反向输出,类似栈结构的的先进后出。

  • 相关阅读:
    evnoy协议转换关键日志
    Zookeeper 节点权限控制ACL详解
    2022小米运维开发笔试1
    达索系统3DEXPERIENCE云端设计新体验
    计算机网络---网络层
    springboot 多数据源(如何连接两个数据库)
    RPA在跨境电商领域在哪些应用场景?
    漏洞扫描工具AWVS介绍及安装教程
    打造千万级流量秒杀系统第四课 系统架构:如何设计秒杀的系统架构?
    想要精通算法和SQL的成长之路 - 可以攻击国王的皇后
  • 原文地址:https://blog.csdn.net/qq_44273739/article/details/132773492