• 99-104-Hadoop-MapReduce-排序:


    99-Hadoop-MapReduce-排序:

    WritableComparable 排序

    排序是MapReduce框架中最重要的操作之一。 MapTask和ReduceTask均会对数据按 照key进行排序。该操作属于

    Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。 默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数 据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到

    一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完 毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

    (1)部分排序

    MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

    (2)全排序

    最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

    (3)辅助排序:(GroupingComparator分组)

    在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

    (4)二次排序

    在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

    WritableComparable 排序案例实操(全排序)(尚硅谷案例测试)

    1)需求

    根据案例 2.3 序列化案例产生的结果再次对总流量进行倒序排序。

    (1)输入数据

    原始数据 第一次处理后的数据

    phone_data .txt part-r-00000(输入数据)

    (2)期望输出数据

    13509468723 7335 110349 117684

    13736230513 2481 24681 27162

    13956435636 132 1512 1644

    13846544121 264 0 264

    。。。 。。。

    2)需求分析

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hRoY7A3P-1668951117055)(png/1624097244824.png)]

    代码地址:https://gitee.com/HaoZhouRS/bigdata-study-code/tree/master/big-data-study/MapReduce-Demom/src/main/java/com/zh/mapreduce/writableCompare

    WritableComparable 排序案例实操(区内排序)

    1)需求

    要求每个省份手机号输出的文件中按照总流量内部排序。

    2)需求分析

    基于前一个需求,增加自定义分区类,分区按照省份手机号设置。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xIBmwmgT-1668951117057)(png/1624103267523.png)]

    3)案例实操

    (1)增加自定义分区类

    package com.zh.mapreduce.partitionandwritableComparable;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    public class ProvincePartitionerTwo extends Partitioner {
        @Override
        public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
            //获取手机号前三位
            String phone = text.toString();
            String prePhone = phone.substring(0, 3);
            //定义一个分区号变量 partition,根据 prePhone 设置分区号
            int partition;
            switch (prePhone) {
                case "136":
                    partition = 0;
                    break;
                case "137":
                    partition = 1;
                    break;
                case "138":
                    partition = 2;
                    break;
                case "139":
                    partition = 3;
                    break;
                default:
                    partition = 4;
                    break;
            }
            //最后返回分区号 partition
            return partition;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    (2)在驱动类中添加分区类

            job.setPartitionerClass(ProvincePartitionerTwo.class);
            job.setNumReduceTasks(5);
    
    • 1
    • 2

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ajOFgPaF-1668951117057)(png/1624103336629.png)]

    103-Hadoop-MapReduce-Combiner合并

    Combiner 合并

    (1)Combiner是MR程序中Mapper和Reducer之外的一种组件。

    (2)Combiner组件的父类就是Reducer。

    (3)Combiner和Reducer的区别在于运行的位置

    ​ Combiner是在每一个MapTask所在的节点运行;

    ​ Reducer是接收全局所有Mapper的输出结果;

    (4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。

    (5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv 应该跟Reducer的输入kv类型要对应起来。 (场景,不能对结果有影响)

    Mapper Reducer

    3 5 7 ->(3+5+7)/3=5 (3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2

    2 6 ->(2+6)/2=4

    (6)自定义 Combiner 实现步骤

    ​ (a)自定义一个 Combiner 继承 Reducer,重写 Reduce 方法

    public class WordCountCombiner extends Reducer {
     private IntWritable outV = new IntWritable();
     @Override
     protected void reduce(Text key, Iterable values, Context 
    context) throws IOException, InterruptedException {
     int sum = 0;
     for (IntWritable value : values) {
     sum += value.get();
     }
     
     outV.set(sum);
     
     context.write(key,outV);
     } }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    (b)在 Job 驱动类中设置:

    job.setCombinerClass(WordCountCombiner.class); 
    
    • 1

    Combiner 合并案例实操 (尚硅谷案例)

    1)需求

    统计过程中对每一个 MapTask 的输出进行局部汇总,以减小网络传输量即采用 Combiner 功能。

    (1)数据输入

    (2)期望输出数据

    期望:Combine 输入数据多,输出时经过合并,输出数据降低。

    2)需求分析

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tKX7dg8H-1668951226525)(png/1624183153130.png)]

    3)案例实操——方案一

    (1)增加一个 WordCountCombiner 类继承 Reducer

    package com.zh.mapreduce.combinerone;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class WordCountCombiner extends Reducer {
        private IntWritable outV = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outV.set(sum);
            context.write(key,outV);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    (2)在 WordcountDriver 驱动类中指定 Combiner

    // 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
    job.setCombinerClass(WordCountCombiner.class);
    
    • 1
    • 2

    运行日志,输入122,输出5

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QD2Ef0M6-1668951226526)(png/1624183986821.png)]

    job.setNumReduceTasks(0);//shuff机制在map、和reducer中间,混洗数据部分,没有reduce,直接会从map返回
    
    • 1

    4)案例实操——方案二

    (1)将 WordcountReducer 作为 Combiner 在 WordcountDriver 驱动类中指定

    // 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑 ,使用自己的reduce
    job.setCombinerClass(WordCountReducer.class);
    
    • 1
    • 2

    学习路径:https://space.bilibili.com/302417610/,如有侵权,请联系q进行删除:3623472230

  • 相关阅读:
    30-33、SpringBoot项目部署\属性配置方式\多环境开发(一个文件)\多环境分组(多个文件)
    9.4语言是一种实践2
    PDO:从 MySQL 中选择行
    springboot自习室管理系统 小程序毕业设计源码221535
    vue里面使用EventBus(事件总线)
    基于 Openzeppelin 的可升级合约解决方案的注意事项
    Seata 1.5.2 源码学习(事务执行)
    Vscode g++ cmake 学习笔记
    Linux常用命令(3)-文件和目录管理
    树莓派学习
  • 原文地址:https://blog.csdn.net/qq_37171694/article/details/127954665