在使用 MapReduce 处理数据的时候,难免其中部分数据在其他文件中存在,因此就可能出现类似 DB 中的 Join 关联操作。
比如:有以下数据,分别表示,date(日期),county(县),stateId(州ID),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)
:
28/1/2021 00:00:00 Autauga 1 01001 5554 69
28/1/2021 00:00:00 Piatt 15 17147 1208 18
28/1/2021 00:00:00 Emanuel 11 13107 2283 65
28/1/2021 00:00:00 Clay 18 20027 760 22
其中 stateId(州ID)
又对应另一个文件的 ID
:
1 Alabama
15 Illinois
11 Georgia
18 Kansas
如果需要得到完整的州对应的数据,就需要进行两个文件的Join 关联。
在 MapReduce 中分为 Map 和 Reduce 两个阶段,两个阶段都可以对数据进行处理,也就可以通过一些技术手段对数据进行 Join 操作。
Reduce 阶段 Join 进行处理是非常容易实现的,可以将多个文件共同的 join 字段作为 KEY,一起发往 Reduce 阶段,在 Reduce 中再进行数据的整合,比如下方的操作:
@Slf4j
public class CovidJoinMapper extends Mapper<LongWritable, Text, Text, CountVO> {
Text outKey = new Text();
CountVO outValue = new CountVO();
String filename = null;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit inputSplit = (FileSplit) context.getInputSplit();
filename = inputSplit.getPath().getName();
log.info("currentFile:{} ", filename);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");
if (filename.contains("covid_input.txt")) {
outKey.set(fields[2]);
outValue.set(fields[0], fields[1], Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]), Long.parseLong(fields[5]), "covid_input");
context.write(outKey, outValue);
} else {
outKey.set(fields[0]);
outValue.set(Integer.parseInt(fields[0]), fields[1], "state");
context.write(outKey, outValue);
}
}
}
public class CovidJoinReducer extends Reducer<Text, CountVO, CountVO, NullWritable> {
NullWritable outValue = NullWritable.get();
@Override
protected void reduce(Text key, Iterable<CountVO> values, Context context) throws IOException, InterruptedException {
List<CountVO> covidList = new ArrayList<>();
List<CountVO> stateList = new ArrayList<>();
for (CountVO value : values) {
if (Objects.equals(value.getType(), "covid_input")) {
covidList.add(new CountVO(value));
} else {
stateList.add(new CountVO(value));
}
}
// 对 covidList 根据 stateId 补充 state
covidList.stream().filter(Objects::nonNull).peek(vo ->
vo.setState(stateList.stream()
.filter(Objects::nonNull)
.filter(s -> Objects.equals(s.getStateId(), vo.getStateId()))
.map(CountVO::getState)
.findFirst().orElse(null))
).forEach(vo -> {
try {
context.write(vo, outValue);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
上面虽然实现了数据的整个,但是 Reduce 端 join最大的问题是整个 join 的工作是在 Reduce 阶段完成的,但是通常情况下MapReduce中 Reduce 的并行度是极小的(默认是1个),这就使得所有的数据都挤压到 Reduce 阶段处理,压力颇大,极易出现数据倾斜现象。
下面继续看如果在 Map 阶段进行数据的 Join:
Map 阶段 Join 处理,则避免了 shuffle 时候的繁琐,同时 Reduce 阶段的压力也会减小,但是 Map 阶段会根据文件的大小拆分成多个 mapTask 分开并行处理,每个 mapTask 都需要进行数据的 Join 处理,那每个 mapTask 都对被 Join 文件进行完整的 IO 读取也会造成性能低下,不过在 Mapreduce 中有提供分布式缓存机制,可以将指定文件缓存到各个mapTask 运行的机器上,实践操作如下:
在驱动类中声明 job 时,指定缓存文件:
job.addCacheFile(new URI("/test/input1/state.txt"));
在 Mapper 中,可以读取该文件解析成 Map 使用:
@Slf4j
public class CovidJoinMapper extends Mapper<LongWritable, Text, CountVO, NullWritable> {
Map<Integer, String> stateMap = new HashMap<>();
CountVO outKey = new CountVO();
NullWritable outValue = NullWritable.get();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//读取缓存文件,直接指定文件名
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("state.txt")));
String line = null;
while ((line = br.readLine()) != null) {
String[] fields = line.toString().split("\t");
stateMap.put(Integer.parseInt(fields[0]), fields[1]);
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");
outKey.set(fields[0], fields[1], Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]), Long.parseLong(fields[5]), "covid_input");
outKey.setState(stateMap.get(Integer.parseInt(fields[2])));
context.write(outKey, outValue);
}
}
这种方式虽然可以提升处理的速度,但也有缺陷,如果缓存文件大的话,势必会造成不必要的内存浪费,这种情况可以考虑引入第三方工具,Redis、HBase 等 NoSql ,将被 Join 的数据存至 NoSql 中,在 Mapper 中查找相关数据进行整合,然后给到 Reduce 聚合处理。