• Spark Sql之count(distinct)分析&&学习&&验证


    学习内容

    • spark sql count(distinct)
    • 数据膨胀
    • count(distinct)原理
    • grouping sets原理
    • count(distinct)优化

    spark 对count(distinct)的优化

    先说结论:spark sql和hive不一样,spark对count(distinct)做了group by优化

    在hive中count().
    hive往往只用一个 reduce 来处理全局聚合函数,最后导致数据倾斜;在不考虑其它因素的情况下,我们的优化方案是先 group by 再 count 。

    --优化前
    select count(distinct id) from table_a 
    
    --优化后
    select 
      count(id)
    from
    (
        select 
            id
        from table_a group by id
    ) tmp
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在使用spark sql 时,不用担心这个问题,因为 spark 对count distinct 做了优化:

    explain 
    select 
        count(distinct id),
        count(distinct name) 
    from table_a
    
    • 1
    • 2
    • 3
    • 4
    • 5
    == Physical Plan ==
    *(3) HashAggregate(keys=[], functions=[count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
    +- Exchange SinglePartition
       +- *(2) HashAggregate(keys=[], functions=[partial_count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), partial_count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
          +- *(2) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
             +- Exchange(coordinator id: 387101114) hashpartitioning(table_a.`name`#147006, table_a.`id`#147007, gid#147005, 4096), coordinator[target post-shuffle partition size: 67108864]
                +- *(1) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
                   +- *(1) Expand [List(name#146984, null, 1), List(null, id#146979, 2)], [table_a.`name`#147006, table_a.`id`#147007, gid#147005]
                      +- *(1) Project [id#146979, name#146984]
                         +- *(1) FileScan parquet table_a
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    数据膨胀原理

    从上述执行计划可以看到,expand,那为什么为产生数据膨胀呐?

    distinct算子在处理过程中是将distinct后的字段和group by字段共同作为key传入reduce,导致shuffle前map阶段没有预聚合,同时shuffle时网络传输数据量过大消耗增加,对reduce处理时负载也增大

    distinct算子在处理过程中会将原有数据膨胀,有N个DISTINCT关键字数据就会在map端膨胀N倍,同时对shuffle和reduce的长尾影响(原因1)也会扩大N

    在这里插入图片描述

    expand 之后,再以id、name 为 key 进行HashAggregate 也就是 group by ,这样以来,就相当于去重了。后面直接计算count (id) 、 count(name) 就可以,把数据分而治之。 在一定程度上缓解了数据倾斜。

    distinct数据膨胀

     val sql:String =
        s"""
           |select
           |  count(distinct sha1),
           |  count(distinct task_id),
           |  count(distinct task_type)
           |from tmp
           |""".stripMargin
    
        val df2: DataFrame = session.sql(sql)
        df2.show()
        df2.explain(true)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    grouping sets数据膨胀

        val sql1:String =
          s"""
             |select
             |  count(sha1),
             |  count(task_id),
             |  count(task_type)
             |from (
             |select sha1,task_id,task_type
             |from tmp
             |group by grouping sets(sha1, task_id, task_type)
             |)
             |""".stripMargin
    
        val df22: DataFrame = session.sql(sql1)
        df22.explain(true)
        df22.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这里插入图片描述

    开个坑

    在spark sql里面小数据量的话,count(distinct)和gruop by的执行时间是差不多的,
    但是我看到有篇文章介绍的是大数据量的distinct和group by的对比,说的是大数据量的话无法在内存里HashAggregate也就是group by,两者的执行时间的差距还是很大的。具体的还没测试。。。


    distinct源码

    def rewrite(a: Aggregate): Aggregate = {
        // 把所有聚合表式取出来
        val aggExpressions = a.aggregateExpressions.flatMap { e =>
          e.collect {
            case ae: AggregateExpression => ae
          }
        }
        // 抽取出含有 distinct的聚合表达式
        val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
            val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
            if (unfoldableChildren.nonEmpty) {
              // Only expand the unfoldable children
              unfoldableChildren
            } else {        
              e.aggregateFunction.children.take(1).toSet
            }
        }
        //todo 当有多个distinct聚合表达式时,进行expand
        if (distinctAggGroups.size > 1) {
          // 创建gid标志
          val gid = AttributeReference("gid", IntegerType, nullable = false)()
          val groupByMap = a.groupingExpressions.collect {
            case ne: NamedExpression => ne -> ne.toAttribute
            case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)()
          }
          val groupByAttrs = groupByMap.map(_._2)
          ....     
          }
    
          // 构建Expand算子
          val expand = Expand(
            regularAggProjection ++ distinctAggProjections,
            groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
            a.child)        
            .....
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    重点代码:
    //todo 当有多个distinct聚合表达式时,进行expand
    if (distinctAggGroups.size > 1) { expand }


    spark sql grouping sets

    grouping sets 、rollup 、cube 是用来处理多维分析的函数:

    grouping sets:对分组集中指定的组表达式的每个子集执行group by,group by A,B grouping sets(A,B)就等价于 group by A union group by B,其中A和B也可以是一个集合,比如group by A,B,C grouping sets((A,B),(A,C))。

    rollup:在指定表达式的每个层次级别创建分组集。group by A,B,C with rollup首先会对(A、B、C)进行group by,然后对(A、B)进行group by,然后是(A)进行group by,最后对全表进行group by操作。

    cube : 为指定表达式集的每个可能组合创建分组集。首先会对(A、B、C)进行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),©,最后对全表进行group by操作。

    前文也说了,grouping sets也是利用expand的方式


    优化思路

    上文我们基本可以了解到了,是由于expand导致的慢,优化方向可以朝着减少distinct关键的出现的次数,减少数据膨胀方向入手

    1、增加 expand的过程中partition 的数量

    但是这样有一个弊端:同时启动太多task 会造成集群资源紧张,也会导致其它任务没有资源。并且数据是 逐日增加的,总体上不好控制。

    2、缩减expand 的数据量

    从sql结构上:
    可以把计算的指标拆开,分两次计算,然后再 join。
    总体的处理原则就是,让过滤掉的数据尽量的多,expand 时的数据尽量少:

    参考

    参考博客

  • 相关阅读:
    Java访问Scala中的Int类型
    ElasticSearch - ​开启搜索的新境界
    web前端 html+css+javascript游戏网页设计实例 (网页制作课作业)
    图文多模态模型CLIP
    STM32CubeMX教程17 DAC - 输出三角波噪声波
    记录 mybatis plus QuerWapper使用 FIND_IN_SET
    几乎必问,Spring 面试题开胃菜
    PHP废品回收微信小程序系统源码
    【经验之谈·高频PCB电路设计常见的66个问题】
    【echarts】14、echarts+vue2 - 柱状图pictorialBar
  • 原文地址:https://blog.csdn.net/Lzx116/article/details/126153664