A fixed-length file is one in which every line occupies the same number of bytes, together with a definition stating which field each part of the record holds and how long it is; for example, a 200-byte record might be a 1-byte flag followed by a 22-byte amount, padded out to 200 bytes.
When the output needs to go into a specific Hive partition, create the corresponding partition and set its location as the job's output path; that is all it takes.
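For example, a minimal sketch of that last step, in which the warehouse path, table name, and partition value are all hypothetical:
// Point the job's output at the partition directory...
OrcOutputFormat.setOutputPath(job,
        new Path("hdfs://nameservice1/user/hive/warehouse/demo.db/fixed_orc/dt=20220801"));
// ...and once the job succeeds, register the partition in Hive:
//   ALTER TABLE demo.fixed_orc ADD IF NOT EXISTS PARTITION (dt='20220801')
//   LOCATION '/user/hive/warehouse/demo.db/fixed_orc/dt=20220801';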
The example code below covers configuring and reading the input file, the Map step, the Reduce step, and writing the output file.
package com.study.spark.mr;
import com.study.spark.mr.mapper.FixedLengthMapper;
import com.study.spark.mr.reduce.OrcFixedReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class FileOrcParquetExample {
    public static void main(String[] args) throws Exception {
        mr();
    }
    public static void mr() throws Exception {
        Configuration configuration = new Configuration();
        // Length of each line in the fixed-length file; add 1 if each line ends with \n
        int recordLength = 200;
        configuration.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
        // Custom key read by user code for the input file's character encoding, e.g. "UTF-8"
        configuration.set("encode", "UTF-8");
        configuration.set("orc.mapred.output.schema", schema().toString());

        Job job = Job.getInstance(configuration);
        // Set the jar that contains the job classes
        job.setJarByClass(FileOrcParquetExample.class);
        job.setJobName("FileOrcParquetExample");

        // Input path (fill in the real HDFS location)
        Path path = new Path("hdfs:");
        FileInputFormat.setInputPaths(job, path);
        job.setInputFormatClass(FixedLengthInputFormat.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setMapperClass(FixedLengthMapper.class);

        job.setOutputFormatClass(OrcOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);
        job.setReducerClass(OrcFixedReduce.class);

        // Output location (fill in the real HDFS location)
        OrcOutputFormat.setOutputPath(job, new Path("hdfs://"));
        job.waitForCompletion(true);
    }
    public static TypeDescription schema() {
        // For the various OrcStruct data types, see:
        // https://blog.csdn.net/swg321321/article/details/125879576
        TypeDescription description = TypeDescription.createStruct();
        description.addField("boolean", TypeDescription.createBoolean());
        // withPrecision/withScale must be chained on the decimal type itself,
        // not on the struct returned by addField
        description.addField("decimal",
                TypeDescription.createDecimal().withPrecision(22).withScale(2));
        return description;
    }
}
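As a usage note, the configuration.setInt(...) call above can also be written with the static helper that Hadoop's FixedLengthInputFormat provides; both set the same fixedlengthinputformat.record.length property:
// Equivalent to configuration.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength)
FixedLengthInputFormat.setRecordLength(configuration, recordLength);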
Implementation: the records read from the fixed-length file enter this Mapper for processing.
package com.study.spark.mr.mapper;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FixedLengthMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
    /**
     * Modify the record data here if needed. If no modification is done in the
     * map step, it can also be deferred to the Reduce step.
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}
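If the input is not UTF-8, this is where the custom "encode" value set in the driver can be consumed. Below is a minimal sketch, where the class name EncodingAwareMapper is my own illustration rather than part of the original project, that decodes each record and re-emits it as UTF-8 so the reducer only has to deal with one encoding:
package com.study.spark.mr.mapper;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class EncodingAwareMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {

    private String encode;

    @Override
    protected void setup(Context context) {
        // Read the custom "encode" key set in the driver; fall back to UTF-8
        encode = context.getConfiguration().get("encode", "UTF-8");
    }

    @Override
    protected void map(LongWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // getBytes() may return a padded buffer, so decode only getLength() bytes
        String record = new String(value.getBytes(), 0, value.getLength(), encode);
        // Re-emit the record as UTF-8 bytes
        context.write(key, new BytesWritable(record.getBytes(StandardCharsets.UTF_8)));
    }
}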
Implementation: the data received from the Map step is converted into ORC output.
package com.study.spark.mr.reduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
public class OrcFixedReduce extends Reducer<LongWritable, BytesWritable, NullWritable, OrcStruct> {

    private TypeDescription typeDescription;

    /**
     * Called once at the start of the task.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        if (config.get("orc.mapred.output.schema") == null) {
            throw new RuntimeException("The ORC schema must be set via orc.mapred.output.schema");
        }
        typeDescription = TypeDescription.fromString(config.get("orc.mapred.output.schema"));
    }

    /**
     * This method is called once for each key. Most applications will define
     * their reduce class by overriding this method. The default implementation
     * is an identity function.
     */
    @Override
    protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            OrcStruct orcStruct = new OrcStruct(typeDescription);
            byte[] bs = value.getBytes(); // caution: may be padded beyond getLength()
            // Split bs into fields here and populate orcStruct.
            // For writing the various OrcStruct data types, see:
            // https://blog.csdn.net/swg321321/article/details/125879576
            context.write(NullWritable.get(), orcStruct);
        }
    }
}
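To make the splitting step concrete, here is a minimal sketch that populates the two fields declared in schema() above. The record layout it assumes, a 1-byte flag followed by a 22-byte numeric string, and the class name OrcFieldSplitReduce are illustrative only; adapt the offsets to your own field definition. HiveDecimal and HiveDecimalWritable come from hive-storage-api, which orc-core already depends on:
package com.study.spark.mr.reduce;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

public class OrcFieldSplitReduce extends Reducer<LongWritable, BytesWritable, NullWritable, OrcStruct> {

    private TypeDescription typeDescription;

    @Override
    protected void setup(Context context) {
        typeDescription = TypeDescription.fromString(
                context.getConfiguration().get("orc.mapred.output.schema"));
    }

    @Override
    protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            // copyBytes() returns exactly getLength() bytes, unlike getBytes(),
            // whose backing array may be padded
            String line = new String(value.copyBytes(), StandardCharsets.UTF_8);

            OrcStruct orcStruct = new OrcStruct(typeDescription);
            // Assumed layout: byte 0 is a boolean flag ("1" means true)
            orcStruct.setFieldValue("boolean", new BooleanWritable(line.startsWith("1")));
            // Assumed layout: bytes 1-22 hold a padded numeric string
            orcStruct.setFieldValue("decimal", new HiveDecimalWritable(
                    HiveDecimal.create(new BigDecimal(line.substring(1, 23).trim()))));
            context.write(NullWritable.get(), orcStruct);
        }
    }
}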