• 94-98-Hadoop-MapReduce工作流程(重要)


    Hadoop-MapReduce工作流程(重要):

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vrVajSmi-1668950931481)(png/1624088951196.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rdTlACsf-1668950931482)(png/1624088959945.png)]

    上面的流程是整个 MapReduce 最全工作流程,但是 Shuffle 过程只是从第 7 步开始到第

    16 步结束,具体 Shuffle 过程详解,如下:

    (1)MapTask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中

    (2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

    (3)多个溢出文件会被合并成大的溢出文件

    (4)在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序

    (5)ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据

    (6)ReduceTask 会抓取到同一个分区的来自不同 MapTask 的结果文件,ReduceTask 会

    将这些文件再进行合并(归并排序)

    (7)合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过

    程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce()方法)

    注:

    (1)Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区

    越大,磁盘 io 的次数越少,执行速度就越快。

    (2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb 默认 100M。

    Shuffle 机制

    Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NTkI6zUM-1668950931482)(png/1624089044121.png)]

    Partition 分区

    1、问题引出

    要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机

    归属地不同省份输出到不同文件中(分区)

    2、默认Partitioner分区

    public class HashPartitioner extends Partitioner {
    	public int getPartition(K key, V value, int numReduceTasks) {
    		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
    	} 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    默 认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个

    key存储到哪个分区。

    3、自定义Partitioner步骤

    (1)自定义类继承Partitioner,重写getPartition()方法

    public class CustomPartitioner extends Partitioner {
        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions) {
            // 控制分区代码逻辑
            … …
    		return partition; 
    	} 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    (2)在Job驱动中,设置自定义Partitioner

    job.setPartitionerClass(CustomPartitioner.class);

    (3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

    job.setNumReduceTasks(5);

    4、分区总结

    (1)如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

    (2)如果1

    (3)如 果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个

    ReduceTask,最终也就只会产生一个结果文件 part-r-00000;

    (4)分区号必须从零开始,逐一累加。

    Partition 分区案例实操 (尚硅谷)

    1)需求

    将统计结果按照手机归属地不同省份输出到不同文件中(分区)

    (1)输入数据

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-k1ELV0x3-1668950931483)(png/1624089550487.png)]

    (2)期望输出数据

    手机号 136、137、138、139 开头都分别放到一个独立的 4 个文件中,其他开头的放到

    一个文件中。

    2)需求分析

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-niK3fPq2-1668950931483)(png/1624089686982.png)]

    package com.zh.mapreduce.paritioner;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class ProvincePartitioner extends Partitioner {
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
            //获取手机号前三位 prePhone
            String phone = text.toString();
            String prePhone = phone.substring(0, 3);
            //定义一个分区号变量 partition,根据 prePhone 设置分区号
            int partition;
            if("136".equals(prePhone)){
                partition = 0;
            }else if("137".equals(prePhone)){
                partition = 1;
            }else if("138".equals(prePhone)){
                partition = 2;
            }else if("139".equals(prePhone)){
                partition = 3;
            }else {
                partition = 4;
            }
            //最后返回分区号 partition
            return partition;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    驱动修改

            //8 指定自定义分区器
            job.setPartitionerClass(ProvincePartitioner.class);
            //9 同时指定相应数量的 ReduceTask
            job.setNumReduceTasks(5);
    
    • 1
    • 2
    • 3
    • 4

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x6Xf2LmX-1668950931484)(png/1624090507736.png)]

    学习路径:https://space.bilibili.com/302417610/,如有侵权,请联系q进行删除:3623472230

  • 相关阅读:
    【C语言】basename函数
    汇佳学校冰球/高尔夫/影视艺术/中英朗诵均获大奖
    大数据讲课笔记1.3 Linux目录操作
    网络安全 — 零信任架构
    (附源码)小程序 音乐播放器小程序 毕业设计 170900
    计算机毕业设计(附源码)python学生宿舍管理系统
    setState到底是异步还是同步?
    JDBC学习笔记(2)事务
    抄写Linux源码(Day17:你的键盘是什么时候生效的?)
    罗克韦尔AB PLC RSLogix5000中定时器指令使用方法介绍
  • 原文地址:https://blog.csdn.net/qq_37171694/article/details/127954630