Spark Optimizer Rules Explained with Examples


    The Optimizer runs after the Analyzer has produced the Resolved Logical Plan, and applies optimization rules to that plan.

    1. Batch Finish Analysis

    This batch contains five optimization rules, each executed only once.

    1.1 EliminateSubqueryAliases

    Eliminates subquery aliases, i.e. the SubqueryAlias nodes in the logical plan. Generally, subquery aliases only provide scoping information for name resolution; once the Analyzer phase is finished, these nodes can be removed, so this rule simply replaces each SubqueryAlias with its child.
    In the following SQL the subquery alias is t. The Analyzed Logical Plan still contains a SubqueryAlias t node, while the Optimized Logical Plan does not.

    explain extended select sum(len) from ( select c1,length(c1) len  from t1 group by c1) t;
    
    == Analyzed Logical Plan ==
    sum(len): bigint
    Aggregate [sum(len#56) AS sum(len)#64L]
    +- SubqueryAlias t
       +- Aggregate [c1#62], [c1#62, length(c1#62) AS len#56]
          +- SubqueryAlias spark_catalog.test.t1
             +- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#62], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Aggregate [sum(len#56) AS sum(len)#64L]
    +- Aggregate [c1#62], [length(c1#62) AS len#56]
       +- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#62], Partition Cols: []]
    
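The rule itself amounts to a one-line tree transform: every SubqueryAlias node is replaced by its child. A minimal Python sketch of the idea on a toy plan tree (the node classes here are illustrative stand-ins, not Catalyst classes):

```python
from dataclasses import dataclass

# Toy logical-plan nodes; illustrative stand-ins for Catalyst operators.
@dataclass
class Relation:
    name: str

@dataclass
class Aggregate:
    child: object

@dataclass
class SubqueryAlias:
    alias: str
    child: object

def eliminate_subquery_aliases(plan):
    """Recursively drop every SubqueryAlias node, keeping its child."""
    if isinstance(plan, SubqueryAlias):
        return eliminate_subquery_aliases(plan.child)
    if isinstance(plan, Aggregate):
        return Aggregate(eliminate_subquery_aliases(plan.child))
    return plan

# Mirrors the analyzed plan above: Aggregate / SubqueryAlias t / Aggregate / SubqueryAlias t1
plan = Aggregate(SubqueryAlias("t", Aggregate(SubqueryAlias("spark_catalog.test.t1", Relation("t1")))))
optimized = eliminate_subquery_aliases(plan)
assert optimized == Aggregate(Aggregate(Relation("t1")))
```

After the transform, no SubqueryAlias nodes remain, matching the optimized plan above.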

    1.2 ReplaceExpressions

    ReplaceExpressions replaces expressions with equivalent ones. It has four replacement rules, shown below.

    case e: RuntimeReplaceable => e.child
    case CountIf(predicate) => Count(new NullIf(predicate, Literal.FalseLiteral))
    case BoolOr(arg) => Max(arg)
    case BoolAnd(arg) => Min(arg)
    
    1.2.1 RuntimeReplaceable

    RuntimeReplaceable is a trait with many subclasses; each such expression is replaced by its child node. For example, Nvl's child is Coalesce(Seq(left, right)), so during optimization nvl is replaced by that Coalesce.

    case class Nvl(left: Expression, right: Expression, child: Expression) extends RuntimeReplaceable {
    
      def this(left: Expression, right: Expression) = {
        this(left, right, Coalesce(Seq(left, right)))
      }
    }
    
    explain extended SELECT nvl(c1,c2) FROM VALUES ('v1', 'v12'), ('v2', 'v22'), ('v3', 'v32') AS tab(c1, c2);
    

    Output:

    == Analyzed Logical Plan ==
    nvl(c1, c2): string
    Project [nvl(c1#85, c2#86) AS nvl(c1, c2)#87]
    +- SubqueryAlias tab
       +- LocalRelation [c1#85, c2#86]
    
    == Optimized Logical Plan ==
    LocalRelation [nvl(c1, c2)#87]
    
    1.2.2 bool_or

    bool_or is replaced with max.

    explain extended SELECT bool_or(col) FROM 
    VALUES (true), (false), (false) AS tab(col);
    

    Output:

    == Analyzed Logical Plan ==
    bool_or(col): boolean
    Aggregate [bool_or(col#101) AS bool_or(col)#103]
    +- SubqueryAlias tab
       +- LocalRelation [col#101]
    
    == Optimized Logical Plan ==
    Aggregate [max(col#101) AS bool_or(col)#103]
    +- LocalRelation [col#101]
    
    1.2.3 bool_and

    bool_and is replaced with min.

    explain extended SELECT bool_and(col) FROM 
    VALUES (true), (false), (false) AS tab(col);
    

    Output:

    == Analyzed Logical Plan ==
    bool_and(col): boolean
    Aggregate [bool_and(col#112) AS bool_and(col)#114]
    +- SubqueryAlias tab
       +- LocalRelation [col#112]
    
    == Optimized Logical Plan ==
    Aggregate [min(col#112) AS bool_and(col)#114]
    +- LocalRelation [col#112]
    

    1.3 ComputeCurrentTime

    ComputeCurrentTime evaluates expressions related to the current time. A single SQL statement may contain several such expressions, e.g. CurrentDate and CurrentTimestamp; this rule guarantees that all of them return the same value within one query.

    subQuery.transformAllExpressionsWithPruning(transformCondition) {
      case cd: CurrentDate =>
        Literal.create(DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), DateType)
      case CurrentTimestamp() | Now() => currentTime
      case CurrentTimeZone() => timezone
      case localTimestamp: LocalTimestamp =>
        val asDateTime = LocalDateTime.ofInstant(instant, localTimestamp.zoneId)
        Literal.create(localDateTimeToMicros(asDateTime), TimestampNTZType)
    }
    
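The key point in the snippet above is that the timestamp is captured once per query and substituted as a literal into every matching expression, so all occurrences agree. A hedged Python sketch of that substitution (toy string expressions, not Catalyst internals):

```python
import time

def compute_current_time(expressions):
    """Replace every 'now()' placeholder with a single timestamp that is
    captured exactly once, so all occurrences in one query return the
    same value."""
    current_micros = int(time.time() * 1_000_000)  # captured exactly once
    return [current_micros if e == "now()" else e for e in expressions]

folded = compute_current_time(["now()", "c1", "now()"])
assert folded[0] == folded[2]  # both literals are identical
```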

    2. Batch Union

    CombineUnions merges adjacent Union nodes into a single Union node, as in the following SQL.

    explain extended 
    select c1 from t1 
    union 
    select c1 from t1 where length(c1) = 2 
    union 
    select c1 from t1 where length(c1) = 3;
    

    The output is below: the Analyzed Logical Plan contains two Union nodes, while the Optimized Logical Plan contains only one.

    == Analyzed Logical Plan ==
    c1: string
    Distinct
    +- Union false, false
       :- Distinct
       :  +- Union false, false
       :     :- Project [c1#161]
       :     :  +- SubqueryAlias spark_catalog.test.t1
       :     :     +- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#161], Partition Cols: []]
       :     +- Project [c1#162]
       :        +- Filter (length(c1#162) = 2)
       :           +- SubqueryAlias spark_catalog.test.t1
       :              +- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#162], Partition Cols: []]
       +- Project [c1#163]
          +- Filter (length(c1#163) = 3)
             +- SubqueryAlias spark_catalog.test.t1
                +- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#163], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Aggregate [c1#161], [c1#161]
    +- Union false, false
       :- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#161], Partition Cols: []]
       :- Filter (isnotnull(c1#162) AND (length(c1#162) = 2))
       :  +- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#162], Partition Cols: []]
       +- Filter (isnotnull(c1#163) AND (length(c1#163) = 3))
          +- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#163], Partition Cols: []]
    
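In the plan above, the inner Distinct is redundant under the outer Distinct; once it is removed, the two Union nodes become adjacent and CombineUnions can flatten them. The flattening step can be sketched as follows (toy nodes, not Catalyst classes):

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Union:
    children: List[object]  # child plans; plain strings stand in for scans

def combine_unions(plan):
    """Flatten directly nested Union nodes into a single Union."""
    if not isinstance(plan, Union):
        return plan
    flat = []
    for child in plan.children:
        child = combine_unions(child)
        if isinstance(child, Union):
            flat.extend(child.children)  # splice the nested Union's children in
        else:
            flat.append(child)
    return Union(flat)

nested = Union([Union(["scan t1", "scan t1 (len=2)"]), "scan t1 (len=3)"])
assert combine_unions(nested) == Union(["scan t1", "scan t1 (len=2)", "scan t1 (len=3)"])
```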

    3. Batch Subquery

    3.1 OptimizeSubqueries

    When a SQL statement contains a subquery, a SubqueryExpression is generated in the logical plan. Whenever the OptimizeSubqueries rule encounters a SubqueryExpression, it invokes the Optimizer recursively on that expression's sub-plan.
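The mechanism can be pictured as the optimizer calling itself on each nested plan. A toy sketch, with dicts standing in for plans and a stand-in optimize function (both hypothetical, not Spark APIs):

```python
def optimize(plan):
    """Stand-in for the full optimizer: here it only folds one constant filter."""
    if plan.get("filter") == "1 + 2":
        plan = {**plan, "filter": "3"}
    return plan

def optimize_subqueries(plan):
    """Apply the optimizer recursively to every plan embedded as a subquery."""
    out = dict(plan)
    if "subquery" in out:
        out["subquery"] = optimize(optimize_subqueries(out["subquery"]))
    return out

query = {"table": "t1", "subquery": {"table": "t2", "filter": "1 + 2"}}
assert optimize_subqueries(query)["subquery"]["filter"] == "3"
```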

    4. Batch Replace Operators

    This batch performs operator replacement. Certain query operators can be rewritten directly in terms of existing operators, avoiding redundant logical conversions.

    4.1 ReplaceIntersectWithSemiJoin

    Replaces the Intersect operator with a Left-Semi Join; logically the two are equivalent. Note that ReplaceIntersectWithSemiJoin only applies to INTERSECT DISTINCT statements, not to INTERSECT ALL. In addition, duplicate attributes must be deduplicated before this rule runs, otherwise the generated join condition would be incorrect.
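Semantically the two forms agree: INTERSECT DISTINCT returns the distinct rows common to both sides, which is exactly a left-semi join followed by a distinct aggregation (the real rule joins with null-safe equality <=>). A toy check in Python:

```python
def intersect_distinct(left, right):
    """INTERSECT DISTINCT semantics: distinct rows common to both sides."""
    return sorted(set(left) & set(right))

def left_semi_join_then_distinct(left, right):
    """The rewritten plan: a LeftSemi join (keep left rows that have a match
    on the right), then an Aggregate on all columns for the DISTINCT."""
    right_rows = set(right)
    matched = [row for row in left if row in right_rows]  # LeftSemi join
    return sorted(set(matched))                           # distinct (Aggregate)

l, r = ["aa", "bb", "aa", "cc"], ["aa", "cc", "dd"]
assert intersect_distinct(l, r) == left_semi_join_then_distinct(l, r) == ["aa", "cc"]
```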
    Example:

    create table t1(c1 string) stored as textfile;
    create table t2(c1 string) stored as textfile;
    load data local inpath '/etc/profile' overwrite into table t1;
    load data local inpath '/etc/profile' overwrite into table t2;
    

    Query the rows whose length is 4.

     select c1 from t1 where length(c1)=4;
    

    Output:

    else
    else
    else
    done
    Time taken: 0.064 seconds, Fetched 4 row(s)
    
    • intersect distinct
    explain extended 
    select  c1 from t2 where length(c1)<5 
    intersect distinct 
    select c1 from t1 where length(c1)=4;
    

    The output is below. In the Analyzed Logical Plan the operator is Intersect, while in the Optimized Logical Plan it becomes Join LeftSemi.

    == Analyzed Logical Plan ==
    c1: string
    Intersect false
    :- Project [c1#149]
    :  +- Filter (length(c1#149) < 5)
    :     +- SubqueryAlias spark_catalog.hzz.t2
    :        +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#149], Partition Cols: []]
    +- Project [c1#150]
       +- Filter (length(c1#150) = 4)
          +- SubqueryAlias spark_catalog.hzz.t1
             +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#150], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Aggregate [c1#149], [c1#149]
    +- Join LeftSemi, (c1#149 <=> c1#150)
       :- Filter (isnotnull(c1#149) AND (length(c1#149) < 5))
       :  +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#149], Partition Cols: []]
       +- Filter (isnotnull(c1#150) AND (length(c1#150) = 4))
          +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#150], Partition Cols: []]
    

    4.2 ReplaceExceptWithAntiJoin

    Replaces Except with a LeftAnti join.
    Example:

    explain extended 
    select  c1 from t2 where length(c1) <=5 
    except 
    select c1 from t1 where length(c1)=4;
    

    Output:

    == Analyzed Logical Plan ==
    c1: string
    Except false
    :- Project [c1#156]
    :  +- Filter (length(c1#156) <= 5)
    :     +- SubqueryAlias spark_catalog.hzz.t2
    :        +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#156], Partition Cols: []]
    +- Project [c1#157]
       +- Filter (length(c1#157) = 4)
          +- SubqueryAlias spark_catalog.hzz.t1
             +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#157], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Aggregate [c1#156], [c1#156]
    +- Join LeftAnti, (c1#156 <=> c1#157)
       :- Filter (isnotnull(c1#156) AND (length(c1#156) <= 5))
       :  +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#156], Partition Cols: []]
       +- Filter (isnotnull(c1#157) AND (length(c1#157) = 4))
          +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#157], Partition Cols: []]
    

    4.3 ReplaceDistinctWithAggregate

    Replaces Distinct with an Aggregate that groups by all output columns.
    Example:

    explain extended 
    select distinct c1 from t1;
    

    Output:

    == Analyzed Logical Plan ==
    c1: string
    Distinct
    +- Project [c1#163]
       +- SubqueryAlias spark_catalog.hzz.t1
          +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#163], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Aggregate [c1#163], [c1#163]
    +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#163], Partition Cols: []]
    

    5. Batch Aggregate

    5.1 RemoveLiteralFromGroupExpressions

    Removes literal constants from GROUP BY expressions.
    Example: all of the GROUP BY expressions are constants, so they are replaced with a single literal 0.

    explain extended 
    select sum(length(c1)) from t1 group by 'aa','bb';
    
    == Analyzed Logical Plan ==
    sum(length(c1)): bigint
    Aggregate [aa, bb], [sum(length(c1#189)) AS sum(length(c1))#191L]
    +- SubqueryAlias spark_catalog.hzz.t1
       +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#189], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Aggregate [0], [sum(length(c1#189)) AS sum(length(c1))#191L]
    +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#189], Partition Cols: []]
    

    5.2 RemoveRepetitionFromGroupExpressions

    Removes duplicate expressions from GROUP BY, e.g.

    explain extended 
    select sum(length(c1)) from t1 group by c1,c1;
    

    Output:

    == Analyzed Logical Plan ==
    sum(length(c1)): bigint
    Aggregate [c1#201, c1#201], [sum(length(c1#201)) AS sum(length(c1))#203L]
    +- SubqueryAlias spark_catalog.hzz.t1
       +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#201], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Aggregate [c1#201], [sum(length(c1#201)) AS sum(length(c1))#203L]
    +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#201], Partition Cols: []]
    

    6. Batch Operator Optimizations

    This batch falls into three broad categories: 1. operator pushdown, 2. operator combination, 3. constant folding and expression simplification.
    Operator pushdown covers predicate pushdown and column pruning; operator combination merges adjacent operators of the same kind.
    The rules and the operations they perform:

    PushProjectionThroughUnion           push projections below Union
    ReorderJoin                          reorder joins (unrelated to CostBasedJoinReorder)
    EliminateOuterJoin                   eliminate outer joins
    PushPredicateThroughJoin             push predicates through Join
    PushDownPredicate                    predicate pushdown
    LimitPushDown                        push down Limit
    ColumnPruning                        column pruning
    InferFiltersFromConstraints          infer filters from constraints
    CollapseRepartition                  merge repartition operators
    CollapseProject                      merge Project operators
    CollapseWindow                       merge Window operators
    CombineFilters                       merge Filter operators
    CombineLimits                        merge Limit operators
    CombineUnions                        merge Union operators
    NullPropagation                      null propagation
    FoldablePropagation                  propagate foldable expressions
    OptimizeIn                           optimize IN predicates
    ConstantFolding                      constant folding
    ReorderAssociativeOperator           reorder associative operators
    LikeSimplification                   simplify LIKE predicates
    BooleanSimplification                simplify boolean expressions
    SimplifyConditionals                 simplify conditionals
    RemoveDispensableExpressions         remove dispensable expressions
    SimplifyBinaryComparison             simplify binary comparisons
    PruneFilters                         prune filters
    EliminateSorts                       eliminate sorts
    SimplifyCasts                        simplify casts
    SimplifyCaseConversionExpressions    simplify case-conversion expressions
    RewriteCorrelatedScalarSubquery      rewrite correlated scalar subqueries
    EliminateSerialization               eliminate serialization
    RemoveAliasOnlyProject               remove alias-only Projects

    InferFiltersFromConstraints

    explain extended 
    select t1.c1 from t1 join t2 
    on t1.c1=t2.c1 
    where t2.c1='done';
    

    From t2.c1 = t1.c1 and t2.c1 = 'done', the rule infers t1.c1 = 'done'.

    == Analyzed Logical Plan ==
    c1: string
    Project [c1#235]
    +- Filter (c1#236 = done)
       +- Join Inner, (c1#235 = c1#236)
          :- SubqueryAlias spark_catalog.hzz.t1
          :  +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#235], Partition Cols: []]
          +- SubqueryAlias spark_catalog.hzz.t2
             +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#236], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Project [c1#235]
    +- Join Inner, (c1#235 = c1#236)
       :- Filter ((c1#235 = done) AND isnotnull(c1#235))
       :  +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#235], Partition Cols: []]
       +- Filter (isnotnull(c1#236) AND (c1#236 = done))
          +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#236], Partition Cols: []]
    
    
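The inference above is plain constraint propagation over equalities: from t1.c1 = t2.c1 and t2.c1 = 'done', conclude t1.c1 = 'done' (the rule also adds the isnotnull filters visible in the plan). A toy sketch:

```python
def infer_filters(equalities, filters):
    """Propagate constants across column equalities: if a = b and b = const,
    infer a = const. Returns the closed set of column-to-constant filters."""
    inferred = dict(filters)  # column -> constant value
    changed = True
    while changed:            # iterate until a fixpoint is reached
        changed = False
        for a, b in equalities:
            for x, y in ((a, b), (b, a)):
                if y in inferred and x not in inferred:
                    inferred[x] = inferred[y]
                    changed = True
    return inferred

result = infer_filters(equalities=[("t1.c1", "t2.c1")], filters={"t2.c1": "done"})
assert result == {"t2.c1": "done", "t1.c1": "done"}
```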

    ConstantFolding

    In the Analyzed Logical Plan the Filter still contains (1 + (2 * 3)); in the Optimized Logical Plan it is folded to the literal 7.

    explain extended 
    select  c1 from t1 where length(c1)> 1+2*3;
    
    == Analyzed Logical Plan ==
    c1: string
    Project [c1#266]
    +- Filter (length(c1#266) > (1 + (2 * 3)))
       +- SubqueryAlias spark_catalog.hzz.t1
          +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#266], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Filter (isnotnull(c1#266) AND (length(c1#266) > 7))
    +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#266], Partition Cols: []]
    
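Folding is a bottom-up transform: whenever both operands of an arithmetic node are literals, the node is evaluated once at optimization time. A toy expression tree in Python (not Catalyst's Expression classes):

```python
from dataclasses import dataclass

@dataclass
class Lit:
    value: int

@dataclass
class BinOp:
    op: str
    left: object
    right: object

def constant_fold(expr):
    """Bottom-up: evaluate any arithmetic node whose operands are literals."""
    if isinstance(expr, BinOp):
        left, right = constant_fold(expr.left), constant_fold(expr.right)
        if isinstance(left, Lit) and isinstance(right, Lit):
            apply = {"+": lambda a, b: a + b, "*": lambda a, b: a * b}[expr.op]
            return Lit(apply(left.value, right.value))
        return BinOp(expr.op, left, right)
    return expr

# 1 + 2 * 3, as in the Filter above, folds to the literal 7
expr = BinOp("+", Lit(1), BinOp("*", Lit(2), Lit(3)))
assert constant_fold(expr) == Lit(7)
```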

    RemoveDispensableExpressions

    In the following SQL, the predicate 1 < 2 can be eliminated.

    explain extended 
    select  c1 from t1 where 1 < 2 and length(c1) = 4;
    
    == Analyzed Logical Plan ==
    c1: string
    Project [c1#272]
    +- Filter ((1 < 2) AND (length(c1#272) = 4))
       +- SubqueryAlias spark_catalog.hzz.t1
          +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#272], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Filter (isnotnull(c1#272) AND (length(c1#272) = 4))
    +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#272], Partition Cols: []]
    

    7. Batch Check Cartesian Products

    CheckCartesianProducts checks whether the logical plan contains a Cartesian-product Join. If such a join exists and the SQL does not explicitly use the CROSS JOIN syntax, an exception is thrown. When spark.sql.crossJoin.enabled is true, this rule is skipped.
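The check itself can be sketched as: reject any join that ends up with no join condition unless it was written as an explicit CROSS JOIN or the config allows it (the dict-based join representation below is illustrative only):

```python
def check_cartesian_products(joins, cross_join_enabled=False):
    """Raise for joins without a condition (Cartesian products) unless they
    are explicit CROSS joins or the config permits them."""
    if cross_join_enabled:
        return  # rule is skipped entirely
    for join in joins:
        if join["condition"] is None and join["type"] != "Cross":
            raise ValueError(
                "Detected implicit cartesian product; use CROSS JOIN syntax "
                "or set spark.sql.crossJoin.enabled=true")

check_cartesian_products([{"type": "Inner", "condition": "t1.c1 = t2.c1"}])  # ok
check_cartesian_products([{"type": "Cross", "condition": None}])             # ok: explicit
```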

    8. Batch Decimal Optimizations => DecimalAggregates

    In general, performance suffers considerably when an aggregate query involves precision handling of decimal values. For fixed-precision Decimal types, the DecimalAggregates rule performs the aggregation on the unscaled Long representation, which speeds up the aggregation.
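The idea can be illustrated with Python's decimal module: a fixed-scale value such as 1.25 at scale 2 can be stored as its unscaled integer 125, summed with cheap integer arithmetic, and rescaled once at the end. A hedged sketch:

```python
from decimal import Decimal

def sum_decimals_unscaled(values, scale):
    """Sum fixed-scale decimals via their unscaled integer representation."""
    unscaled = [int(v.scaleb(scale)) for v in values]  # 1.25 -> 125 at scale 2
    total = sum(unscaled)                              # plain integer addition
    return Decimal(total).scaleb(-scale)               # rescale once at the end

vals = [Decimal("1.25"), Decimal("2.50"), Decimal("0.05")]
assert sum_decimals_unscaled(vals, scale=2) == Decimal("3.80")
```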

    9. Batch Typed Filter Optimization => CombineTypedFilters

    When the logical plan contains two adjacent TypedFilter operators whose conditions apply to the same object type, the CombineTypedFilters rule merges them into a single filter function.
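The merge can be sketched as composing the two predicate functions into one, so the data is traversed a single time (plain Python callables stand in for the typed filter conditions):

```python
def combine_typed_filters(predicates):
    """Merge several filters over the same object type into one function."""
    def combined(obj):
        return all(p(obj) for p in predicates)
    return combined

keep = combine_typed_filters([lambda s: len(s) > 2, lambda s: s.startswith("v")])
assert [s for s in ["v123", "v1", "x123"] if keep(s)] == ["v123"]
```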

    10. Batch LocalRelation

    ConvertToLocalRelation converts local operations on a LocalRelation into another LocalRelation.
    VALUES ('v1', 'v12'), ('v2', 'v22'), ('v3', 'v32') AS tab(c1, c2) is a LocalRelation.

    explain extended 
     SELECT c1 FROM VALUES 
     ('v1', 'v12'), ('v2', 'v22'), ('v3', 'v32') 
     AS tab(c1, c2) where c1='v1';
    

    In the output, the inline table appears as an UnresolvedInlineTable in the Parsed Logical Plan, which becomes a LocalRelation in the Analyzed Logical Plan. The Optimized Logical Plan contains only a single LocalRelation: the LocalRelation and the operations on top of it have been collapsed into a new LocalRelation.

    == Parsed Logical Plan ==
    'Project ['c1]
    +- 'Filter ('c1 = v1)
       +- 'SubqueryAlias tab
          +- 'UnresolvedInlineTable [c1, c2], [[v1, v12], [v2, v22], [v3, v32]]
    
    == Analyzed Logical Plan ==
    c1: string
    Project [c1#323]
    +- Filter (c1#323 = v1)
       +- SubqueryAlias tab
          +- LocalRelation [c1#323, c2#324]
    
    == Optimized Logical Plan ==
    LocalRelation [c1#323]
    

    PropagateEmptyRelation folds operators over empty LocalRelations.

     explain extended 
      select t1.c1 from (
        SELECT c1 FROM VALUES 
        ('v1', 'v12'), ('v2', 'v22'), ('v3', 'v32') AS tab(c1, c2) 
        where c1='v4'
       )t1 join (
        SELECT c1 FROM 
        VALUES ('v1', 'v12'), ('v2', 'v22'), ('v3', 'v32') AS tab(c1, c2) where c1='v4' 
      )t2 where t1.c1=t2.c1;
    

    The result is below. The Analyzed Logical Plan still joins the two subqueries.
    The Optimized Logical Plan contains a single LocalRelation marked <empty>: both subqueries optimize to empty LocalRelations, so their join is empty as well.

    == Analyzed Logical Plan ==
    c1: string
    Project [c1#337]
    +- Filter (c1#337 = c1#339)
       +- Join Inner
          :- SubqueryAlias t1
          :  +- Project [c1#337]
          :     +- Filter (c1#337 = v4)
          :        +- SubqueryAlias tab
          :           +- LocalRelation [c1#337, c2#338]
          +- SubqueryAlias t2
             +- Project [c1#339]
                +- Filter (c1#339 = v4)
                   +- SubqueryAlias tab
                      +- LocalRelation [c1#339, c2#340]
    
    == Optimized Logical Plan ==
    LocalRelation <empty>, [c1#337]
    
    == Physical Plan ==
    LocalTableScan <empty>, [c1#337]
    

    11. Batch OptimizeCodegen => OptimizeCodegen

    The OptimizeCodegen rule no longer exists in the current Optimizer.

    12. Batch RewriteSubquery

    This batch contains two optimization rules, RewritePredicateSubquery and CollapseProject.
    RewritePredicateSubquery rewrites predicate subqueries into left-semi / left-anti joins. EXISTS and NOT EXISTS map to semi and anti joins respectively, with the filter condition becoming the join condition; IN and NOT IN likewise map to semi and anti joins, with the filter condition and the selected columns forming the join condition.

    CollapseProject is a simpler rule, similar to CombineTypedFilters: it merges two adjacent Project operators into one, substituting aliases to form a single combined projection.

    • Example: IN rewritten to a left-semi join
    explain extended select * from t1 where c1 in(select c1 from t2 where length(c1)>4);
    

    Output:

    == Analyzed Logical Plan ==
    c1: string
    Project [c1#34]
    +- Filter c1#34 IN (list#28 [])
       :  +- Project [c1#35]
       :     +- Filter (length(c1#35) > 4)
       :        +- SubqueryAlias spark_catalog.hzz.t2
       :           +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#35], Partition Cols: []]
       +- SubqueryAlias spark_catalog.hzz.t1
          +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#34], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Join LeftSemi, (c1#34 = c1#35)
    :- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#34], Partition Cols: []]
    +- Filter (isnotnull(c1#35) AND (length(c1#35) > 4))
       +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#35], Partition Cols: []]
    
    • Example: NOT IN rewritten to a left-anti join
    explain extended select * from t1 where c1 not in(select c1 from t2 where length(c1)>4);
    

    Output:

    == Analyzed Logical Plan ==
    c1: string
    Project [c1#42]
    +- Filter NOT c1#42 IN (list#36 [])
       :  +- Project [c1#43]
       :     +- Filter (length(c1#43) > 4)
       :        +- SubqueryAlias spark_catalog.hzz.t2
       :           +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#43], Partition Cols: []]
       +- SubqueryAlias spark_catalog.hzz.t1
          +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#42], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Join LeftAnti, ((c1#42 = c1#43) OR isnull((c1#42 = c1#43)))
    :- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#42], Partition Cols: []]
    +- Filter (isnotnull(c1#43) AND (length(c1#43) > 4))
       +- HiveTableRelation [`hzz`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#43], Partition Cols: []]
    

    CollapseProject example:

    explain extended 
    select c1_alias1 c1_alias2  from (
      select c1 c1_alias1 from (
        select c1 from t1 where c1 > 'abc'
        )
     )t2;
    

    The three Projects in the Analyzed Logical Plan are collapsed into a single Project.

    == Analyzed Logical Plan ==
    c1_alias2: string
    Project [c1_alias1#59 AS c1_alias2#60]
    +- SubqueryAlias t2
       +- Project [c1#66 AS c1_alias1#59]
          +- SubqueryAlias __auto_generated_subquery_name
             +- Project [c1#66]
                +- Filter (c1#66 > abc)
                   +- SubqueryAlias spark_catalog.hzz.t1
                      +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#66], Partition Cols: []]
    
    == Optimized Logical Plan ==
    Project [c1#66 AS c1_alias2#60]
    +- Filter (isnotnull(c1#66) AND (c1#66 > abc))
       +- HiveTableRelation [`hzz`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#66], Partition Cols: []]
    

    13. Batch Optimize Metadata Only Query => OptimizeMetadataOnlyQuery

    Defined in SparkOptimizer.
    This rule optimizes queries that only need partition-level metadata. It applies when every referenced column is a partition column and the aggregation satisfies one of the following conditions.

      1. The aggregate expression operates on partition columns:
         SELECT col FROM tbl GROUP BY col
      2. The aggregate expression operates on partition columns with the DISTINCT keyword:
         SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1
      3. The aggregate expression operates on partition columns, and the presence of DISTINCT does not change the result:
         SELECT col1, Max(col2) FROM tbl GROUP BY col1

    14. Batch Extract Python UDF from Aggregate => ExtractPythonUDFFromAggregate

    This batch runs only once and contains the single rule ExtractPythonUDFFromAggregate, which extracts Python UDFs from aggregation operators. It mainly targets queries submitted through PySpark: Python UDFs that participate in an aggregation are pulled out and evaluated after the aggregation completes.

    15. Batch Prune File Source Table Partitions => PruneFileSourcePartitions

    Prunes the partitions of file-source tables.

    16. Batch User Provided Optimizers => ExperimentalMethods.extraOptimizations

    Supports user-defined optimization rules; a user only needs to extend the abstract class Rule[LogicalPlan].

  • Original post: https://blog.csdn.net/houzhizhen/article/details/132602026