使用 spark lefOuterJoin 寻找下发的 gap,用原始下发 rdd 左join 真实下发后发现最终的结果数与左表不一致,左表数据: 20350,最终数据: 25721。一直以来使用 Hive 都是默认 leftJoin 左表应该与结果一致,所以开始排查。
20350 条变成 25721 条数据,所以大概率是出现了同 key 的情况,分别检查两边的数据,发现左表、右表均有相同的下发记录,所以导致最终进入循环的数目 countNum 超过了左表的行数,为了避免之后再遇到这样的问题,下面遍历下常见的情况,先初始化一个 SaprkContext 并添加 3对 pairRdd,其中 rddA,rddC 存在重复 key,rddB 无重复 key:
- val conf = new SparkConf().setAppName("TestLefterJoin").setMaster("local[5]")
- val spark = SparkSession
- .builder
- .config(conf)
- .getOrCreate()
- val sc = spark.sparkContext
- sc.setLogLevel("error")
-
- val rddA = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6")))
- val rddB = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5")))
- val rddC = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "7")))
- rddA.leftOuterJoin(rddB).foreach(info => {
- println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
- })
左表 (A,1),(A,6) 重复,二者分别与右表的 (A, 1) 匹配,所以分别得到 (A, 1, 1) 和 (A, 6, 1) ,如果右表没有 "A" 的 key,匹配结果是 (A, 1, NULL) 与 (A, 6, NULL)
- (B,2,2)
- (D,4,4)
- (E,5,5)
- (A,1,1)
- (C,3,3)
- (A,6,1)
结论:左表有重复 left join 后结果与左表行数一致
- rddB.leftOuterJoin(rddA).foreach(info => {
- println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
- })
将上述 rddA 与 rddB 对调得到右表有重复的结果,(A, 1) 分别有右表 (A, 1) 与 (A, 6) 匹配得到 (A, 1, 1) 与 (A, 1, 6),结果一对多
- (A,1,1)
- (C,3,3)
- (E,5,5)
- (B,2,2)
- (D,4,4)
- (A,1,6)
结论:右表有重复 left join 后结果与左表行数不一致,增加行数为右表重复 key 的数 - 1
- rddA.leftOuterJoin(rddC).foreach(info => {
- println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
- })
左表 (A,1) 、(A,6) 与右表 (A,1)、(A,7) 直接得到 2x2 四种匹配,比左表多2条数据
- (B,2,2)
- (C,3,3)
- (E,5,5)
- (A,1,1)
- (D,4,4)
- (A,1,7)
- (A,6,1)
- (A,6,7)
结论:左右表有重复 left join 后结果与左表行数不一致, 增加行数为右表重复 key 的数目
- val rddANull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "7"), (null, "8")))
- val rddBNull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5")))
- rddANull.leftOuterJoin(rddBNull).foreach(info => {
- println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
- })
左表的 (null, 7) , (null, 8) 会把 null 当做单独的 key 匹配,所以不影响
- (B,2,2)
- (E,5,5)
- (null,7,NULL)
- (C,3,3)
- (A,1,1)
- (A,6,1)
- (D,4,4)
结论:左表有重复 null key 不影响 left join 与行数
- val rddANull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "7"), (null, "8")))
- val rddCNull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "7"), (null, "8")))
- rddCNull.leftOuterJoin(rddANull).foreach(info => {
- println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
- })
左表 (null, 8) 与右表 (null,7)、(null,8) 匹配得到两条记录。
- (B,2,2)
- (C,3,3)
- (D,4,4)
- (E,5,5)
- (null,8,7)
- (null,8,8)
- (A,1,1)
- (A,1,6)
- (A,7,1)
- (A,7,6)
结论:右表有重复 null key 影响 left join 行数,增加数目为右表重复 key 数 - 1
- val rddANull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "7"), (null, "8")))
- rddANull.leftOuterJoin(rddANull).foreach(info => {
- println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
- })
两边都有 (null,7)、(null,8) ,和上面正常 key 左右表重复结果相同,多2条记录
- (B,2,2)
- (D,4,4)
- (E,5,5)
- (null,7,7)
- (null,7,8)
- (null,8,7)
- (null,8,8)
- (A,1,1)
- (A,1,6)
- (A,6,1)
- (A,6,6)
- (C,3,3)
结论:左右均重复 null key 时影响 left join 行数,其中增加行数为重复 null key的数
Tips:
经过上面3次试验可以看到 null 作为 pairRdd 的 key 在进行 join 时和正常的 key join 时是一样的,唯一的区别是处理这类型的 key 时需要注意非 null 的判断,否则容易报错
- val rddDNull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "9"), (null, "10")))
- val rddENull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), null))
- rddENull.leftOuterJoin(rddDNull).foreach(info => {
- println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
- })
-
- rddDNull.leftOuterJoin(rddENull).foreach(info => {
- println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
- })
不管是左表有纯 null 还是右表有纯 null 或者都有 null,都会报错 NullPoint:
结论:pairRdd 中有纯 null 使用 join 会报错
上面遍历了重复和 null 的问题,主要导致左join与左表条数不一致的原因还是右表重复key导致,所以问题修复主要是去重:
A.distinct
直接对 rdd 全局去重,但是只能去除相同的 (key, value)
B.groupByKey
将 (key, value1)、(key, value2) .... 相同 key 的 pairRdd 元素聚合
上述两种方法是 PairRdd 常用的去重方法,不过怎么去重还需要结合业务场景,如果确实是相同的多余日志则使用 distinct,如果确实有重复日志且需要聚合信息则采用 groupByKey 、reduceByKey 等聚合方式,当然如果左右表都有重复且场景确需,正常 join 即可。
这里 spark pairRdd leftJoin 可能增加结果的行数,使用 spark DataFrame 使用 join 时:
- val sqlContext = new SQLContext(sc)
- documentDFA.join(documentDFB).select("xxx").where("xxx")
使用 select + where 得到的结果不一定会大于等于左表行数。再回看一下引言的数据,左表数据: 20350,最终数据: 25721,共增加了 5371 行,如果右表单独重复 Xi 个 key,每个 key 重复数目 Mi 个,左右表共重复 Yi 个 key,每个 key 重复数目 Ni 个,按照上面的公式应该满足: