• Hadoop3 - MapReduce Join 关联注意点


    一、MapReduce Join 关联注意点

    在使用 MapReduce 处理数据的时候,难免其中部分数据在其他文件中存在,因此就可能出现类似 DB 中的 Join 关联操作。

    比如:有以下数据,分别表示,date(日期),county(县),stateId(州ID),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)

    28/1/2021 00:00:00	Autauga	1	01001	5554	69
    28/1/2021 00:00:00	Piatt	15	17147	1208	18
    28/1/2021 00:00:00	Emanuel	11	13107	2283	65
    28/1/2021 00:00:00	Clay	18	20027	760	22
    
    • 1
    • 2
    • 3
    • 4

    其中 stateId(州ID) 又对应另一个文件的 ID

    1	Alabama
    15	Illinois
    11	Georgia
    18	Kansas
    
    • 1
    • 2
    • 3
    • 4

    如果需要得到完整的州对应的数据,就需要进行两个文件的Join 关联。

    在 MapReduce 中分为 Map 和 Reduce 两个阶段,两个阶段都可以对数据进行处理,也就可以通过一些技术手段对数据进行 Join 操作。

    Reduce 阶段 Join

    Reduce 阶段 Join 进行处理是非常容易实现的,可以将多个文件共同的 join 字段作为 KEY,一起发往 Reduce 阶段,在 Reduce 中再进行数据的整合,比如下方的操作:

    @Slf4j
    public class CovidJoinMapper extends Mapper<LongWritable, Text, Text, CountVO> {
    
        Text outKey = new Text();
        CountVO outValue = new CountVO();
        String filename = null;
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            filename = inputSplit.getPath().getName();
            log.info("currentFile:{} ", filename);
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (filename.contains("covid_input.txt")) {
                outKey.set(fields[2]);
                outValue.set(fields[0], fields[1], Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]), Long.parseLong(fields[5]), "covid_input");
                context.write(outKey, outValue);
            } else {
                outKey.set(fields[0]);
                outValue.set(Integer.parseInt(fields[0]), fields[1], "state");
                context.write(outKey, outValue);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    public class CovidJoinReducer extends Reducer<Text, CountVO, CountVO, NullWritable> {
    
        NullWritable outValue = NullWritable.get();
    
        @Override
        protected void reduce(Text key, Iterable<CountVO> values, Context context) throws IOException, InterruptedException {
            List<CountVO> covidList = new ArrayList<>();
            List<CountVO> stateList = new ArrayList<>();
            for (CountVO value : values) {
                if (Objects.equals(value.getType(), "covid_input")) {
                    covidList.add(new CountVO(value));
                } else {
                    stateList.add(new CountVO(value));
                }
            }
            // 对 covidList 根据 stateId 补充 state
            covidList.stream().filter(Objects::nonNull).peek(vo ->
                    vo.setState(stateList.stream()
                            .filter(Objects::nonNull)
                            .filter(s -> Objects.equals(s.getStateId(), vo.getStateId()))
                            .map(CountVO::getState)
                            .findFirst().orElse(null))
            ).forEach(vo -> {
                try {
                    context.write(vo, outValue);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    上面虽然实现了数据的整个,但是 Reduce 端 join最大的问题是整个 join 的工作是在 Reduce 阶段完成的,但是通常情况下MapReduce中 Reduce 的并行度是极小的(默认是1个),这就使得所有的数据都挤压到 Reduce 阶段处理,压力颇大,极易出现数据倾斜现象。

    下面继续看如果在 Map 阶段进行数据的 Join:

    Map 阶段 Join

    Map 阶段 Join 处理,则避免了 shuffle 时候的繁琐,同时 Reduce 阶段的压力也会减小,但是 Map 阶段会根据文件的大小拆分成多个 mapTask 分开并行处理,每个 mapTask 都需要进行数据的 Join 处理,那每个 mapTask 都对被 Join 文件进行完整的 IO 读取也会造成性能低下,不过在 Mapreduce 中有提供分布式缓存机制,可以将指定文件缓存到各个mapTask 运行的机器上,实践操作如下:

    在驱动类中声明 job 时,指定缓存文件:

     job.addCacheFile(new URI("/test/input1/state.txt"));
    
    • 1

    在 Mapper 中,可以读取该文件解析成 Map 使用:

    @Slf4j
    public class CovidJoinMapper extends Mapper<LongWritable, Text, CountVO, NullWritable> {
    
        Map<Integer, String> stateMap = new HashMap<>();
        CountVO outKey = new CountVO();
        NullWritable outValue = NullWritable.get();
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //读取缓存文件,直接指定文件名
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("state.txt")));
            String line = null;
            while ((line = br.readLine()) != null) {
                String[] fields = line.toString().split("\t");
                stateMap.put(Integer.parseInt(fields[0]), fields[1]);
            }
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            outKey.set(fields[0], fields[1], Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]), Long.parseLong(fields[5]), "covid_input");
            outKey.setState(stateMap.get(Integer.parseInt(fields[2])));
            context.write(outKey, outValue);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    这种方式虽然可以提升处理的速度,但也有缺陷,如果缓存文件大的话,势必会造成不必要的内存浪费,这种情况可以考虑引入第三方工具,Redis、HBase 等 NoSql ,将被 Join 的数据存至 NoSql 中,在 Mapper 中查找相关数据进行整合,然后给到 Reduce 聚合处理。

  • 相关阅读:
    236. 二叉树的最近公共祖先
    八大排序(四)--------直接插入排序
    鸿蒙入门-13Gauge组件
    【LCD应用编程】绘制点、线、矩形框
    【Rust日报】2022-06-30 - 第 3 届开源操作系统学习训练营
    【Python(一)】环境搭建之Anaconda安装
    好书赠送丨海伦·尼森鲍姆著:《场景中的隐私——技术、政治和社会生活中的和谐》,王苑等译
    基于Yolov8的NEU-DET钢材表面缺陷检测,优化组合新颖程度较高:CVPR2023 PConv和BiLevelRoutingAttention,涨点明显
    凤凰架构2——访问远程服务
    消费品行业报告:化妆品容器市场现状研究分析与发展前景预测
  • 原文地址:https://blog.csdn.net/qq_43692950/article/details/127671525