• MapReduce: loading fixed-length files into a Hive table in ORC format




    A fixed-length file is one in which every record (line) occupies the same number of bytes, and it comes with a layout definition stating which field each part of the record holds, how long it is, and so on.

    When the data needs to go into a specific Hive partition, create the corresponding partition and use its location as the job's output path; that is all that is required.
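    A minimal sketch of creating such a partition over Hive JDBC is shown below. The HiveServer2 address, table name, partition value and HDFS path are hypothetical placeholders, not values from this article.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class AddPartitionExample {
        public static void main(String[] args) throws Exception {
            // assumes hive-jdbc is on the classpath and HiveServer2 is reachable
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            try (Connection conn = DriverManager.getConnection("jdbc:hive2://hiveserver2-host:10000/default", "user", "");
                 Statement stmt = conn.createStatement()) {
                // create the partition and point its LOCATION at the MapReduce output directory
                stmt.execute("ALTER TABLE fixed_orc_table ADD IF NOT EXISTS PARTITION (dt='20220101') "
                        + "LOCATION 'hdfs://nameservice1/warehouse/fixed_orc_table/dt=20220101'");
            }
        }
    }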

    MapReduce driver

    Example code: it configures the input file, the Map step, the Reduce step, and the output file.

    package com.study.spark.mr;
    
    import com.study.spark.mr.mapper.FixedLengthMapper;
    import com.study.spark.mr.reduce.OrcFixedReduce;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapreduce.OrcOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class FileOrcParquetExample {
        
        public static void main(String[] args) throws Exception{
            mr();
        }
    
    
        public static void mr() throws Exception {
    
            Configuration configuration = new Configuration();
            int recordLength = 200; // length in bytes of each fixed-length record; add 1 if every line ends with \n
            configuration.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
            configuration.set("encode", "file character encoding"); // placeholder for the input file's encoding, e.g. GBK
            configuration.set("orc.mapred.output.schema",schema().toString());
    
            Job job = Job.getInstance(configuration);
            // set the class whose jar will be submitted for this job
            job.setJarByClass(FileOrcParquetExample.class);
            job.setJobName("FileOrcParquetExample");
            Path path = new Path("hdfs:");
            FileInputFormat.setInputPaths(job, path);
            job.setInputFormatClass(FixedLengthInputFormat.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(BytesWritable.class);
            job.setMapperClass(FixedLengthMapper.class);
    
            job.setOutputFormatClass(OrcOutputFormat.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(OrcStruct.class);
            job.setReducerClass(OrcFixedReduce.class);
            // output location (the directory of the target Hive partition)
            OrcOutputFormat.setOutputPath(job,new Path("hdfs://"));
    
            job.waitForCompletion(true);
    
        }
    
        public static TypeDescription schema(){
            // see this post for the various OrcStruct data types: https://blog.csdn.net/swg321321/article/details/125879576
            TypeDescription description = TypeDescription.createStruct();
            description.addField("boolean", TypeDescription.createBoolean());
            // withPrecision/withScale must be applied to the decimal type itself, not to the struct returned by addField
            description.addField("decimal", TypeDescription.createDecimal().withPrecision(22).withScale(2));
            return description;
        }
    
    }
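    The schema() method above is only a two-field placeholder. In practice the ORC schema lists every field of the fixed-length layout in record order; the sketch below assumes a hypothetical 200-byte layout whose field names, types and offsets are illustrative, not taken from this article.

    import org.apache.orc.TypeDescription;

    public class FixedLayoutSchema {
        // hypothetical layout: account char(20), name char(30), balance char(20) holding decimal(22,2), remaining bytes reserved
        public static TypeDescription schema() {
            return TypeDescription.createStruct()
                    .addField("account", TypeDescription.createString())
                    .addField("name", TypeDescription.createString())
                    .addField("balance", TypeDescription.createDecimal().withPrecision(22).withScale(2));
        }
    }

    The string form produced by schema().toString() is what gets stored in orc.mapred.output.schema and parsed back by the reducer with TypeDescription.fromString().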
    
    

    FixedLengthMapper

    Code implementation: the records read from the fixed-length file arrive here for processing in the Mapper.

    package com.study.spark.mr.mapper;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class FixedLengthMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
        /**
         * Modify the record data here if needed. If nothing is changed here,
         * the transformation can also be done in the Reducer instead.
         * @param key
         * @param value
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }
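    As a concrete example of "modifying the data in the Mapper", the sketch below re-encodes each record from the charset stored in the driver's "encode" property into UTF-8. The GBK default and the re-encoding itself are assumptions; note that re-encoding can change byte lengths, so if the Reducer still slices fields by fixed byte offsets it is usually simpler to decode there instead.

    package com.study.spark.mr.mapper;

    import java.io.IOException;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ReencodingFixedLengthMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {

        private Charset inputCharset;

        @Override
        protected void setup(Context context) {
            // read the charset name stored by the driver under "encode", defaulting to GBK (an assumption)
            inputCharset = Charset.forName(context.getConfiguration().get("encode", "GBK"));
        }

        @Override
        protected void map(LongWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // decode only the valid bytes of this record with the source charset, then re-emit as UTF-8
            String record = new String(value.copyBytes(), inputCharset);
            context.write(key, new BytesWritable(record.getBytes(StandardCharsets.UTF_8)));
        }
    }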
    
    

    OrcFixedReduce

    Code implementation: converts the data received from the Map phase into an ORC file.

    package com.study.spark.mr.reduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcStruct;
    
    import java.io.IOException;
    
    public class OrcFixedReduce extends Reducer<LongWritable, BytesWritable, NullWritable, OrcStruct> {
    
        private TypeDescription typeDescription;
    
        /**
         * Called once at the start of the task.
         */
        protected void setup(Context context) throws IOException, InterruptedException {
    
            Configuration config = context.getConfiguration();
            if(config.get("orc.mapred.output.schema") == null){
                throw new RuntimeException("The ORC schema must be set via orc.mapred.output.schema");
            }
            typeDescription =  TypeDescription.fromString(config.get("orc.mapred.output.schema"));
        }
    
        /**
         * This method is called once for each key. Most applications will define
         * their reduce class by overriding this method. The default implementation
         * is an identity function.
         */
        @SuppressWarnings("unchecked")
        protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
            for(BytesWritable value: values) {
                OrcStruct orcStruct = new OrcStruct(typeDescription);
                byte[] bs = value.copyBytes(); // copyBytes() returns only the record's valid bytes; getBytes() may include padding
                // implement your own field splitting here and set the resulting fields on orcStruct
                // see this post for writing the various OrcStruct data types: https://blog.csdn.net/swg321321/article/details/125879576
    
                context.write(NullWritable.get(),orcStruct);
            }
        }
    
    }
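    The field splitting left open in reduce() above depends on the concrete record layout. Below is a minimal sketch for a hypothetical layout whose schema is struct<name:string,age:int>, with "name" in bytes 0-19 and "age" in bytes 20-23; the field names, offsets and the GBK charset are assumptions, not part of this article.

    package com.study.spark.mr.reduce;

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcStruct;

    public class FixedRecordParser {

        private static final TypeDescription SCHEMA =
                TypeDescription.fromString("struct<name:string,age:int>");

        /** Slices one fixed-length record into an OrcStruct row. */
        public static OrcStruct parse(BytesWritable value) {
            byte[] record = value.copyBytes(); // only the record's valid bytes, not the padded backing array
            String name = new String(record, 0, 20, Charset.forName("GBK")).trim();
            String age = new String(record, 20, 4, StandardCharsets.US_ASCII).trim();

            OrcStruct row = new OrcStruct(SCHEMA);
            row.setFieldValue("name", new Text(name));
            row.setFieldValue("age", new IntWritable(Integer.parseInt(age)));
            return row;
        }
    }

    With a helper like this, the loop body in reduce() becomes context.write(NullWritable.get(), FixedRecordParser.parse(value)).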
    
    
  • Original article: https://blog.csdn.net/swg321321/article/details/126414003