先说结论:spark sql和hive不一样,spark对count(distinct)做了group by优化
在hive中count().
hive往往只用一个 reduce 来处理全局聚合函数,最后导致数据倾斜;在不考虑其它因素的情况下,我们的优化方案是先 group by 再 count 。
--优化前
select count(distinct id) from table_a
--优化后
select
count(id)
from
(
select
id
from table_a group by id
) tmp
在使用spark sql 时,不用担心这个问题,因为 spark 对count distinct 做了优化:
explain
select
count(distinct id),
count(distinct name)
from table_a
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), partial_count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- *(2) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
+- Exchange(coordinator id: 387101114) hashpartitioning(table_a.`name`#147006, table_a.`id`#147007, gid#147005, 4096), coordinator[target post-shuffle partition size: 67108864]
+- *(1) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
+- *(1) Expand [List(name#146984, null, 1), List(null, id#146979, 2)], [table_a.`name`#147006, table_a.`id`#147007, gid#147005]
+- *(1) Project [id#146979, name#146984]
+- *(1) FileScan parquet table_a
从上述执行计划可以看到,expand,那为什么为产生数据膨胀呐?
distinct算子在处理过程中是将distinct后的字段和group by字段共同作为key传入reduce,导致shuffle前map阶段没有预聚合,同时shuffle时网络传输数据量过大消耗增加,对reduce处理时负载也增大
distinct算子在处理过程中会将原有数据膨胀,有N个DISTINCT关键字数据就会在map端膨胀N倍,同时对shuffle和reduce的长尾影响(原因1)也会扩大N
expand 之后,再以id、name 为 key 进行HashAggregate 也就是 group by ,这样以来,就相当于去重了。后面直接计算count (id) 、 count(name) 就可以,把数据分而治之。 在一定程度上缓解了数据倾斜。
val sql:String =
s"""
|select
| count(distinct sha1),
| count(distinct task_id),
| count(distinct task_type)
|from tmp
|""".stripMargin
val df2: DataFrame = session.sql(sql)
df2.show()
df2.explain(true)
val sql1:String =
s"""
|select
| count(sha1),
| count(task_id),
| count(task_type)
|from (
|select sha1,task_id,task_type
|from tmp
|group by grouping sets(sha1, task_id, task_type)
|)
|""".stripMargin
val df22: DataFrame = session.sql(sql1)
df22.explain(true)
df22.show()
在spark sql里面小数据量的话,count(distinct)和gruop by的执行时间是差不多的,
但是我看到有篇文章介绍的是大数据量的distinct和group by的对比,说的是大数据量的话无法在内存里HashAggregate也就是group by,两者的执行时间的差距还是很大的。具体的还没测试。。。
def rewrite(a: Aggregate): Aggregate = {
// 把所有聚合表式取出来
val aggExpressions = a.aggregateExpressions.flatMap { e =>
e.collect {
case ae: AggregateExpression => ae
}
}
// 抽取出含有 distinct的聚合表达式
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
if (unfoldableChildren.nonEmpty) {
// Only expand the unfoldable children
unfoldableChildren
} else {
e.aggregateFunction.children.take(1).toSet
}
}
//todo 当有多个distinct聚合表达式时,进行expand
if (distinctAggGroups.size > 1) {
// 创建gid标志
val gid = AttributeReference("gid", IntegerType, nullable = false)()
val groupByMap = a.groupingExpressions.collect {
case ne: NamedExpression => ne -> ne.toAttribute
case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)()
}
val groupByAttrs = groupByMap.map(_._2)
....
}
// 构建Expand算子
val expand = Expand(
regularAggProjection ++ distinctAggProjections,
groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
a.child)
.....
}
重点代码:
//todo 当有多个distinct聚合表达式时,进行expand
if (distinctAggGroups.size > 1) { expand }
grouping sets 、rollup 、cube 是用来处理多维分析的函数:
grouping sets:对分组集中指定的组表达式的每个子集执行group by,group by A,B grouping sets(A,B)就等价于 group by A union group by B,其中A和B也可以是一个集合,比如group by A,B,C grouping sets((A,B),(A,C))。
rollup:在指定表达式的每个层次级别创建分组集。group by A,B,C with rollup首先会对(A、B、C)进行group by,然后对(A、B)进行group by,然后是(A)进行group by,最后对全表进行group by操作。
cube : 为指定表达式集的每个可能组合创建分组集。首先会对(A、B、C)进行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),©,最后对全表进行group by操作。
前文也说了,grouping sets也是利用expand的方式
上文我们基本可以了解到了,是由于expand导致的慢,优化方向可以朝着减少distinct关键的出现的次数,减少数据膨胀方向入手
但是这样有一个弊端:同时启动太多task 会造成集群资源紧张,也会导致其它任务没有资源。并且数据是 逐日增加的,总体上不好控制。
从sql结构上:
可以把计算的指标拆开,分两次计算,然后再 join。
总体的处理原则就是,让过滤掉的数据尽量的多,expand 时的数据尽量少: