Hadoop 3: Custom OutputFormat Logic in the Reduce Phase of MapReduce


    I. Scenario

    We know that when the MapTask phase starts, an InputFormat reads the input data.
    When the ReduceTask phase ends, the processed data is written out to disk, and that is where OutputFormat comes in.

    In the programs so far we never configured this part,
    so the default output format, TextOutputFormat, was used.

    In real work, however, the output does not necessarily go to disk; it may go to MySQL or HBase instead (a sketch of the MySQL route follows).
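
    For the JDBC case, Hadoop actually ships a ready-made org.apache.hadoop.mapreduce.lib.db.DBOutputFormat, and HBase similarly provides TableOutputFormat. Below is a minimal sketch of the MySQL wiring, assuming a table urls(url VARCHAR) and the MySQL JDBC driver on the classpath; the connection values are illustrative, not part of this article's case:

    // Driver-side wiring only; the job's output key class must implement DBWritable
    Configuration conf = new Configuration();
    DBConfiguration.configureDB(conf, "com.mysql.cj.jdbc.Driver",
            "jdbc:mysql://localhost:3306/test", "user", "password");
    Job job = Job.getInstance(conf);
    job.setOutputFormatClass(DBOutputFormat.class);
    // Each output key becomes one row in table urls, column url
    DBOutputFormat.setOutput(job, "urls", "url");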

    So how do we implement a custom OutputFormat? The case below walks through it.

    II. Case

    1. Source data

    http://www.baidu.com
    http://www.google.com
    http://cn.bing.com
    http://www.atguigu.com
    http://www.sohu.com
    http://www.baidu.com
    http://www.sina.com
    http://www.sin2a.com
    http://www.baidu.com
    http://www.sin2desa.com
    http://www.sindsafa.com
    

    2. Requirement analysis

    Filter the input log: write the lines containing atguigu to atguigu.log, and the lines not containing atguigu to other.log (the code below writes both files under D:\hadoop\). The Mapper passes each line through unchanged, the Reducer iterates over the grouped values so duplicate lines survive, and a custom RecordWriter does the actual routing.

    3. Code implementation

    LogMapper.java

    package com.atguigu.mapreduce.outputformat;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input value: one line, e.g. http://www.baidu.com
            // Output: (http://www.baidu.com, NullWritable)
            // Pass each line through unchanged; routing happens later in the RecordWriter
            context.write(value, NullWritable.get());
        }
    }
    

    LogReducer.java

    package com.atguigu.mapreduce.outputformat;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    
            // Grouped input: e.g. the three identical http://www.baidu.com lines
            // arrive as one key with three NullWritable values.
            // Write once per value so duplicate lines are not lost.
            for (NullWritable value : values) {
                context.write(key, NullWritable.get());
            }
        }
    }
    

    LogRecordWriter.java

    package com.atguigu.mapreduce.outputformat;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import java.io.IOException;
    
    public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
    
        private FSDataOutputStream atguiguOut;
        private FSDataOutputStream otherOut;
    
        public LogRecordWriter(TaskAttemptContext job) {
            // Open one output stream per target file
            try {
                FileSystem fs = FileSystem.get(job.getConfiguration());

                atguiguOut = fs.create(new Path("D:\\hadoop\\atguigu.log"));

                otherOut = fs.create(new Path("D:\\hadoop\\other.log"));
            } catch (IOException e) {
                // Fail fast: swallowing the exception would leave both streams null
                // and only surface later as a NullPointerException in write()
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            String log = key.toString();
    
            // Route the record: lines containing atguigu go to atguigu.log,
            // everything else goes to other.log
            if (log.contains("atguigu")) {
                atguiguOut.writeBytes(log + "\n");
            } else {
                otherOut.writeBytes(log + "\n");
            }
        }
    
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Close both streams
            IOUtils.closeStream(atguiguOut);
            IOUtils.closeStream(otherOut);
        }
    }
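
    The hard-coded D:\ paths above tie the writer to one local machine. A minimal variant (a sketch, not the article's code) derives both files from the directory configured via FileOutputFormat.setOutputPath() in the Driver below, so the same writer also works against HDFS; it needs an extra import of org.apache.hadoop.mapreduce.lib.output.FileOutputFormat:

        // Sketch: resolve both files under the job's configured output directory.
        // The IOException can simply propagate, since getRecordWriter() declares it.
        public LogRecordWriter(TaskAttemptContext job) throws IOException {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path outDir = FileOutputFormat.getOutputPath(job); // e.g. D:\hadoop\output1111
            atguiguOut = fs.create(new Path(outDir, "atguigu.log"));
            otherOut = fs.create(new Path(outDir, "other.log"));
        }

    Note that with more than one reduce task every task would open the same two file names, so a production version would work the task attempt id into the names.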
    

    LogOutputFormat.java

    package com.atguigu.mapreduce.outputformat;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            // Hand the framework our custom writer. checkOutputSpecs() is inherited
            // from FileOutputFormat, so the job still fails at submission time if
            // the configured output directory already exists.
            return new LogRecordWriter(job);
        }
    }
    

    LogDriver.java

    package com.atguigu.mapreduce.outputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class LogDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(LogDriver.class);
            job.setMapperClass(LogMapper.class);
            job.setReducerClass(LogReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
        // Register the custom OutputFormat
            job.setOutputFormatClass(LogOutputFormat.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputoutputformat"));
        // Although we use a custom OutputFormat, it extends FileOutputFormat,
        // which still writes a _SUCCESS marker file, so an output directory
        // must be specified here as well
            FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output1111"));
    
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
    
        }
    }
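
    For submission to a real cluster, the hard-coded Windows paths would normally be replaced with command-line arguments (a common variant, not the article's code):

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

    Combined with the output-directory-derived RecordWriter sketch shown earlier, the same job then runs unchanged on HDFS.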
    

    4. Testing

    (The original screenshots showed the two generated log files.) Running the job against the sample input produces:

    D:\hadoop\atguigu.log with the single line http://www.atguigu.com
    D:\hadoop\other.log with the remaining ten URLs, duplicates included
    D:\hadoop\output1111 containing only the _SUCCESS marker file

    III. Summary

    Key files:
    LogRecordWriter.java: opens the two output streams and routes each record by its content
    LogOutputFormat.java: hands the custom RecordWriter to the framework
    LogDriver.java: registers the custom format on the job:

            // Register the custom OutputFormat
            job.setOutputFormatClass(LogOutputFormat.class);
    
    Original article: https://blog.csdn.net/Brave_heart4pzj/article/details/139862008