Hive Source Code: How a Single HQL Statement Is Executed


    I. Source Download

    Below is the download link for the official Hive source. I am using hive-3.1.3, so let's dig in together.

    https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-src.tar.gz

    II. Context

    The earlier post 《Hive-源码带你看hive命令背后都做了什么》 (what happens behind the `hive` command) showed that once the `hive` command starts, it loops forever processing the HQL typed into the console. Here we continue by tracing how one HQL statement is executed: first the path described in the official documentation, then the source code.

    III. What the Official Documentation Says

    Design - Apache Hive - Apache Software Foundation

    The diagram on that page also shows how a typical query flows through the system; let's walk through an ordinary query first.

    1. The UI calls the execute interface of the Driver.

    2. The Driver creates a session handle for the query and sends the query to the compiler to generate an execution plan.

    3, 4. The compiler fetches the necessary metadata from the metastore.

    5. The metadata is used to type-check the expressions in the query tree and to prune partitions based on the query predicates. The compiler generates the plan, which is a DAG of stages; each stage is either a Map/Reduce job, a metadata operation, or an operation on HDFS. For Map/Reduce stages the plan contains a map operator tree (the operator tree executed in the MapTasks) and a reduce operator tree (for operations that need ReduceTasks).

    6, 6.1, 6.2, 6.3: The execution engine submits these stages to the appropriate components. (A minimal sketch of driving this flow from code follows this list.)
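    Before diving into the CLI sources, here is a minimal, hedged sketch of that flow from a client's point of view: create a driver, compile and execute one statement, and read back the response code. It assumes a client classpath with a valid hive-site.xml (metastore, HDFS, etc.) and is illustrative only; the class and method names are the ones we will meet in the source below.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.DriverFactory;
    import org.apache.hadoop.hive.ql.IDriver;
    import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class DriverFlowSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();                        // step 1: the "UI" side prepares a session
        SessionState.start(new SessionState(conf));
        IDriver driver = DriverFactory.newDriver(conf);        // same factory the CLI uses (section 3 below)
        CommandProcessorResponse r = driver.run("select 1");   // steps 2-6: compile, then execute the plan
        System.out.println("response code = " + r.getResponseCode());
        driver.close();
      }
    }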

    IV. Source Code Analysis

    The earlier post 《Hive-源码带你看hive命令背后都做了什么》 already covered CliDriver.executeDriver(); we pick up from processLine() inside it.

    1. processLine

    /**
     * Processes a line of semicolon separated commands.
     *
     * @param line
     *          the command to process -- i.e. the HQL the user just entered
     * @param allowInterrupting
     *          when true the function will handle SIG_INT (Ctrl+C) by interrupting the processing
     *          and returning -1
     *
     * @return 0 if everything went fine
     */
    public int processLine(String line, boolean allowInterrupting) {
      SignalHandler oldSignal = null;
      Signal interruptSignal = null;

      // allowInterrupting == true when the HQL comes from the interactive console
      if (allowInterrupting) {
        // Remember all threads that were running when we started the line processing.
        // Hook up the custom Ctrl+C handler while this line is being processed.
        interruptSignal = new Signal("INT");
        oldSignal = Signal.handle(interruptSignal, new SignalHandler() {
          private boolean interruptRequested;

          @Override
          public void handle(Signal signal) {
            boolean initialRequest = !interruptRequested;
            interruptRequested = true;

            // Kill the VM on the second Ctrl+C
            if (!initialRequest) {
              console.printInfo("Exiting the JVM");
              System.exit(127);
            }

            // Interrupt the CLI thread to stop the current statement and return to the prompt
            console.printInfo("Interrupting... Be patient, this might take some time.");
            console.printInfo("Press Ctrl+C again to kill JVM");

            // First, kill any running MR/Tez jobs
            HadoopJobExecHelper.killRunningJobs();
            TezJobExecHelper.killRunningJobs();
            HiveInterruptUtils.interrupt();
          }
        });
      }

      try {
        int lastRet = 0, ret = 0;

        // We cannot simply use the "split" function, because ";" may be escaped, e.g. "\\;"
        // inside a string literal. splitSemiColon walks the line character by character and
        // every unescaped ";" closes one statement, which is added to commands.
        List<String> commands = splitSemiColon(line);

        String command = "";
        // Loop over all statements the user entered on this line
        for (String oneCmd : commands) {

          if (StringUtils.endsWith(oneCmd, "\\")) {
            command += StringUtils.chop(oneCmd) + ";";
            continue;
          } else {
            command += oneCmd;
          }
          if (StringUtils.isBlank(command)) {
            continue;
          }

          // Next, let's see what processCmd does
          ret = processCmd(command);
          command = "";
          lastRet = ret;
          boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
          if (ret != 0 && !ignoreErrors) {
            return ret;
          }
        }
        return lastRet;
      } finally {
        // Once we are done processing the line, restore the old handler
        if (oldSignal != null && interruptSignal != null) {
          Signal.handle(interruptSignal, oldSignal);
        }
      }
    }

    Indeed, just as the source comments say: pressing Ctrl+C while an HQL is running interrupts it and prints the messages above, and a second Ctrl+C exits the JVM.
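    A quick aside on splitSemiColon: the reason a plain split(";") will not do is that a semicolon may be escaped or sit inside a string literal. Below is a simplified sketch of that idea; the real CliDriver.splitSemiColon also deals with comments and more corner cases, so treat this as illustration only.

    import java.util.ArrayList;
    import java.util.List;

    public class SplitSemiColonSketch {
      // Split a line into statements on ';', keeping escaped "\;" and semicolons inside quotes intact.
      static List<String> split(String line) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inSingleQuote = false, inDoubleQuote = false, escaped = false;
        for (char c : line.toCharArray()) {
          if (escaped) {                         // character right after a backslash is copied verbatim
            current.append(c);
            escaped = false;
          } else if (c == '\\') {
            current.append(c);
            escaped = true;
          } else if (c == '\'' && !inDoubleQuote) {
            inSingleQuote = !inSingleQuote;
            current.append(c);
          } else if (c == '"' && !inSingleQuote) {
            inDoubleQuote = !inDoubleQuote;
            current.append(c);
          } else if (c == ';' && !inSingleQuote && !inDoubleQuote) {
            parts.add(current.toString());       // an unquoted, unescaped ';' closes one statement
            current.setLength(0);
          } else {
            current.append(c);
          }
        }
        if (current.length() > 0) {
          parts.add(current.toString());
        }
        return parts;
      }

      public static void main(String[] args) {
        // prints: [select 'a;b' from t,  select 2]
        System.out.println(split("select 'a;b' from t; select 2"));
      }
    }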

    2. processCmd

    public int processCmd(String cmd) {
      CliSessionState ss = (CliSessionState) SessionState.get();
      ss.setLastCommand(cmd);
      ss.updateThreadName();

      // Flush the print stream so it does not include the output of the previous command
      ss.err.flush();

      // Strip comments from the SQL statement, tracking when the statement contains string literals,
      // and trim leading/trailing whitespace (only at the ends)
      String cmd_trimmed = HiveStringUtils.removeComments(cmd).trim();

      // Split the comment-free, trimmed HQL on "\\s+" into a token array
      // ("\\s+" matches one or more whitespace characters: space, \t, \n, \x0B, \f, \r)
      // e.g. tokens = {"select", "*", "from", "ods.test", "where", "dt='20240309'"}
      String[] tokens = tokenizeCmd(cmd_trimmed);
      int ret = 0;

      // If the user typed quit or exit, leave immediately
      if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {

        // if we have come this far -- either the previous commands all succeeded,
        // or this is the command line. In either case this counts as a successful run.
        ss.close();
        System.exit(0);

      // If the first token of the statement is "source"
      } else if (tokens[0].equalsIgnoreCase("source")) {
        // Take everything after "source" (a script file path)
        String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
        cmd_1 = new VariableSubstitution(new HiveVariableSource() {
          @Override
          public Map<String, String> getHiveVariable() {
            return SessionState.get().getHiveVariables();
          }
        }).substitute(ss.getConf(), cmd_1);

        File sourceFile = new File(cmd_1);
        if (! sourceFile.isFile()){
          console.printError("File: "+ cmd_1 + " is not a file.");
          ret = 1;
        } else {
          try {
            ret = processFile(cmd_1);
          } catch (IOException e) {
            console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
              stringifyException(e));
            ret = 1;
          }
        }
      } else if (cmd_trimmed.startsWith("!")) {
        // For shell commands, use the unstripped command
        // (in the Hive CLI you can type "! sh your_script.sh" to run a script)
        String shell_cmd = cmd.trim().substring(1);
        shell_cmd = new VariableSubstitution(new HiveVariableSource() {
          @Override
          public Map<String, String> getHiveVariable() {
            return SessionState.get().getHiveVariables();
          }
        }).substitute(ss.getConf(), shell_cmd);

        // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
        try {
          ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmd, ss.out, ss.err);
          ret = executor.execute();
          if (ret != 0) {
            console.printError("Command failed with exit code = " + ret);
          }
        } catch (Exception e) {
          console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
              stringifyException(e));
          ret = 1;
        }
      } else { // local mode
        try {
          // Get the command processor (driver) for this statement -- we look at this in detail next
          try (CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf)) {
            if (proc instanceof IDriver) {
              // Let the driver strip comments using SQL semantics
              ret = processLocalCmd(cmd, proc, ss);
            } else {
              // Non-IDriver processors get the comment-stripped statement directly
              ret = processLocalCmd(cmd_trimmed, proc, ss);
            }
          }
        } catch (SQLException e) {
          console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
            org.apache.hadoop.util.StringUtils.stringifyException(e));
          ret = 1;
        }
        catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

      ss.resetThreadName();
      return ret;
    }
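    As a side note, tokenizeCmd boils down to a whitespace split on the trimmed statement. A tiny standalone illustration (the input string is just the example from the comment above):

    public class TokenizeSketch {
      public static void main(String[] args) {
        // tokenizeCmd is essentially a split on "\\s+"
        String cmd_trimmed = "select * from ods.test where dt='20240309'";
        String[] tokens = cmd_trimmed.split("\\s+");
        for (String t : tokens) {
          System.out.println(t);   // select / * / from / ods.test / where / dt='20240309'
        }
      }
    }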

    3. Getting the command processor (driver) for the HQL

    Following step 2, we look at the class CommandProcessorFactory.

    public static CommandProcessor get(String[] cmd, @Nonnull HiveConf conf) throws SQLException {
      CommandProcessor result = getForHiveCommand(cmd, conf);
      if (result != null) {
        return result;
      }
      if (isBlank(cmd[0])) {
        return null;
      } else {
        // Ordinary HQL (anything that is not one of the Hive commands below) ends up here:
        // build a driver for the client
        return DriverFactory.newDriver(conf);
      }
    }

    public static CommandProcessor getForHiveCommand(String[] cmd, HiveConf conf)
        throws SQLException {
      return getForHiveCommandInternal(cmd, conf, false);
    }

    public static CommandProcessor getForHiveCommandInternal(String[] cmd, HiveConf conf,
        boolean testOnly)
        throws SQLException {
      // This part is the key; it lives in HiveCommand, which we look at next
      HiveCommand hiveCommand = HiveCommand.find(cmd, testOnly);
      if (hiveCommand == null || isBlank(cmd[0])) {
        return null;
      }
      if (conf == null) {
        conf = new HiveConf();
      }
      Set<String> availableCommands = new HashSet<String>();
      for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST)
          .split(",")) {
        availableCommands.add(availableCommand.toLowerCase().trim());
      }
      if (!availableCommands.contains(cmd[0].trim().toLowerCase())) {
        throw new SQLException("Insufficient privileges to execute " + cmd[0], "42000");
      }
      if (cmd.length > 1 && "reload".equalsIgnoreCase(cmd[0])
          && "function".equalsIgnoreCase(cmd[1])) {
        // special handling for SQL "reload function"
        return null;
      }
      switch (hiveCommand) {
        case SET:
          return new SetProcessor();
        case RESET:
          return new ResetProcessor();
        case DFS:
          SessionState ss = SessionState.get();
          return new DfsProcessor(ss.getConf());
        case ADD:
          return new AddResourceProcessor();
        case LIST:
          return new ListResourceProcessor();
        case LLAP_CLUSTER:
          return new LlapClusterResourceProcessor();
        case LLAP_CACHE:
          return new LlapCacheResourceProcessor();
        case DELETE:
          return new DeleteResourceProcessor();
        case COMPILE:
          return new CompileProcessor();
        case RELOAD:
          return new ReloadProcessor();
        case CRYPTO:
          try {
            return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf);
          } catch (HiveException e) {
            throw new SQLException("Fail to start the command processor due to the exception: ", e);
          }
        default:
          throw new AssertionError("Unknown HiveCommand " + hiveCommand);
      }
    }
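    To make the dispatch concrete, here is a hedged fragment (illustration only: it assumes a HiveConf built from a valid hive-site.xml, an initialized SessionState where a processor needs one, and a made-up jar path; CommandProcessorFactory.get may throw SQLException):

    HiveConf conf = new HiveConf();
    CommandProcessor set = CommandProcessorFactory.get(new String[] {"set", "hive.execution.engine"}, conf);    // SetProcessor
    CommandProcessor add = CommandProcessorFactory.get(new String[] {"add", "jar", "/tmp/my_udf.jar"}, conf);   // AddResourceProcessor
    CommandProcessor sel = CommandProcessorFactory.get(new String[] {"select", "*", "from", "ods.test"}, conf); // an IDriver (ReExecDriver by default)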

    HiveCommand represents the non-SQL statements, such as setting a property or adding a resource.

    // find() maps the first token of the statement to a HiveCommand;
    // ordinary SQL is not in COMMANDS and therefore returns null
    // (for "llap ..." statements it returns LLAP_CLUSTER or LLAP_CACHE)
    public static HiveCommand find(String[] command, boolean findOnlyForTesting) {
      if (null == command){
        return null;
      }
      // The first token of the statement, e.g. select, delete, update, set, ...
      String cmd = command[0];
      if (cmd != null) {
        // Upper-case it: SELECT, DELETE, UPDATE, SET, ...
        cmd = cmd.trim().toUpperCase();
        if (command.length > 1 && "role".equalsIgnoreCase(command[1])) {
          // special handling for "set role r1" statements
          return null;
        } else if(command.length > 1 && "from".equalsIgnoreCase(command[1])) {
          // special handling for "delete from <table> where ..." statements
          return null;
        } else if(command.length > 1 && "set".equalsIgnoreCase(command[0]) && "autocommit".equalsIgnoreCase(command[1])) {
          return null; // don't want "set autocommit true|false" mixed in with "set hive.foo.bar..."
        } else if (command.length > 1 && "llap".equalsIgnoreCase(command[0])) {
          return getLlapSubCommand(command);
        } else if (COMMANDS.contains(cmd)) {
          HiveCommand hiveCommand = HiveCommand.valueOf(cmd);
          if (findOnlyForTesting == hiveCommand.isOnlyForTesting()) {
            return hiveCommand;
          }
          return null;
        }
      }
      return null;
    }

    private static HiveCommand getLlapSubCommand(final String[] command) {
      if ("cluster".equalsIgnoreCase(command[1])) {
        return LLAP_CLUSTER;
      } else if ("cache".equalsIgnoreCase(command[1])) {
        return LLAP_CACHE;
      } else {
        return null;
      }
    }

    So any ordinary HQL (anything that does not match one of these Hive commands) falls through to return DriverFactory.newDriver(conf), which leads to the overload below:

      public static IDriver newDriver(QueryState queryState, String userName, QueryInfo queryInfo) {
        // Read hive.query.reexecution.enabled, default true
        // Meaning: enable query re-execution
        boolean enabled = queryState.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ENABLED);
        if (!enabled) {
          // If re-execution is disabled, return a plain Driver
          return new Driver(queryState, userName, queryInfo);
        }

        // Read hive.query.reexecution.strategies, default "overlay,reoptimize"
        // Meaning: a comma separated list of plugins:
        //   overlay:    the hiveconf subtree "reexec.overlay" is used as an overlay in case of an execution error
        //   reoptimize: collects operator statistics during execution and recompiles the query after a failure
        String strategies = queryState.getConf().getVar(ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES);
        strategies = Strings.nullToEmpty(strategies).trim().toLowerCase();
        ArrayList<IReExecutionPlugin> plugins = new ArrayList<>();
        for (String string : strategies.split(",")) {
          if (string.trim().isEmpty()) {
            continue;
          }
          plugins.add(buildReExecPlugin(string));
        }

        // By default this returns a ReExecDriver: it implements the IDriver interface,
        // handles re-execution of the query, and delegates the relevant decisions to the
        // underlying re-execution plugins.
        return new ReExecDriver(queryState, userName, queryInfo, plugins);
      }
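      So with the default configuration every ordinary statement ends up in a ReExecDriver. As a small, hedged fragment (illustration only), turning re-execution off makes the factory hand back the plain Driver instead:

      HiveConf conf = new HiveConf();
      conf.setBoolVar(HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_ENABLED, false);
      IDriver driver = DriverFactory.newDriver(conf);   // Driver instead of ReExecDriver

      The same effect should be achievable by setting hive.query.reexecution.enabled=false in hive-site.xml or via SET in the session.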

      4. processLocalCmd

      int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
        // Read hive.cli.print.escape.crlf from hive-site.xml, default false
        // Meaning: whether to print carriage returns and line feeds in row output as escaped \r and \n
        boolean escapeCRLF = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_ESCAPE_CRLF);
        int ret = 0;

        if (proc != null) {
          // As we saw in step 3, ordinary HQL takes this branch
          if (proc instanceof IDriver) {
            // Cast to IDriver first
            IDriver qp = (IDriver) proc;
            PrintStream out = ss.out;
            long start = System.currentTimeMillis();
            if (ss.getIsVerbose()) {
              out.println(cmd);
            }

            // This calls IDriver.run(); we look at it in detail below
            ret = qp.run(cmd).getResponseCode();
            if (ret != 0) {
              qp.close();
              return ret;
            }

            // query has run, capture the time
            long end = System.currentTimeMillis();
            double timeTaken = (end - start) / 1000.0;

            ArrayList<String> res = new ArrayList<String>();

            printHeader(qp, out);

            // print the results
            int counter = 0;
            try {
              if (out instanceof FetchConverter) {
                ((FetchConverter) out).fetchStarted();
              }
              while (qp.getResults(res)) {
                for (String r : res) {
                  if (escapeCRLF) {
                    r = EscapeCRLFHelper.escapeCRLF(r);
                  }
                  out.println(r);
                }
                counter += res.size();
                res.clear();
                if (out.checkError()) {
                  break;
                }
              }
            } catch (IOException e) {
              console.printError("Failed with exception " + e.getClass().getName() + ":" + e.getMessage(),
                  "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
              ret = 1;
            }

            qp.close();

            if (out instanceof FetchConverter) {
              ((FetchConverter) out).fetchFinished();
            }

            console.printInfo(
                "Time taken: " + timeTaken + " seconds" + (counter == 0 ? "" : ", Fetched: " + counter + " row(s)"));
          } else {
            String firstToken = tokenizeCmd(cmd.trim())[0];
            String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());

            if (ss.getIsVerbose()) {
              ss.out.println(firstToken + " " + cmd_1);
            }
            CommandProcessorResponse res = proc.run(cmd_1);
            if (res.getResponseCode() != 0) {
              ss.out
                  .println("Query returned non-zero code: " + res.getResponseCode() + ", cause: " + res.getErrorMessage());
            }
            if (res.getConsoleMessages() != null) {
              for (String consoleMsg : res.getConsoleMessages()) {
                console.printInfo(consoleMsg);
              }
            }
            ret = res.getResponseCode();
          }
        }
        return ret;
      }

      5. ReExecDriver

      public CommandProcessorResponse run(String command) {
        CommandProcessorResponse r0 = compileAndRespond(command);
        if (r0.getResponseCode() != 0) {
          return r0;
        }
        return run();
      }

      public CommandProcessorResponse compileAndRespond(String statement) {
        currentQuery = statement;
        // coreDriver is a Driver; we go into Driver to look at this logic (section 5.1)
        return coreDriver.compileAndRespond(statement);
      }

      public CommandProcessorResponse run() {
        executionIndex = 0;
        int maxExecutuions = 1 + coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT);

        while (true) {
          executionIndex++;
          for (IReExecutionPlugin p : plugins) {
            p.beforeExecute(executionIndex, explainReOptimization);
          }
          coreDriver.getContext().setExecutionIndex(executionIndex);
          LOG.info("Execution #{} of query", executionIndex);
          CommandProcessorResponse cpr = coreDriver.run();

          PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
          afterExecute(oldPlanMapper, cpr.getResponseCode() == 0);

          boolean shouldReExecute = explainReOptimization && executionIndex==1;
          shouldReExecute |= cpr.getResponseCode() != 0 && shouldReExecute();

          if (executionIndex >= maxExecutuions || !shouldReExecute) {
            return cpr;
          }
          LOG.info("Preparing to re-execute query");
          prepareToReExecute();
          CommandProcessorResponse compile_resp = coreDriver.compileAndRespond(currentQuery);
          if (compile_resp.failed()) {
            LOG.error("Recompilation of the query failed; this is unexpected.");
            // FIXME: somehow place pointers that re-execution compilation have failed; the query have been successfully compiled before?
            return compile_resp;
          }

          PlanMapper newPlanMapper = coreDriver.getPlanMapper();
          if (!explainReOptimization && !shouldReExecuteAfterCompile(oldPlanMapper, newPlanMapper)) {
            LOG.info("re-running the query would probably not yield better results; returning with last error");
            // FIXME: retain old error; or create a new one?
            return cpr;
          }
        }
      }

      5.1. Driver

      public CommandProcessorResponse compileAndRespond(String command, boolean cleanupTxnList) {
        try {
          compileInternal(command, false);
          return createProcessorResponse(0);
        } catch (CommandProcessorResponse e) {
          return e;
        } finally {
          if (cleanupTxnList) {
            // Valid txn list might be generated for a query compiled using this command, so we need to reset it
            conf.unset(ValidTxnList.VALID_TXNS_KEY);
          }
        }
      }

      private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse {
        // ...... omitted ......
        try {
          // deferClose indicates whether close/destroy should be deferred when the process is interrupted.
          // It should be set to true if compile is called from within another method such as runInternal,
          // which defers the close to the end of that method.
          // Let's look at compile() in detail.
          compile(command, true, deferClose);
        } catch (CommandProcessorResponse cpr) {
          // ...... omitted ......
        } finally {
          compileLock.unlock();
        }
        // ...... omitted ......
      }
      private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {
        // ...... omitted ......
        command = new VariableSubstitution(new HiveVariableSource() {
          @Override
          public Map<String, String> getHiveVariable() {
            return SessionState.get().getHiveVariables();
          }
        }).substitute(conf, command);

        String queryStr = command;

        try {
          // The command should be redacted to avoid logging sensitive data
          queryStr = HookUtils.redactLogString(conf, command);
        } catch (Exception e) {
          LOG.warn("WARNING! Query command could not be redacted." + e);
        }

        checkInterrupted("at beginning of compilation.", null, null);

        if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
          // Close the existing ctx etc. before compiling a new query, but do not destroy the driver
          closeInProcess(false);
        }

        if (resetTaskIds) {
          TaskFactory.resetId();
        }

        LockedDriverState.setLockedDriverState(lDrvState);

        // The id of the query currently being compiled (there may be several per session)
        String queryId = queryState.getQueryId();

        if (ctx != null) {
          setTriggerContext(queryId);
        }

        // Save some info for the web UI to use after the plan is freed
        this.queryDisplay.setQueryStr(queryStr);
        this.queryDisplay.setQueryId(queryId);

        // Now compiling this HQL
        LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);

        conf.setQueryString(queryStr);
        // FIXME: side effect will leave the last query set at the session level
        if (SessionState.get() != null) {
          SessionState.get().getConf().setQueryString(queryStr);
          SessionState.get().setupQueryCurrentTimestamp();
        }

        // Whether any error occurred during query compilation. Used for query lifetime hooks.
        boolean compileError = false;
        boolean parseError = false;

        try {
          // Initialize the transaction manager. This must be done before analyze is called.
          if (initTxnMgr != null) {
            queryTxnMgr = initTxnMgr;
          } else {
            queryTxnMgr = SessionState.get().initTxnMgr(conf);
          }
          if (queryTxnMgr instanceof Configurable) {
            ((Configurable) queryTxnMgr).setConf(conf);
          }
          queryState.setTxnManager(queryTxnMgr);

          // In case the user presses Ctrl-C twice to kill the Hive CLI JVM we want to release the locks;
          // if compile is called multiple times, clear the old shutdown hook first
          ShutdownHookManager.removeShutdownHook(shutdownRunner);
          final HiveTxnManager txnMgr = queryTxnMgr;
          shutdownRunner = new Runnable() {
            @Override
            public void run() {
              try {
                releaseLocksAndCommitOrRollback(false, txnMgr);
              } catch (LockException e) {
                LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
                    e.getMessage());
              }
            }
          };
          ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);

          // Before parsing and analysing the query
          checkInterrupted("before parsing and analysing the query", null, null);

          if (ctx == null) {
            ctx = new Context(conf);
            setTriggerContext(queryId);
          }

          // Set the transaction manager for this query
          ctx.setHiveTxnManager(queryTxnMgr);
          ctx.setStatsSource(statsSource);
          // Set the HQL
          ctx.setCmd(command);
          // Clean up HDFS on exit
          ctx.setHDFSCleanup(true);

          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);

          // Invoked before the query enters the parse phase
          hookRunner.runBeforeParseHook(command);

          ASTNode tree;
          try {
            // Parse the HQL into an AST; we will cover parsing in a separate post
            tree = ParseUtils.parse(command, ctx);
          } catch (ParseException e) {
            parseError = true;
            throw e;
          } finally {
            hookRunner.runAfterParseHook(command, parseError);
          }
          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
          hookRunner.runBeforeCompileHook(command);
          // Clear the CurrentFunctionsInUse set, to capture the new set of functions
          // that the SemanticAnalyzer finds in use
          SessionState.get().getCurrentFunctionsInUse().clear();

          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);

          // Flush the metastore cache. This ensures we do not pick up objects from a previous query
          // that ran in the same thread. This must be done after we get the semantic analyzer
          // (which is when the connection to the metastore is made) but before we analyze,
          // because at that point we need access to the objects.
          Hive.get().getMSC().flushCache();

          backupContext = new Context(ctx);
          boolean executeHooks = hookRunner.hasPreAnalyzeHooks();

          // Context information provided by Hive to implementations of HiveSemanticAnalyzerHook
          HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
          if (executeHooks) {
            hookCtx.setConf(conf);
            hookCtx.setUserName(userName);
            hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
            hookCtx.setCommand(command);
            hookCtx.setHiveOperation(queryState.getHiveOperation());

            // Invoked before Hive performs its own semantic analysis on the statement. An implementation
            // can inspect the statement AST and prevent its execution by throwing a SemanticException.
            // Optionally it can also augment/rewrite the AST, but it must produce a form equivalent to
            // one returned directly by Hive's own parser.
            // Returns the (possibly replaced) AST -- usually the same as the original, never null.
            tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree);
          }

          // Do semantic analysis and plan generation.
          // Depending on the type of the AST node a different analyzer is returned;
          // for ordinary queries this is the CalcitePlanner by default.
          BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);

          if (!retrial) {
            openTransaction();
            generateValidTxnList();
          }

          // Analyze the AST produced from the HQL (semantic analysis etc.);
          // we will dedicate a separate post to this
          sem.analyze(tree, ctx);

          if (executeHooks) {
            hookCtx.update(sem);
            hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
          }

          // Semantic analysis completed
          LOG.info("Semantic Analysis Completed (retrial = {})", retrial);

          // Retrieve information about the cache usage for the query.
          if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
            cacheUsage = sem.getCacheUsage();
          }

          // Validate the plan
          sem.validate();
          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);

          // After analyzing the query
          checkInterrupted("after analyzing query.", null, null);

          // Get the output schema
          schema = getSchema(sem, conf);

          // Build the query plan
          plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
              queryState.getHiveOperation(), schema);

          // Set the MapReduce workflow engine id and name
          conf.set("mapreduce.workflow.id", "hive_" + queryId);
          conf.set("mapreduce.workflow.name", queryStr);

          // Initialize FetchTask right here
          if (plan.getFetchTask() != null) {
            plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());
          }

          // Do the authorization check
          if (!sem.skipAuthorization() &&
              HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
            try {
              perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
              // Roughly, this does the following:
              //   1. connect to the Hive metastore
              //   2. set the inputs and outputs
              //   3. get the table -> column mappings
              //   4. add the permanent UDFs in use
              //   5. determine whether the statement targets a database, a table, a query or an import
              //   6. for partitioned tables, also check partition-level privileges
              //   7. check column authorization via the table scan operators
              //   8. table authorization checks
              doAuthorization(queryState.getHiveOperation(), sem, command);
            } catch (AuthorizationException authExp) {
              console.printError("Authorization failed:" + authExp.getMessage()
                  + ". Use SHOW GRANT to get more details.");
              errorMessage = authExp.getMessage();
              SQLState = "42000";
              throw createProcessorResponse(403);
            } finally {
              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
            }
          }

          if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
            String explainOutput = getExplainOutput(sem, plan, tree);
            if (explainOutput != null) {
              LOG.info("EXPLAIN output for queryid " + queryId + " : "
                  + explainOutput);
              if (conf.isWebUiQueryInfoCacheEnabled()) {
                // Store the explain plan for the web UI
                queryDisplay.setExplainPlan(explainOutput);
              }
            }
          }
        } catch (CommandProcessorResponse cpr) {
          throw cpr;
        } catch (Exception e) {
          checkInterrupted("during query compilation: " + e.getMessage(), null, null);

          compileError = true;
          ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
          errorMessage = "FAILED: " + e.getClass().getSimpleName();
          if (error != ErrorMsg.GENERIC_ERROR) {
            errorMessage += " [Error " + error.getErrorCode() + "]:";
          }

          // HIVE-4889
          if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) {
            errorMessage += " " + e.getCause().getMessage();
          } else {
            errorMessage += " " + e.getMessage();
          }

          if (error == ErrorMsg.TXNMGR_NOT_ACID) {
            errorMessage += ". Failed command: " + queryStr;
          }

          SQLState = error.getSQLState();
          downstreamError = e;
          console.printError(errorMessage, "\n"
              + org.apache.hadoop.util.StringUtils.stringifyException(e));
          throw createProcessorResponse(error.getErrorCode());
        } finally {
          // Trigger the post-compilation hook. Note that if the compilation fails here,
          // the before/after execution hooks will never run.
          if (!parseError) {
            try {
              hookRunner.runAfterCompilationHook(command, compileError);
            } catch (Exception e) {
              LOG.warn("Failed when invoking query after-compilation hook.", e);
            }
          }

          double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00;
          ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation");
          queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);

          boolean isInterrupted = lDrvState.isAborted();
          if (isInterrupted && !deferClose) {
            closeInProcess(true);
          }
          lDrvState.stateLock.lock();
          try {
            if (isInterrupted) {
              lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR;
            } else {
              lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED;
            }
          } finally {
            lDrvState.stateLock.unlock();
          }

          if (isInterrupted) {
            LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");
          } else {
            LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
          }
        }
      }
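      Stripped of hooks, locking, logging and error handling, the compile path above reduces to a handful of calls. A condensed sketch (not standalone code; it reuses the fields and local variables of the listing):

      // Condensed shape of Driver.compile() -- illustration only.
      ASTNode tree = ParseUtils.parse(command, ctx);                        // HQL text -> AST
      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
      sem.analyze(tree, ctx);                                               // semantic analysis + operator/task generation
      sem.validate();                                                       // validate the generated plan
      schema = getSchema(sem, conf);                                        // output schema
      plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN),
          queryId, queryState.getHiveOperation(), schema);                  // the DAG of stages to execute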

      5.2. ReExecDriver.run() itself

      public CommandProcessorResponse run() {
        executionIndex = 0;
        // Read hive.query.reexecution.max.count, default 1
        // Meaning: maximum number of re-executions for a single query
        int maxExecutuions = 1 + coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT);

        while (true) {
          executionIndex++;
          // Give every re-execution plugin a chance to act
          for (IReExecutionPlugin p : plugins) {
            // Called before the query is executed
            p.beforeExecute(executionIndex, explainReOptimization);
          }
          coreDriver.getContext().setExecutionIndex(executionIndex);
          LOG.info("Execution #{} of query", executionIndex);

          // This goes into Driver again, but through a different entry point than in 5.1 -- see below
          CommandProcessorResponse cpr = coreDriver.run();

          PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
          afterExecute(oldPlanMapper, cpr.getResponseCode() == 0);

          boolean shouldReExecute = explainReOptimization && executionIndex==1;
          shouldReExecute |= cpr.getResponseCode() != 0 && shouldReExecute();

          if (executionIndex >= maxExecutuions || !shouldReExecute) {
            return cpr;
          }

          // Preparing to re-execute the query
          LOG.info("Preparing to re-execute query");
          prepareToReExecute();
          CommandProcessorResponse compile_resp = coreDriver.compileAndRespond(currentQuery);
          if (compile_resp.failed()) {
            LOG.error("Recompilation of the query failed; this is unexpected.");
            return compile_resp;
          }

          PlanMapper newPlanMapper = coreDriver.getPlanMapper();
          if (!explainReOptimization && !shouldReExecuteAfterCompile(oldPlanMapper, newPlanMapper)) {
            // Re-running the query would probably not yield better results; return the last error
            LOG.info("re-running the query would probably not yield better results; returning with last error");
            return cpr;
          }
        }
      }
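      A small, hedged configuration fragment: raising the re-execution budget so that a failing query may be retried twice (whether a retry actually happens is still decided by the plugins' shouldReExecute logic shown above):

      // Illustration only: allow up to 1 + 2 = 3 executions of a failing query.
      conf.setIntVar(HiveConf.ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT, 2);
      // Corresponding property (assumption based on the comment above): hive.query.reexecution.max.count=2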

      Now let's look at the Driver entry point called here (different from 5.1: this time the query has already been compiled).

      public CommandProcessorResponse run(String command, boolean alreadyCompiled) {
        try {
          runInternal(command, alreadyCompiled);
          return createProcessorResponse(0);
        } catch (CommandProcessorResponse cpr) {
          // ...... omitted ......
        }
      }

      private void runInternal(String command, boolean alreadyCompiled) throws CommandProcessorResponse {
        errorMessage = null;
        SQLState = null;
        downstreamError = null;
        LockedDriverState.setLockedDriverState(lDrvState);

        lDrvState.stateLock.lock();
        try {
          if (alreadyCompiled) {
            if (lDrvState.driverState == DriverState.COMPILED) {
              // The driver is in COMPILED state; move it to EXECUTING
              lDrvState.driverState = DriverState.EXECUTING;
            } else {
              // FAILED: the precompiled query has been cancelled or closed
              errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
              console.printError(errorMessage);
              throw createProcessorResponse(12);
            }
          } else {
            lDrvState.driverState = DriverState.COMPILING;
          }
        } finally {
          lDrvState.stateLock.unlock();
        }

        // A flag that helps set the correct driver state in the finally block by tracking
        // whether the method returned because of an error or not.
        boolean isFinishedWithError = true;
        try {
          // Context information provided by Hive to implementations of HiveDriverRunHook
          HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf,
              alreadyCompiled ? ctx.getCmd() : command);

          // Get all the driver run hooks and pre-execute them
          try {
            hookRunner.runPreDriverHooks(hookContext);
          } catch (Exception e) {
            errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
            SQLState = ErrorMsg.findSQLState(e.getMessage());
            downstreamError = e;
            console.printError(errorMessage + "\n"
                + org.apache.hadoop.util.StringUtils.stringifyException(e));
            throw createProcessorResponse(12);
          }

          PerfLogger perfLogger = null;

          // Not compiled yet?
          if (!alreadyCompiled) {
            // compileInternal will automatically reset the perf logger
            compileInternal(command, true);
            // then we continue to use this perf logger
            perfLogger = SessionState.getPerfLogger();
          } else {
            // Reuse the existing perf logger
            perfLogger = SessionState.getPerfLogger();
            // Since we are reusing the compiled plan, we need to update its start time for the current run
            plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
          }
          // The reason we set the txn manager on the ctx here is that each query has its own ctx object.
          // The txn manager is shared across the same Driver instance, which can run multiple queries.
          ctx.setHiveTxnManager(queryTxnMgr);

          checkInterrupted("at acquiring the lock.", null, null);

          lockAndRespond();

          // ...... omitted ......

          try {
            // Execute the HQL; we will dedicate a separate post to execute()
            execute();
          } catch (CommandProcessorResponse cpr) {
            rollback(cpr);
            throw cpr;
          }

          // If needRequireLock is false, the release here will do nothing because there is no lock
          try {
            // Since "set autocommit" starts an implicit txn, close it
            if (queryTxnMgr.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
              releaseLocksAndCommitOrRollback(true);
            }
            else if (plan.getOperation() == HiveOperation.ROLLBACK) {
              releaseLocksAndCommitOrRollback(false);
            }
            else {
              // txn (if there is one started) is not finished
            }
          } catch (LockException e) {
            throw handleHiveException(e, 12);
          }

          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN);
          queryDisplay.setPerfLogStarts(QueryDisplay.Phase.EXECUTION, perfLogger.getStartTimes());
          queryDisplay.setPerfLogEnds(QueryDisplay.Phase.EXECUTION, perfLogger.getEndTimes());

          // Take all the driver run hooks and post-execute them
          try {
            hookRunner.runPostDriverHooks(hookContext);
          } catch (Exception e) {
          }
          isFinishedWithError = false;
        } finally {
          if (lDrvState.isAborted()) {
            closeInProcess(true);
          } else {
            // Normal case: only release the related resources (ctx, driverContext)
            releaseResources();
          }

          lDrvState.stateLock.lock();
          try {
            lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED;
          } finally {
            lDrvState.stateLock.unlock();
          }
        }
      }
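      Again stripping hooks and error handling, the execution side condenses to the following sketch (illustration only; it reuses the fields from the listing):

      // Condensed shape of Driver.runInternal() -- illustration only.
      if (!alreadyCompiled) {
        compileInternal(command, true);        // parse + analyze + plan, see 5.1
      }
      ctx.setHiveTxnManager(queryTxnMgr);      // per-query context, shared txn manager
      lockAndRespond();                        // acquire the required locks
      execute();                               // run the DAG of tasks (MR / Tez / FetchTask)
      releaseLocksAndCommitOrRollback(true);   // commit / release locks when an implicit txn was opened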

      V. Summary

      1. The user types HQL into the Hive CLI.

      2. A Ctrl+C handler is installed for the line; on interrupt it kills the running MR/Tez jobs, and a second Ctrl+C exits the JVM.

      3. The input line is scanned character by character; every unescaped ";" closes one statement, which is appended to a list.

      4. Each statement in that list is executed in turn.

      5. Comments are stripped from the statement, leading/trailing whitespace is trimmed, and the result is split on '\\s+' into a token array.

      6. The statement is classified: ordinary SQL (the only case analyzed here), source, quit/exit, or a "!" shell escape.

      7. The command processor (driver) for the statement is obtained: the first token is upper-cased and matched against the Hive commands; ordinary SQL gets a driver from DriverFactory, by default a ReExecDriver.

      8. The HQL is compiled.

      9. The HQL is parsed into an AST.

      10. Semantic analysis and plan generation.

      11. The plan is validated.

      12. The output schema is obtained, the QueryPlan is built, and the MapReduce workflow id/name are set.

      13. Authorization checks:

              13.1. connect to the Hive metastore

              13.2. set the inputs and outputs

              13.3. get the table and column mappings

              13.4. add the permanent UDFs in use

              13.5. check column authorization via the table scan operators

              13.6. table authorization checks

      14. The explain plan is recorded (for the web UI) and the query is executed.
