• The execution process of the spark3 spark-sql explain command


    1. SparkSQLDriver

    For each SQL statement, except those handled by the processors that Hive's CommandProcessorFactory defines (such as dfs), a SparkSQLDriver object is created and its init and run methods are called; a sketch of this dispatch follows the run listing below.

    override def run(command: String): CommandProcessorResponse = {
        try {
          val substitutorCommand = SQLConf.withExistingConf(context.conf) {
            new VariableSubstitution().substitute(command)
          }
          context.sparkContext.setJobDescription(substitutorCommand)
          val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
          hiveResponse = SQLExecution.withNewExecutionId(execution) {
            hiveResultString(execution.executedPlan)
          }
          tableSchema = getResultSetSchema(execution)
          new CommandProcessorResponse(0)
        } catch {
            // error handling omitted here
        }
      }
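
    The dispatch itself happens in SparkSQLCLIDriver.processCmd. Below is a simplified sketch of that routing, not the verbatim source; `command` and `hiveConf` are assumed to be in scope.

    // Hive's CommandProcessorFactory picks a processor for the first token.
    // Plain SQL (including EXPLAIN) maps to a Hive Driver, which Spark swaps
    // for its own SparkSQLDriver; others (e.g. dfs) are run by Hive directly.
    import org.apache.hadoop.hive.ql.Driver
    import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory

    val tokens = command.trim.split("""\s+""")
    CommandProcessorFactory.get(tokens, hiveConf) match {
      case _: Driver =>                 // ordinary SQL statements
        val driver = new SparkSQLDriver
        driver.init()
        driver.run(command)             // the run method quoted above
      case proc =>                      // e.g. `dfs ...`
        proc.run(command.trim.substring(tokens(0).length).trim)
    }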
    

    The most important line is:

    val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
    

    First, context.sql(command) is executed.
    The Context.sql method is as follows:

    def sql(sqlText: String): DataFrame = sparkSession.sql(sqlText)
    

    sparkSession.sql

    Here, plan is the parsed but still Unresolved Logical Plan.

     def sql(sqlText: String): DataFrame = withActive {
        val tracker = new QueryPlanningTracker
        val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
          sessionState.sqlParser.parsePlan(sqlText)
        }
        Dataset.ofRows(self, plan, tracker)
      }
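
    A quick way to see this for yourself — a minimal sketch, assuming a live SparkSession named `spark`; the printed tree is abbreviated:

    // The parser only builds the tree; nothing is resolved against the catalog yet.
    val parsed = spark.sessionState.sqlParser.parsePlan("SELECT a FROM t")
    println(parsed.resolved)  // false: 'a and 't are still unresolved
    println(parsed)           // 'Project ['a]
                              // +- 'UnresolvedRelation [t]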
    

    Dataset.ofRows

    In ofRows, qe.assertAnalyzed() eagerly analyzes the plan.

     def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
        : DataFrame = sparkSession.withActive {
        val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
        qe.assertAnalyzed()
        new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
      }
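
    Because assertAnalyzed runs here, analysis errors surface at spark.sql(...) time, before any action is called. An illustrative check, again assuming a live `spark`:

    import org.apache.spark.sql.AnalysisException
    try {
      // Fails during analysis inside Dataset.ofRows, not at collect():
      spark.sql("SELECT * FROM no_such_table")
    } catch {
      case e: AnalysisException => println(e.getMessage)  // table not found
    }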
    

    context.sql(command).logicalPlan is the Dataset's logicalPlan, whose code is as follows:

      @transient private[sql] val logicalPlan: LogicalPlan = {
        val plan = queryExecution.commandExecuted
        if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
          val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long])
          dsIds.add(id)
          plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
        }
        plan
      }
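
    For a command statement this field ends up holding an already-executed CommandResult, which can be observed through the public queryExecution (illustrative, assuming a live `spark`):

    // SHOW TABLES parses to a Command, so it is executed eagerly and the
    // Dataset's plan becomes the wrapping CommandResult, not the command itself.
    val df = spark.sql("SHOW TABLES")
    println(df.queryExecution.commandExecuted.getClass.getSimpleName)  // CommandResult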
    

    QueryExecution

    The Dataset's logicalPlan uses the queryExecution.commandExecuted field, which is lazy and initialized on first access; it in turn uses analyzed, which is also lazy.
    analyzed turns the Unresolved Logical Plan into a Resolved Logical Plan, and commandExecuted runs eagerlyExecuteCommands(analyzed).
    The first access to commandExecuted produces a CommandResult object; every later access to the Dataset's logicalPlan returns that same CommandResult.

    lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
      // We can't clone `logical` here, which will reset the `_analyzed` flag.
      sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
    }
    
    lazy val commandExecuted: LogicalPlan = mode match {
      case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
      case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
      case CommandExecutionMode.SKIP => analyzed
    }
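
    The effect of the ANALYSIS phase can be observed directly; a small illustration, assuming a live `spark`:

    // ANALYSIS resolves relations and attributes against the catalog.
    spark.range(3).createOrReplaceTempView("t")
    val qe = spark.sql("SELECT id FROM t").queryExecution
    println(qe.analyzed.resolved)  // true: 't and 'id are bound now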
    

    eagerlyExecuteCommands returns a CommandResult object. Note that the nested executePlan call uses CommandExecutionMode.NON_ROOT, so the command itself runs exactly once, via executeCollect on its physical plan; an illustrative check follows the listing.

    private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
        case c: Command =>
          val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
          val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {
            qe.executedPlan.executeCollect()
          }
          CommandResult(
            qe.analyzed.output,
            qe.commandExecuted,
            qe.executedPlan,
            result)
        case other => other
      }
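
    Tying this back to explain: EXPLAIN SELECT ... parses to an ExplainCommand, which matches case c: Command above, so the plan text is computed eagerly and stored as the CommandResult's rows. Illustrative, assuming a live `spark`:

    val df = spark.sql("EXPLAIN SELECT 1")
    println(df.queryExecution.commandExecuted.getClass.getSimpleName)  // CommandResult
    df.collect().foreach(r => println(r.getString(0)))  // the formatted plan text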
    

    SparkSQLDriver.run

    Back to the main flow: the argument passed to context.sessionState.executePlan is the CommandResult object.

    val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
    hiveResponse = SQLExecution.withNewExecutionId(execution) {
      hiveResultString(execution.executedPlan)
    }
    

    sessionState.executePlan

    The default mode is CommandExecutionMode.ALL, and plan here is the CommandResult object.

     def executePlan(
          plan: LogicalPlan,
          mode: CommandExecutionMode.Value = CommandExecutionMode.ALL): QueryExecution =
        createQueryExecution(plan, mode)
    
    protected def createQueryExecution:
        (LogicalPlan, CommandExecutionMode.Value) => QueryExecution =
          (plan, mode) => new QueryExecution(session, plan, mode = mode)
    

    So after val execution = context.sessionState.executePlan(context.sql(command).logicalPlan) runs, execution is a QueryExecution object. Since CommandResult is not a Command, the eager command execution in this new QueryExecution leaves it untouched, so the command does not run a second time.

    SparkSQLDriver.run

    hiveResponse = SQLExecution.withNewExecutionId(execution) {
      hiveResultString(execution.executedPlan)
    }
    tableSchema = getResultSetSchema(execution)
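
    hiveResultString renders the rows of the executed plan as Hive-style strings, which the CLI then prints. A minimal way to reproduce that last step, assuming a live `spark` and the same hiveResultString signature used in the quoted run method (it lives in org.apache.spark.sql.execution.HiveResult):

    import org.apache.spark.sql.execution.HiveResult.hiveResultString
    // The same conversion the CLI performs before printing the EXPLAIN output.
    val qe = spark.sql("EXPLAIN SELECT 1").queryExecution
    hiveResultString(qe.executedPlan).foreach(println)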
    
    Spring整合Mybatis
  • Original article: https://blog.csdn.net/houzhizhen/article/details/131686021