• 【Hadoop】学习笔记(六)


    三、MapReduce

    3.8、Shuffle机制

    3.8.1、Shuffle流程

    Mapper端的Shuffle:

    • Collect阶段:将MapTask的结果收集输出到默认大小为100M的环形缓冲区,保存之前会对 key 进行分区的计算,默认hash分区
    • Spill阶段:当内存中的数据量达到一定的阈值时,就会将数据写入本地磁盘。在将数据写入磁盘之前需要对数据进行一次排序的操作。如果配置了combiner,还会将有相同分区号和key的数据进行排序。
    • Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。

    Reducer端的Shuffle:

    • Copy阶段:ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一个属于自己的数据
    • Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据进行合并操作
    • Sort阶段:在对数据进行合并的同事,会进行排序操作。由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据最终整体有效性即可。

    Shuffle整体流程:
    在这里插入图片描述

    3.8.2、Partition分区

    将统计结果按照条件输出到不同文件中(分区),比如将MapReduce结果按照手机号前3位输出到不同的文件中。

    分区的个数决定了会产生多少个ReduceTask,也决定了最后生成的结果文件。

    当不配置ReduceTask个数时,默认只有1个ReduceTask,也就只有1个分区。此时走的分区类是Hadoop的一个内部类,其分区方法getPartition会固定返回一个0,即最后所有的结果都生成到0号文件(part-r-00000)中。

    当配置了ReduceTask个数大于1,但是没有指定分区类时,Hadoop 默认使用的分区类是HashPartitioner,分区方式是 hash分区 :

    将key取 hash 值,然后对ReduceTask个数取余。key.hashcode() % numReduceTask(每个分区都会产生一个ReduceTask,所以ReduceTask个数就是分区个数)

    // 设置ReduceTask个数为2,
    // 每个ReduceTask产生一个结果文件,最后就会产生2个结果文件:0号文件(part-r-00000)、1号文件(part-r-00001)
    // 按照key的hash值,对2取余,结果为0的存入0号文件,结果为1的存入1号文件
    job.setNumReduceTasks(2);
    
    • 1
    • 2
    • 3
    • 4

    用户可以自定义分区类Partitioner:

    1. 自定义类,继承Partitioner,实现getPartition()分区方法
    2. 在 job 驱动类中,设置使用自定义的Partitioner:
    3. 根据自定义Partitioner的逻辑,设置相应数量的ReduceTask:
    //第二步
    job.setPartitionerClass(MyPartitioner.class);
    //第三步
    job.setNumReduceTasks(5);
    
    • 1
    • 2
    • 3
    • 4

    自定义Partitioner:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    /**
     * 根据手机号前3位分区
     */
    public class MyPartitioner extends Partitioner<Text, IntWritable> {
    
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            String phoneStart = key.toString().substring(0, 3);
            int result = 0;
            
            // 分区号必须从0开始,逐一增加
            switch (phoneStart) {
                case "133":
                    result = 0;
                    break;
                case "139":
                    result = 1;
                    break;
                case "192":
                    result = 2;
                    break;
                case "188":
                    result = 3;
                    break;
                default:
                    break;
            }
            return result;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    在Driver中配置使用该分区类,根据分区类中的逻辑设置ReduceTask个数:

    // 设置分区类、ReduceTask个数
    job.setPartitionerClass(MyPartitioner.class);
    job.setNumReduceTasks(4);
    
    • 1
    • 2
    • 3

    如果Partitioner中分区结果有 5 个,而设置的ReduceTask个数是 4 个(比分区个数少),程序运行时就可能出现IOException:Illegal partition for xxxx(4)

    如果设置的ReduceTask为1,那么就不会走我们的Partitioner,而是全部输出的0号文件(走的Hadoop默认的Partitioner内部类)。

    如果Partitioner中分区结果有5个,但是设置的 ReduceTask 个数是 6 个,程序可以正常运行,最后会产生 6 个结果文件,且第6个结果文件是空的。这样最后分配的第6个ReduceTask被浪费了,分配了该节点但是没有处理数据。

    总结:

    • 如果 ReduceTask的数量 > getPartition的结果数,程序可以正常运行,但是会产生几个空的输出文件,最后几个分配的节点没有处理数据,空耗资源;
    • 如果 1 < ReduceTask的数量 < getPartition的结果数,则有部分数据没有ReduceTask处理,会抛出IOException;
    • 如果 ReduceTask的数量 = 1,则不管有没有设置自定义分区类,最终走的都是Hadoop默认的一个固定返回0的分区类,只会分配一个ReduceTask,也只会产生一个结果文件;
    • 如果ReduceTask的数量 = 0,则表示不进行Reduce汇总,Hadoop也不会再进行Shuffle,直接将MapTask的结果输出到文件
    • 分区getPartition方法中,分区号必须从0开始,逐1增加

    3.8.3、WritableComparable排序

    MapTask和ReduceTask均会对数据按照key进行排序。该操作是Hadoop的默认行为,任何应用程序中的数据均会被排序,不管业务逻辑上是否需要。

    默认的排序是按照字典顺序排序,且实现该排序的方法是快速排序算法

    所以mapper()方法的结果中,key必须要是可以排序的。

    对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序

    对于ReduceTask,它从每个MapTask上远程拷贝(拉取fetch)相应的数据文件。如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中:

    • 如果内存中文件大小或数目超过一定阈值,则进行一次合并后溢写到磁盘上;
    • 如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大的文件;
    • 当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序;

    排序分类:

    • 部分排序:MapReduce根据输入记录的 key 对数据集进行排序,保证输出的每个文件内部有序。
    • 全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率低,因为一台机器处理完所有文件,完全丧失了MapREduce所提供的并行架构。
    • 辅助排序(GroupingComparator分组排序):在Reduce端对 key 进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同的key进入同一个reduce方法时,可以采用分组排序。
    • 二次排序:在自定义排序过程中,如果compareTo中的判断条件为两个,即为二次排序。

    如果使用自定义的 JavaBean 作为key传输,那么这个 JavaBean 需要实现 WritableComparable 接口并重写compareTo方法。

    WritableComparable接口继承了 Writable 和 Comparable。我们也可以直接在 JavaBean 里面实现 Writable 和 Comparable

    3.8.4、Combiner合并

    Reducer之前,对当前Mapper计算结果先进行一次小的合并再输出。例如第一个MapTask上计算结果是{a:1, a:1, a:1, b:1, c:1, c:1},经过Combiner合并后,MapTask输出结果为{a:3, b:1, c:2}。(Combiner只对当前MapTask合并,而Reducer是对从MapTask上拉取来的所有数据进行合并)

    特点:

    • Combiner 是MR程序中 Mapper 和 Reducer 之外的一种组件。即 Combiner不是必须的,可以选择性的添加
    • 自定义Combiner组件的父类也是Reducer
    • Combiner和Reducer的区别在于运行的位置:
      Combniner是在MapTask所在的节点运行
      Reducer是接收全局所有Mapper的输出结果
    • Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小Reducer从Mapper拉取时的网络传输流量
    • Combiner能够应用的前提是:不能影响最终的业务逻辑。而且 Combiner 输出的 key-value 应该能够跟Reducer的输入 key-value 类型对应起来。

    编写自定义Combiner:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Combiner就是一个小的Reducer,继承的类也是Reducer
     * 只在当前的MapTask上运行,对当前MapTask上的结果进行汇总
     */
    public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable outValue = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();  // value是IntWritable类型,需要调用get()进行类型转换
            }
            outValue.set(sum);
            context.write(key, outValue);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    在 job 中使用 Combiner:

    // 设置Combiner
    job.setCombinerClass(MyCombiner.class);
    
    // 将 ReduceTask 数量设置为0,程序就不会运行ReduceTask。
    // 没有ReduceTask,也就不会执行Shuffle阶段,所以Shuffle阶段的Combiner也就不会执行
    job.setNumReduceTasks(0);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    一般情况下,Combiner和Reducer的逻辑完全一样,所以可以直接将Reducer设置到Combiner中:

    // 因为MyCombiner和WordCountReducer的逻辑完全一样,所以可以直接使用Reducer
    job.setCombinerClass(WordCountReducer.class);
    
    • 1
    • 2

    3.9、OutputFormat数据输出

    OutputFormat是MapReduce输出的基类,所有MapReduce的输出类都实现了 OutputFormat接口。
    默认使用的是OutputFormat下的 FileOutputFormat下的TextOutputFormat。

    自定义OutputFormat:例如在FileOutputFormat基础上自定义

    • 自定义类,继承FileOutputFormat
    • 改写FileOutputFormat中实现RecordWrite的内部类FilterRecordWriter

    自定义OutputFormat示例:

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.UUID;
    
    /**
     * 自定义输出:
     * 在job中设置输出文件夹路径,如果结果的key带有spring就输出到指定文件夹下的test01.txt,否则输出到指定文件夹下的test02.txt
     */
    public class MyOutputFormat extends FileOutputFormat<Text, IntWritable> {
        @Override
        public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            // 返回一个我们自定义的输出Writer
            MyRecordWriter writer = new MyRecordWriter(job);
            return writer;
        }
    }
    
    /**
     * 自定义RecordWriter类,需要继承RecordWriter
     */
    class MyRecordWriter extends RecordWriter<Text, IntWritable> {
    
        private FSDataOutputStream fsOut1;
        private FSDataOutputStream fsOut2;
    
        public MyRecordWriter(TaskAttemptContext job) {
    
            try {
                // 创建输出流
                FileSystem fileSystem = FileSystem.get(job.getConfiguration());
                String defaultPath = "/app/WordCount/myoutput/" + UUID.randomUUID().toString();
                String pathParent = job.getConfiguration().get(FileOutputFormat.OUTDIR, defaultPath);  // 读取job设置的输出路径
    
                // 创建两个输出的文件
                // 获取job中配置的SpringFileName、OtherFileName
                String subPath1 = pathParent + "/" + job.getConfiguration().get("SpringFileName"); 
                String subPath2 = pathParent + "/" + job.getConfiguration().get("OtherFileName");
    
                fsOut1 = fileSystem.create(new Path(subPath1));
                fsOut2 = fileSystem.create(new Path(subPath2));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 写操作
         */
        @Override
        public void write(Text key, IntWritable value) throws IOException, InterruptedException {
            // 根据key是否为spring,输出到指定文件
            if("spring".equalsIgnoreCase(key.toString())) {
                fsOut1.writeBytes(key + "@" + value + "\n");
            } else {
                fsOut2.writeBytes(key + "@" + value + "\n");
            }
        }
    
        /**
         * 关闭IO流
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            IOUtils.closeStream(fsOut1);
            IOUtils.closeStream(fsOut2);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    在 job 中使用我们自定义的OutputFormat:

    // 设置自定义OutputFormat
    job.setOutputFormatClass(MyOutputFormat.class);
    
    // 给job添加一个属性,用作配置输出文件路径。属性名为FileOutputFormat.OUTDIR常量
    // 程序中可以通过 job.getConfiguration().get(FileOutputFormat.OUTDIR) 获取
    // FileOutputFormat的 _SUCCESS 文件默认也会生成到该路径下
    FileOutputFormat.setOutputPath(job, new Path("/app/WordCount/myoutput/output1"));  
    
    // 给job再添加两个属性SpringFileName、OtherFileName
    job.getConfiguration().set("SpringFileName", "test01.txt");
    job.getConfiguration().set("OtherFileName", "test02.txt");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.10、MapTask、ReduceTask工作机制

    MapTask工作流程图:
    在这里插入图片描述
    ReduceTask工作机制:
    在这里插入图片描述

    MapTask的并行度由切片个数决定,切片个数由输入文件和切片规则决定。
    ReduceTask的并行度同样影响整个 job 的执行并发度和效率,但与MapTask的并发数不同,ReduceTask数量可以直接手工设置:

    // 设置ReduceTask的个数,默认值是1
     job.setNumReduceTasks(4);
    
    • 1
    • 2

    当ReduceTask = 0 时,表示没有Reduce阶段,输出的文件个数和MapTask个数一致(直接将MapTask的结果输出到文件。
    ReduceTask默认值是1,且OutputFormat默认每个ReduceTask有1个结果文件,所以最终输出的文件只有1个。
    如果ReduceTask个数设置为1,即使设置了自定义分区,最后依然使用Hadoop默认的不分区。即 执行分区的前提是 ReduceTask 个数大于1。
    如果数据分布不均匀,就可能在Reduce阶段产生数据倾斜。此时如果分区不合理,就会造成某几个ReduceTask过于繁忙,而其他的ReduceTask又过于空闲。
    有些情况下需要计算全局的汇总结果,这种情况就只能有1个ReduceTask。

    3.11、join转换两个文件码值

    3.11.1、Join转换两个文件码值

    类似SQL中的 join,需要将两个不同文件进行关联输出,例如一张是详情数据、一张是码值数据,需要关联输出的详情中进行了码值转换。

    3.11.2、在 Reduce 端进行 join

    Map端的主要工作:为来自不同表或文件的 key-value 对,打标签以区别不同来源的记录。然后用连接字段作为 key ,其余部分和新加的标志作为value,最后进行输出。

    Reduce端的主要工作:在Reduce端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就可以了。

    示例,将订单表和产品表使用MapReduce做 join。

    在Mapper中,将 pid 设置为 key,这样相同 key 的订单数据、产品数据 就都能进入同一个 reduce 方法中处理。

    然后在这个reduce方法中,判断如果是产品表,就获取产品名称;如果是订单表,就先存入集合。最后再遍历订单集合,将产品名称设置进去。

    订单表表数据:

    id(订单编号)pid(产品编号)amount(数量)
    1001011
    1002021
    1003034
    1004012
    1005013
    1006025

    产品表:

    pid(产品编号)pname(产品名称)
    01苹果
    02桔子
    03香蕉

    最终输出结果:订单编号、产品名称、数量

    编写JavaBean:

    package com.study.mapreduce.reducejoin;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * JavaBean结果
     */
    public class OrderInfo implements Writable {
        private String id;  // 订单编号
        private String pid;  // 产品编号
        private Integer amount;  // 数量
        private String pname;  // 产品名称
        private String _from_table;  // 标记该条数据的来源表:order、product
    
        public OrderInfo() {
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(id);  // 序列化String时,使用 writeUTF写出、readUTF读取
            out.writeUTF(pid);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeUTF(_from_table);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readUTF(); 
            pid = in.readUTF();
            amount = in.readInt();
            pname = in.readUTF();
            _from_table = in.readUTF();
        }
    
        
        // 重写toString,将JavaBean输出到文件时会调用toString方法输出
        @Override
        public String toString() {
            return "OrderInfo{" +
                    "id='" + id + '\'' +
                    ", pid='" + pid + '\'' +
                    ", amount=" + amount +
                    ", pname='" + pname + '\'' +
                    ", _from_table='" + _from_table + '\'' +
                    '}';
        }
    
       //getter setter
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    编写Mapper:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.log4j.Logger;
    
    import java.io.IOException;
    public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, OrderInfo> {
        private Logger logger = Logger.getLogger(ReduceJoinMapper.class);
    
        private Text outKey = new Text();
        private String fileName = ""; // 当前MapTask处理的文件名称
    
        /**
         * 重写初始化方法,在初始化方法中通过切片信息获取到文件名称,避免每次都在map方法中读取耗费资源
         * 因为每个切片对应一个MapTask,所以这个MapTask的初始化方法就是这个切片的初始化方法
         * 因为一个文件可以切多个片,但是一个切片只会对应一个文件,所以这个切片对应的文件信息是固定的
         */
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
            // JobSubmitter 的 writeNewSplits 中会调用 input.getSplits(job) 进行切片
            // 我们使用的默认InputFormat,也就是 TextInputFormat。TextInputFormat 的 getSplits 方法在 FileInputFormat(TextInputFormat的父类)中
            // FileInputFormat 的 getSplits 方法会调用makeSplit创建 FileSplit 类型的切片对象
            fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
            String[] valueArray = value.toString().split("@");
    
            logger.info("---------------------------------------------------------");
            logger.info("fileName:" + fileName);
    
            OrderInfo orderInfo = new OrderInfo();
            if(fileName.startsWith("order")) {  // 处理 order.txt文件的内容
                orderInfo.setId(valueArray[0]);
                orderInfo.setPid(valueArray[1]);
                orderInfo.setAmount(Integer.valueOf(valueArray[2]));
                orderInfo.set_from_table("_ORDER_");  // 设置来源表为order表
                orderInfo.setPname("");  // 属性值不能有null,否则会报NPE空指针异常。可以设置一个默认值
    
                outKey.set(valueArray[1]);
            } else {   // 处理 product.txt文件的内容
                orderInfo.setPid(valueArray[0]);
                orderInfo.setPname(valueArray[1]);
                orderInfo.set_from_table("_PRODUCT_");  // 设置来源表为product
                // 防止空指针异常,对null属性设置一个默认值
                orderInfo.setId("");
                orderInfo.setAmount(-1);
    
                outKey.set(valueArray[0]);
            }
            context.write(outKey, orderInfo);
    
            logger.info(orderInfo);
            logger.info("-------------------------------------------");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    编写Reducer:

    package com.study.mapreduce.reducejoin;
    
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.log4j.Logger;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    public class ReduceJoinReducer extends Reducer<Text, OrderInfo, Text, OrderInfo> {
    
        private Logger logger = Logger.getLogger(ReduceJoinReducer.class);
    
        @Override
        protected void reduce(Text key, Iterable<OrderInfo> values, Reducer<Text, OrderInfo, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
    
            logger.info("=====================================");
            logger.info("key: " + key);
    
            List<OrderInfo> orderInfoList = new ArrayList<>();
            String pname = "";
    
            Iterator<OrderInfo> iterator = values.iterator();
            while (iterator.hasNext()) {
                OrderInfo orderInfo = iterator.next();
                logger.info(orderInfo);
                if("_PRODUCT_".equals(orderInfo.get_from_table())) {
                    pname = orderInfo.getPname();
                } else {
    
                    // 此处不能直接使用:orderInfoList.add(orderInfo);
                    // Hadoop迭代器为了优化效率,使用了对象重用。
                    // 迭代时value始终指向同一个内存地址,改变的只是引用地址中的字段属性
                    // 即这个循环了这么多次的orderInfo,其实是同一个对象,只是对象的属性值在不断变化
                    // 所以这里需要 new 一个新对象来存放这些属性,然后将这个新对象塞给集合
    
                    OrderInfo orderInfoTemp = new OrderInfo();
    //                orderInfoTemp.setId(orderInfo.getId());
    //                orderInfoTemp.setPid(orderInfo.getPid());
    //                orderInfoTemp.setAmount(orderInfo.getAmount());
    //                orderInfoTemp.set_from_table(orderInfo.get_from_table());
                    try {
                        // 使用BeanUtils工具对同名属性进行赋值
                        // 第一个参数:要设置值的目标对象
                        // 第二个参数:源对象
                        BeanUtils.copyProperties(orderInfoTemp, orderInfo);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    orderInfoList.add(orderInfoTemp);
                }
            }
    
            logger.info("**********************************");
            for (OrderInfo orderInfo : orderInfoList) {
                orderInfo.setPname(pname);
                context.write(key, orderInfo);
                logger.info(orderInfo);
            }
            logger.info("===========================================");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    编写Driver:

    package com.study.mapreduce.reducejoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class ReduceJoinDriver {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Configuration config = new Configuration();
            Job job = Job.getInstance(config);
    
            job.setJarByClass(ReduceJoinDriver.class);
    
            job.setMapperClass(ReduceJoinMapper.class);
            job.setReducerClass(ReduceJoinReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(OrderInfo.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(OrderInfo.class);
    
            FileInputFormat.setInputPaths(job, new Path("/app/order/input"));
            FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output2"));
    
            boolean success = job.waitForCompletion(true);
    
            System.exit(success ? 0 : 1);  // 程序退出
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    3.11.3、在 Map 端进行 Join

    join合并的操作如果在Reduce段完成,Reduce端的处理压力就会很大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易发生数据倾斜。

    因为一般情况下,MapTask的节点数量要多于ReduceTask,可以在Map段进行数据join合并。

    适用场景:一张表很小、另一张表很大。先将小表读取加载进内存中,然后在Map端进行 join合并。

    具体方法:采用DistributedCache

    1. 在Mapper的setup阶段,将文件读取到缓存集合中
    2. 在Driver驱动类中加载缓存:
    // 缓存普通文件到Task运行节点
    job.addCacheFile(new URI("file:///app/cache/product.txt"));
    // 如果是集群运行,需要设置成 hdfs 路径
    
    • 1
    • 2
    • 3
    1. 在Map端进行 join,此时不再需要Reduce阶段,可以设置 ReduceTask数量为0
    // 因为在Map端已经完成了join,达到了目的,不再需要ReduceTask,可以设置取消ReduceTask
    job.setNumReduceTasks(0);
    
    • 1
    • 2

    编写Driver:

    package com.study.mapreduce.mapjoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    public class MapJoinDriver {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
            Configuration config = new Configuration();
            Job job = Job.getInstance(config);
    
            job.setJarByClass(MapJoinDriver.class);
    
            job.setMapperClass(MapJoinMapper.class);
    
            job.setMapOutputKeyClass(Text.class);
            // 不需要汇总出Value,可以设置为null(NullWritable.get())
            job.setMapOutputValueClass(NullWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 设置取消ReduceTask
            job.setNumReduceTasks(0);
    
            // 加载缓存数据
            job.addCacheFile(new URI("/app/order/input/product.txt"));
    
            FileInputFormat.setInputPaths(job, new Path("/app/order/input/order.txt"));
            FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output3"));
    
            boolean success = job.waitForCompletion(true);
    
            System.exit(success ? 0 : 1);  // 程序退出
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    编写Mapper:

    package com.study.mapreduce.mapjoin;
    
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    
    public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
        private Map<String, String> productMap = new HashMap<>();  // product表数据
    
        private Text outKey = new Text();
    
        /**
         * 在MapTask初始化时加载缓存文件
         */
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            // 获取product.txt缓存文件
            URI[] cacheFiles = context.getCacheFiles();
            URI productFile = cacheFiles[0];
    
            // 读取product.txt文件
            FileSystem fileSystem = FileSystem.get(context.getConfiguration());
            FSDataInputStream inputStream = fileSystem.open(new Path(productFile));
            BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
            String line = "";
            while((line = br.readLine()) != null) {
                String[] strArray = line.split("@");
                productMap.put(strArray[0], strArray[1]);
            }
    
            IOUtils.closeStream(br);
        }
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("@");
            String id = fields[0];
            String pid = fields[1];
            int amount = Integer.valueOf(fields[2]);
            String pname = productMap.get(pid);
    
            String resultKey = id  +"\t" + pname + "\t" + amount;
            outKey.set(resultKey);
            context.write(outKey, NullWritable.get());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
  • 相关阅读:
    软考高级之系统架构师系列之UP、RUP、4+1视图、JAD、JRP、RAD
    LQ0046 凑算式【枚举】
    【echarts】18、echarts+vue2 - 折线图柱状图
    Java并发 | 22.[方法] 调用wait( )和notify( )的正确姿势
    多玩家“角力”会议平板
    vm的生命周期钩子
    springboot+安全在线学习平台 毕业设计-附源码131019
    HiveSQL源码之语法词法编译文件解析一文详解
    音视频开发常用名词解释
    Linux输入设备应用编程(键盘,按键,触摸屏,鼠标)
  • 原文地址:https://blog.csdn.net/lushixuan12345/article/details/126556854