• Spark - LeftOuterJoin 结果条数与左表条数不一致


    一.引言

    使用 spark lefOuterJoin 寻找下发的 gap,用原始下发 rdd 左join 真实下发后发现最终的结果数与左表不一致,左表数据: 20350,最终数据: 25721。一直以来使用 Hive 都是默认 leftJoin 左表应该与结果一致,所以开始排查。

    二.问题排查

    20350 条变成 25721 条数据,所以大概率是出现了同 key 的情况,分别检查两边的数据,发现左表、右表均有相同的下发记录,所以导致最终进入循环的数目 countNum 超过了左表的行数,为了避免之后再遇到这样的问题,下面遍历下常见的情况,先初始化一个 SaprkContext 并添加 3对 pairRdd,其中 rddA,rddC 存在重复 key,rddB 无重复 key:

    1. val conf = new SparkConf().setAppName("TestLefterJoin").setMaster("local[5]")
    2. val spark = SparkSession
    3. .builder
    4. .config(conf)
    5. .getOrCreate()
    6. val sc = spark.sparkContext
    7. sc.setLogLevel("error")
    8. val rddA = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6")))
    9. val rddB = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5")))
    10. val rddC = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "7")))

    1.左表 key 有重复

    1. rddA.leftOuterJoin(rddB).foreach(info => {
    2. println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
    3. })

    左表 (A,1),(A,6) 重复,二者分别与右表的 (A, 1) 匹配,所以分别得到 (A, 1, 1) 和 (A, 6, 1) ,如果右表没有 "A" 的 key,匹配结果是 (A, 1, NULL) 与 (A, 6, NULL)

    1. (B,2,2)
    2. (D,4,4)
    3. (E,5,5)
    4. (A,1,1)
    5. (C,3,3)
    6. (A,6,1)

     结论:左表有重复 left join 后结果与左表行数一致

    2.右表 key 有重复

    1. rddB.leftOuterJoin(rddA).foreach(info => {
    2. println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
    3. })

    将上述 rddA 与 rddB 对调得到右表有重复的结果,(A, 1) 分别有右表 (A, 1) 与 (A, 6) 匹配得到 (A, 1, 1) 与 (A, 1, 6),结果一对多

    1. (A,1,1)
    2. (C,3,3)
    3. (E,5,5)
    4. (B,2,2)
    5. (D,4,4)
    6. (A,1,6)

    结论:右表有重复 left join 后结果与左表行数不一致,增加行数为右表重复 key 的数 - 1

    3.左右表 key 都有重复

    1. rddA.leftOuterJoin(rddC).foreach(info => {
    2. println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
    3. })

    左表 (A,1) 、(A,6) 与右表 (A,1)、(A,7)  直接得到 2x2 四种匹配,比左表多2条数据

    1. (B,2,2)
    2. (C,3,3)
    3. (E,5,5)
    4. (A,1,1)
    5. (D,4,4)
    6. (A,1,7)
    7. (A,6,1)
    8. (A,6,7)

    结论:左右表有重复 left join 后结果与左表行数不一致, 增加行数为右表重复 key 的数目

    4.左表 key 有 null 且重复

    1. val rddANull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "7"), (null, "8")))
    2. val rddBNull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5")))
    3. rddANull.leftOuterJoin(rddBNull).foreach(info => {
    4. println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
    5. })

    左表的 (null, 7) , (null, 8) 会把 null 当做单独的 key 匹配,所以不影响 

    1. (B,2,2)
    2. (E,5,5)
    3. (null,7,NULL)
    4. (C,3,3)
    5. (A,1,1)
    6. (A,6,1)
    7. (D,4,4)

    结论:左表有重复 null key 不影响 left join 与行数

    5.右表 key 有 null 且重复

    1. val rddANull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "7"), (null, "8")))
    2. val rddCNull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "7"), (null, "8")))
    3. rddCNull.leftOuterJoin(rddANull).foreach(info => {
    4. println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
    5. })

    左表 (null, 8) 与右表 (null,7)、(null,8) 匹配得到两条记录。 

    1. (B,2,2)
    2. (C,3,3)
    3. (D,4,4)
    4. (E,5,5)
    5. (null,8,7)
    6. (null,8,8)
    7. (A,1,1)
    8. (A,1,6)
    9. (A,7,1)
    10. (A,7,6)

    结论:右表有重复 null key 影响 left join 行数,增加数目为右表重复 key 数 - 1 

    6.左右表都有重复 null key

    1. val rddANull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "7"), (null, "8")))
    2. rddANull.leftOuterJoin(rddANull).foreach(info => {
    3. println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
    4. })

    两边都有 (null,7)、(null,8) ,和上面正常 key 左右表重复结果相同,多2条记录

    1. (B,2,2)
    2. (D,4,4)
    3. (E,5,5)
    4. (null,7,7)
    5. (null,7,8)
    6. (null,8,7)
    7. (null,8,8)
    8. (A,1,1)
    9. (A,1,6)
    10. (A,6,1)
    11. (A,6,6)
    12. (C,3,3)

    结论:左右均重复 null key 时影响 left join 行数,其中增加行数为重复 null key的数

    Tips:

    经过上面3次试验可以看到 null 作为 pairRdd 的 key 在进行 join 时和正常的 key join 时是一样的,唯一的区别是处理这类型的 key 时需要注意非 null 的判断,否则容易报错

    7.表中包含纯 null

    1. val rddDNull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "9"), (null, "10")))
    2. val rddENull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), null))
    3. rddENull.leftOuterJoin(rddDNull).foreach(info => {
    4. println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
    5. })
    6. rddDNull.leftOuterJoin(rddENull).foreach(info => {
    7. println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
    8. })

    不管是左表有纯 null 还是右表有纯 null 或者都有 null,都会报错 NullPoint:

    结论:pairRdd 中有纯 null 使用 join 会报错

    三.问题修复

    上面遍历了重复和 null 的问题,主要导致左join与左表条数不一致的原因还是右表重复key导致,所以问题修复主要是去重:

    A.distinct 

    直接对 rdd 全局去重,但是只能去除相同的 (key, value)

    B.groupByKey

    将 (key, value1)、(key, value2) .... 相同 key 的 pairRdd 元素聚合

    上述两种方法是 PairRdd 常用的去重方法,不过怎么去重还需要结合业务场景,如果确实是相同的多余日志则使用 distinct,如果确实有重复日志且需要聚合信息则采用 groupByKey 、reduceByKey 等聚合方式,当然如果左右表都有重复且场景确需,正常 join 即可。

    四.总结

    这里 spark pairRdd leftJoin 可能增加结果的行数,使用 spark DataFrame 使用 join 时:

    1. val sqlContext = new SQLContext(sc)
    2. documentDFA.join(documentDFB).select("xxx").where("xxx")

    使用 select + where 得到的结果不一定会大于等于左表行数。再回看一下引言的数据,左表数据: 20350,最终数据: 25721,共增加了 5371 行,如果右表单独重复 Xi 个 key,每个 key 重复数目 Mi 个,左右表共重复 Yi 个 key,每个 key 重复数目 Ni 个,按照上面的公式应该满足:

    \sum X_i(M_i-1) + YiNi = 5371

  • 相关阅读:
    【Web前端面试】葵花宝典(2022版本)——React 篇
    并发编程之生产者消费者模型
    死磕GMSSL通信-java/Netty系列(三)
    Apollo之虚拟机部署
    访问修饰符你用对了吗
    偷梁换柱“Windows 11安装包”竟成了恶意程序?
    【Python 千题 —— 基础篇】读取字符串
    商务社交中为何电子名片这么火?都在使用哪一款免费的电子名片?
    【不爱施肥的小布】python实现-附ChatGPT解析
    JVM的GC算法CMS和G1
  • 原文地址:https://blog.csdn.net/BIT_666/article/details/125402026