• HIVE 3 使用 MR 引擎多表关联 (JOIN) 导致丢数的问题复现、问题根源及解决方案 (附代码)


    概述

    本文意图解决 HIVE 3 版本中使用 MR 作为运算引擎进行 JOIN 操作时导致的丢数情况。

    问题描述

    Apache Hive 在 2.3 版本后宣布放弃维护 MapReduce 作为底层执行引擎,并转而使用 Tez 作为默认的查询引擎。但是由于 Tez 在大作业量和高并发时的严重性能问题,导致许多任务不得不继续使用 MapReduce 进行操作,因此就需要开发者自行维护 Hive 对于 MR 的可用性。

    然而,在 Hive 升级至 Hive 3 版本中,继续使用 MapReduce 会导致非常严重的恶性错误。例如,即使进行非常简单的 JOIN 操作,都会导致部分应该被关联上的数据丢失。

    本文档意图提供测试场景浮现上述恶性漏洞,并阐述其根本原因,最后对出现问题部分的源代码进行修改,以彻底修复该问题。

    问题复现

    场景1: 多表 (超过三张表) 时数据丢失

    在复现开始之前先对 Hive 的部分参数进行设置:

    1. SET hive.execution.engine=mr;
    2. SET mapred.reduce.tasks=2;
    3. SET hive.auto.convert.join=false;

    首先,创建三张表。这三张表除了表名不一样,其他包括列信息甚至数据在内完全相同。

    建表语句如下。我们使用文件的形式快速插入,当然为了复现这个问题您也可以手动插入如下数据:

    1. USE default;
    2. create table table_a(id string, name string, addr string) stored as orc;
    3. create table table_b(id string, name string, addr string) stored as orc;
    4. create table table_c(id string, name string, addr string) stored as orc;
    5. LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_a_data.orc" INTO TABLE table_a;
    6. LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_b_data.orc" INTO TABLE table_b;
    7. LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_c_data.orc" INTO TABLE table_c;

    通过以下语句查看三张表的内容,可以看到其中的数据完全一致。

    1. hive> select * from table_a;
    2. OK
    3. 11 a aaa
    4. 22 b bbb
    5. 33 c ccc
    6. 44 d ddd
    7. 55 e eee
    8. 66 f fff
    9. 77 g ggg
    10. 88 h hhh
    11. 99 i iii
    12. 00 j jjj
    13. Time taken: 0.157 seconds, Fetched: 10 row(s)
    14. hive> select * from table_b;
    15. OK
    16. 11 a aaa
    17. 22 b bbb
    18. 33 c ccc
    19. 44 d ddd
    20. 55 e eee
    21. 66 f fff
    22. 77 g ggg
    23. 88 h hhh
    24. 99 i iii
    25. 00 j jjj
    26. Time taken: 0.471 seconds, Fetched: 10 row(s)
    27. hive> select * from table_c;
    28. OK
    29. 11 a aaa
    30. 22 b bbb
    31. 33 c ccc
    32. 44 d ddd
    33. 55 e eee
    34. 66 f fff
    35. 77 g ggg
    36. 88 h hhh
    37. 99 i iii
    38. 00 j jjj
    39. Time taken: 0.186 seconds, Fetched: 10 row(s)

    在确认三张表的数据准确无误后,使用如下关联语句对三张表进行关联:

    select a.id as a_id, b.name as b_name, c.addr as c_addr from table_a a join table_b b on(a.id=b.id) join table_c c on(c.name=b.name);  

    关联结果如下,数据丢失的结果令人咋舌。

    1. MapReduce Jobs Launched:
    2. Stage-Stage-1: HDFS Read: 23439 HDFS Write: 5508 SUCCESS
    3. Stage-Stage-2: HDFS Read: 26292 HDFS Write: 5508 SUCCESS
    4. Total MapReduce CPU Time Spent: 0 msec
    5. OK
    6. 22 b bbb
    7. 66 f fff
    8. 88 h hhh
    9. 55 e eee
    10. 99 i iii
    11. Time taken: 3.343 seconds, Fetched: 5 row(s)

    可以非常明显地看到,本来应该被完全关联在一起的 10 条数据,居然出现了严重的数据丢失。有一半的数据竟然没有被成功关联。如果多次运行关联语句,可以发现这不是偶然情况。每次关联 2 张表以上的数据都会出现极为严重的数据丢失问题。

    场景2: 表的某些属性 (e.g. bucketing_version) 不同时,即使两张表关联也会导致数据丢失

    使用如下数据进行数据建表关联。在建表时使用不同的 bucketing_version 进行表的初始化。

    数据文件如下:

    1. 0,Kurt,vulnedcasey@yahoo.co.uk
    2. 1,Rolland,naejose@gmx.com
    3. 2,Cortez,blategarfield@yahoo.com
    4. 3,Tyron,tameprobes@gmail.com
    5. 4,Matthew,wellezekiel@yahoo.co.uk
    6. 5,Jeffrey,fabingeborg@comcast.net
    7. 6,Gerard,oughtoutgo@att.net
    8. 7,Hal,coursedmauro@hotmail.com
    9. 8,Virgil,squintprude@gmail.com
    10. 9,Hector,lewddillon@email.com

    利用如下语句建表,并在建表时使用不同的 bucketing_version 属性。

    1. CREATE TABLE `join_test_1`(`id` string, `first` string, `email` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' TBLPROPERTIES ('bucketing_version'='1');
    2. LOAD DATA LOCAL INPATH '/home/hadoop/reproduce_hive/Scenario2/test_data.csv' OVERWRITE INTO TABLE join_test_1;
    3. CREATE TABLE `join_test_2`(`id` string, `first` string, `email` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' TBLPROPERTIES ('bucketing_version'='2');
    4. LOAD DATA LOCAL INPATH '/home/hadoop/reproduce_hive/Scenario2/test_data.csv' OVERWRITE INTO TABLE join_test_2;

    运行关联操作的 SQL 语句:

    1. SET hive.execution.engine=mr;
    2. SET mapred.reduce.tasks=2;
    3. SET hive.auto.convert.join=false;
    4. SELECT * from (SELECT id from join_test_1) as tbl1 LEFT JOIN (SELECT id from join_test_2) as tbl2 on tbl1.id = tbl2.id;

    查询关联结果,令人惊讶的事情再一次发生:

    1. Ended Job = job_local184369678_0005
    2. MapReduce Jobs Launched:
    3. Stage-Stage-1: HDFS Read: 28434 HDFS Write: 7956 SUCCESS
    4. Total MapReduce CPU Time Spent: 0 msec
    5. OK
    6. 0 NULL
    7. 2 NULL
    8. 4 NULL
    9. 6 NULL
    10. 8 8
    11. 1 NULL
    12. 3 NULL
    13. 5 5
    14. 7 NULL
    15. 9 NULL

    蕴含同样数据的两张表,仅仅由于建表时的某些属性不同,就导致了绝大部分数据的关联都不成功。数据最基本的准确性都无法得到保障,这毫无疑问是 HIVE 3 中非常致命的问题。

    问题根源

    由于该问题影响过于严重,导致许多使用 HIVE 的开发者第一时间发现了本问题并及时进行了 Bug Report。在 HIVE Jira 上面可以看到非常多的针对该问题的问题报告和可能的解决方案。

    本文主要采用了 HIVE JIRA 中编号为 HIVE-22098 的问题描述和相应解决思路。

    根据 HIVE-22098 的问题描述,究其根源,是由于 HIVE 2 与 HIVE 3 在 JOIN 操作时使用了不同的 Hash 算法,导致同样的值在关联时被不同的 Hash 算法映射成了不同的值,而这些不同的 Hash 值在进行关联时无法被相互匹配。最终导致本来该被关联在一起的数据由于 Hash 值得不同未能被关联在一起。而决定到底应用哪套 Hash 值算法则是根据 bucketing_version 的值来进行评判的。

    特别地,在进行多表关联时,即使相同 bucketing_version 的 Hive 表,由于其关联的中间过程所产生的中间表,在源代码中 bucketing_version 值会被置为 -1,因此该中间表再与第三张乃至更多的表关联时会直接导致 Hash 算法的混乱计算。

    因此,为了保障关联的数据准确性,必须要确保 bucketing_version 在进行多表关联或者多版本表关联时的稳定。即,保障 bucketing_version 的稳定性就是保证 Hive 3 数据关联时的准确性。

    此外,HIVE 社区已经针对 bucketing_version 不稳定的问题进行了集中的问题汇总和修改建议指导。可以通过查看 JIRA: HIVE-21304 了解系统性的 bucketing_version 稳定性提高方法,此处不做过多赘述。

    Ps: 该问题还可能导致许多其他异常的出现,比如 HIVE-18983HIVE-20164HIVE-22429 等诸多问题的出现。因此该 BUG 的严重级别是最高的。修复了本问题,其余数十个问题也就都可以迎刃而解。

    解决思路

    由于 HIVE 中 JOIN 操作执行流程的本质是一个二叉树,因此我们只需要通过算法在关联时遍历每个节点,并将每个节点的 bucketing_version 在关联前手动设置为该二叉树中的最高版本,即可保证 bucketing_version 在关联时的稳定,也就可以保障关联不丢数。

    源码修改及编译上传

    将如下代码替换至原代码即可修复本问题。

    下面给出 Patch 中的 Git 代码,对相应的类进行修复。Patch 的代码在 Jira HIVE-22098 中都可以找到。在此衷心感谢各位 Code Contributors 对于 HIVE 社区的贡献。

    1. From c0774da927451008ba78ed7b8637a1a4899d9e12 Mon Sep 17 00:00:00 2001
    2. From: luguangming <luguangming1@huawei.com>
    3. Date: Mon, 12 Aug 2019 14:24:05 +0800
    4. Subject: [PATCH]HIVE-22098
    5. ---
    6. .../apache/hadoop/hive/ql/exec/mr/ExecMapper.java | 41 ++++++++++++++++++++++
    7. 1 file changed, 41 insertions(+)
    8. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    9. index 99b33a3..d0c847e 100644
    10. --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    11. +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    12. @@ -20,10 +20,12 @@
    13. import java.io.IOException;
    14. import java.net.URLClassLoader;
    15. +import java.util.ArrayList;
    16. import java.util.Arrays;
    17. import java.util.List;
    18. import java.util.Map;
    19. +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
    20. import org.slf4j.Logger;
    21. import org.slf4j.LoggerFactory;
    22. import org.apache.hadoop.conf.Configuration;
    23. @@ -104,6 +106,10 @@ public void configure(JobConf job) {
    24. // initialize map operator
    25. mo.initialize(job, null);
    26. mo.setChildren(job);
    27. +
    28. + // defined self balance ReduceSinkOperator of bucketVersion
    29. + balanceRSOpbucketVersion(mo);
    30. +
    31. l4j.info(mo.dump(0));
    32. // initialize map local work
    33. localWork = mrwork.getMapRedLocalWork();
    34. @@ -138,6 +144,41 @@ public void configure(JobConf job) {
    35. }
    36. }
    37. }
    38. +
    39. + /**
    40. + * defined-self balance ReduceSinkOperator of bucketVersion, keep values to sameness
    41. + * @param rootOp
    42. + */
    43. + private static void balanceRSOpbucketVersion(Operator rootOp){
    44. + List<Operator<? extends OperatorDesc>> needDealOps = new ArrayList>();
    45. + visitChildGetRSOps(rootOp, needDealOps);
    46. + int bucketVersion = -1;
    47. + for(Operator<? extends OperatorDesc> rsop : needDealOps){
    48. + if(rsop.getBucketingVersion() != 2 && rsop.getBucketingVersion() != 1){
    49. + rsop.setBucketingVersion(-1);
    50. + }
    51. + if(rsop.getBucketingVersion() > bucketVersion){
    52. + bucketVersion = rsop.getBucketingVersion();
    53. + }
    54. + }
    55. + for(Operator<? extends OperatorDesc> rsop : needDealOps){
    56. + l4j.info("update reduceSinkOperator name="+rsop.getName()+", opId="+rsop.getOperatorId()+", oldBucketVersion="+rsop.getBucketingVersion()+", newBucketVersion="+bucketVersion);
    57. + rsop.setBucketingVersion(bucketVersion);
    58. + }
    59. + needDealOps.clear();
    60. + }
    61. + private static void visitChildGetRSOps(Operator rootOp, List<Operator<? extends OperatorDesc>> needDealOps){
    62. + List<Operator<? extends OperatorDesc>> ops = rootOp.getChildOperators();
    63. + if(ops == null || ops.isEmpty()){
    64. + return;
    65. + }
    66. + for(Operator<? extends OperatorDesc> op : ops) {
    67. + if (op instanceof ReduceSinkOperator) {
    68. + needDealOps.add(op);
    69. + }
    70. + visitChildGetRSOps(op, needDealOps);
    71. + }
    72. + }
    73. @Override
    74. public void map(Object key, Object value, OutputCollector output,
    75. Reporter reporter) throws IOException {
    76. --
    77. 2.9.2

    编译对应的模块 hive-exec-3.1.2.jar,并将该 Jar 包替换 Hive 3 自带的 Jar 包。编译 HIVE 的命令可以去查询 HIVE 的官方文档,这里不做过多赘述。

    重启 Hive 让其重新加载我们修改源码后的 Jar 包,再次重复上述两个场景,即可观察到 MapReduce 的结果正常,该问题被成功修复。

    关联结果示例:

    1. Ended Job = job_local184369678_0006
    2. MapReduce Jobs Launched:
    3. Stage-Stage-1: HDFS Read: 22434 HDFS Write: 7966 SUCCESS
    4. Total MapReduce CPU Time Spent: 0 msec
    5. OK
    6. 0 0
    7. 1 1
    8. 2 2
    9. 4 4
    10. 6 6
    11. 8 8
    12. 3 3
    13. 5 5
    14. 7 7
    15. 9 9

    小结

    HIVE JIRA 中有许多关于异常信息和报错的讨论。经常性地浏览社区,配合阅读源代码可以对 HIVE 的理解更加深入。

    再次,HIVE 社区已经针对 bucketing_version 不稳定的问题进行了集中的问题汇总和修改建议指导。可以通过查看 JIRA: HIVE-21304 了解系统性的 bucketing_version 稳定性提高方法,此处不做过多赘述。

    希望本篇文章对您的 HIVE 使用有所帮助。

    References

  • 相关阅读:
    NET8中增加的简单适用的DI扩展库Microsoft.Extensions.DependencyInjection.AutoActivation
    Java毕业设计之基于SSM实现的宠物销售网站
    TikTok营销 如何进行TikTok养号?有什么简单有用的技巧
    Qt应用软件【文件篇】json文件读写
    Java面试题之并发
    React render方法的原理?在什么时候会被触发?
    2023高教社杯全国大学生数学建模竞赛E题代码解析
    python使用钉钉机器人给钉钉发送消息
    (02)Cartographer源码无死角解析-(24) Collator类与顺序多队列类
    GitLab CI/CD关键词(十二):条件限定,only ,except,触发规则rules,工作流workflow
  • 原文地址:https://blog.csdn.net/weixin_38070561/article/details/126895259