/**
 * Map side of a reduce-side join between an order table and a product table.
 *
 * <p>Every input line is converted into a {@link TableBean} tagged with the table it came
 * from and emitted under its pid, so that records of both tables sharing a pid are grouped
 * together downstream.
 *
 * <p>The source table is chosen from the name of the file backing the current split: a file
 * whose name contains {@code "order"} is parsed as the order table, anything else as the
 * product table.
 *
 * <p>Blank or malformed lines are skipped and counted in job counters instead of failing the
 * whole task with an {@code ArrayIndexOutOfBoundsException} / {@code NumberFormatException}.
 */
public class TableMapper extends Mapper<LongWritable, Text, Text,TableBean> {
    /** Counter group under which skipped records are reported. */
    private static final String COUNTER_GROUP = "TableMapper";

    /** Name of the file backing the split processed by this task. */
    private String filename;
    /** Reused output key (the pid). */
    private Text outK;
    /** Reused output value. */
    private TableBean outV;

    /**
     * Captures the name of the file behind the current split and allocates the reusable
     * output objects.
     *
     * @param context task context; its input split is cast to {@code FileSplit}
     */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        filename = split.getPath().getName();
        outK = new Text();
        outV = new TableBean();
    }

    /**
     * Parses one input line and emits it keyed by pid.
     *
     * @param key     input key supplied by the framework (unused)
     * @param value   one line of the order or product file
     * @param context task context used to emit the record and to report skipped lines
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
        String line = value.toString();

        // A blank line (e.g. a trailing newline at the end of the file) carries no record.
        if (line.trim().isEmpty()) {
            context.getCounter(COUNTER_GROUP, "BLANK_LINES").increment(1);
            return;
        }

        boolean parsed = filename.contains("order") ? fillFromOrderLine(line) : fillFromProductLine(line);
        if (!parsed) {
            // Make dropped records visible in the job counters rather than dropping silently.
            context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
            return;
        }

        context.write(outK, outV);
    }

    /**
     * Fills the reusable key/value from an order line: tab-separated {@code id, pid, amount}.
     *
     * @return {@code false} if the line has fewer than three fields or a non-integer amount;
     *         in that case the output objects are left untouched
     */
    private boolean fillFromOrderLine(String line) {
        String[] fields = line.split("\t");
        if (fields.length < 3) {
            return false;
        }

        // Parse before mutating outK/outV so a bad amount cannot leave them half-updated.
        int amount;
        try {
            amount = Integer.parseInt(fields[2]);
        } catch (NumberFormatException ignored) {
            // Reported by the caller through the MALFORMED_LINES counter.
            return false;
        }

        outK.set(fields[1]); // pid is the join key
        outV.setId(fields[0]);
        outV.setPid(fields[1]);
        outV.setAmount(amount);
        outV.setTableName("order");
        outV.setPname("");
        return true;
    }

    /**
     * Fills the reusable key/value from a product line: {@code pid, pname}.
     *
     * <p>NOTE(review): the product file is split on a single space while the order file is
     * split on a tab; this is kept as-is and presumably matches the data files -- confirm.
     *
     * @return {@code false} if the line has fewer than two fields; in that case the output
     *         objects are left untouched
     */
    private boolean fillFromProductLine(String line) {
        String[] fields = line.split(" ");
        if (fields.length < 2) {
            return false;
        }

        outK.set(fields[0]); // pid is the join key
        outV.setId("");
        outV.setPid(fields[0]);
        outV.setAmount(0);
        outV.setTableName("pd");
        outV.setPname(fields[1]);
        return true;
    }
}
注意:BeanUtils.copyProperties(dest, src) 的参数顺序是"目标对象在前,源对象在后",即把 src 的属性值拷贝到 dest 中。
/**
 * Reduce side of the order/product join.
 *
 * <p>For each key, every record whose table name is {@code "order"} is buffered, the
 * remaining (product) record supplies the product name, and each buffered order is then
 * written out with that product name filled in.
 */
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    /** Orders buffered for the key currently being reduced; cleared on every call. */
    private ArrayList<TableBean> orderBeans;
    /** Scratch bean holding the product record of the key currently being reduced. */
    private TableBean pdBean;

    /**
     * Allocates the buffers that are reused across {@code reduce} calls.
     *
     * @param context task context (unused)
     */
    @Override
    protected void setup(Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {
        orderBeans = new ArrayList<>();
        pdBean = new TableBean();
    }

    /**
     * Joins the order records of one key with that key's product record.
     *
     * <p>If the key has no product record, the orders are written with the pname they
     * arrived with. Previously they silently inherited the pname left in {@code pdBean}
     * by an earlier key.
     *
     * @param key     the join key shared by all {@code values}
     * @param values  order and product records for this key
     * @param context task context used to emit the joined orders
     * @throws RuntimeException if a bean's properties cannot be copied
     */
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {
        orderBeans.clear();
        // pdBean survives between calls, so track whether THIS key actually refreshed it.
        boolean productSeen = false;

        for (TableBean value : values) {
            if ("order".equals(value.getTableName())) {
                // The framework reuses the value object while iterating, so buffer a copy
                // instead of the reference.
                TableBean orderCopy = new TableBean();
                copyInto(orderCopy, value);
                orderBeans.add(orderCopy);
            } else {
                copyInto(pdBean, value);
                productSeen = true;
            }
        }

        for (TableBean orderBean : orderBeans) {
            if (productSeen) {
                orderBean.setPname(pdBean.getPname());
            }
            context.write(orderBean, NullWritable.get());
        }
    }

    /**
     * Copies all bean properties from {@code src} into {@code dest}.
     *
     * @throws RuntimeException wrapping the reflective failure, with the cause preserved
     */
    private static void copyInto(TableBean dest, TableBean src) {
        try {
            BeanUtils.copyProperties(dest, src);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }
}
总结:这种写法把关联(join)操作放在 reduce 阶段完成,需要在 reduce 中为每条订单记录创建对象并缓存到集合中,比较消耗资源;同时同一个 key(pid)的所有数据都会汇聚到同一个 ReduceTask,reduce 端处理压力大,容易产生数据倾斜问题。
MR 在环形缓冲区中进行快排时会倒排索引、反向溢写,导致数据反向输出,类似栈结构的先进后出。