    The Beeline Execution Flow in Hive 3.1.2

    Preface

    Alibaba Cloud's DataPhin platform cannot recognize tables that were not created through DataPhin, so as a workaround the author used the beeline SQL client to load data from an ordinary Hive table into a DataPhin Hive table:

    beline -u "jdbc:hive2://Hive的Host:10000/default;principal=hive/一串HOST@realm域" -e "
    insert overwrite table db1.tb1
    select
    	col1
    from
    	db2.tb2
    ;
    "
    

    Partitioned tables work too. Because this kept throwing errors, the author dug into the source to trace beeline's execution flow, hoping to find directions for optimization and, along the way, some tunable parameters.

    Beeline usage is covered on the official Confluence: https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients

    Hive configuration properties are documented in detail there as well: https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-RestrictedListandWhitelist

    So are the Hive on Tez properties relevant to CDP7: https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-Tez

    The official docs already tell you which parameters to tune in which situations, so there is no need to beg around for answers like the shallow "SQL Boys" do.

    Reading the Source

    This walkthrough uses Apache Hive 3.1.2. The IDE is IntelliJ IDEA; whether Maven is set up hardly matters, since we are not going to trace how Calcite parses the AST in detail.

    Entry Point

    package org.apache.hive.beeline;

    /**
     * A console SQL shell with command completion.
     * <p>
     * TODO:
     * <ul>
     * <li>User-friendly connection prompts</li>
     * <li>Page results</li>
     * <li>Handle binary data (blob fields)</li>
     * <li>Implement command aliases</li>
     * <li>Stored procedure execution</li>
     * <li>Binding parameters to prepared statements</li>
     * <li>Scripting language</li>
     * <li>XA transactions</li>
     * </ul>
     */
    @SuppressWarnings("static-access")
    public class BeeLine implements Closeable {
      /**
       * Starts the program.
       */
      public static void main(String[] args) throws IOException {
        mainWithInputRedirection(args, null);
      }
    }

    The BeeLine class sits right in the beeline module and is launched straight from its main method, simple and blunt. main itself contains just one call:

      /**
       * Starts the program with redirected input. For redirected output,
       * setOutputStream() and setErrorStream can be used.
       * Exits with 0 on success, 1 on invalid arguments, and 2 on any other error
       *
       * @param args
       *          same as main()
       *
       * @param inputStream
       *          redirected input, or null to use standard input
       */
      public static void mainWithInputRedirection(String[] args, InputStream inputStream)
          throws IOException {
        BeeLine beeLine = new BeeLine();
        try {
          int status = beeLine.begin(args, inputStream);
    
          if (!Boolean.getBoolean(BeeLineOpts.PROPERTY_NAME_EXIT)) {
              System.exit(status);
          }
        } finally {
          beeLine.close();
        }
      }
    

    This matches the usual convention of exit code 0 for success and 1 for failure. And the null passed from main means, per the Javadoc, that standard input is used.
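
    One detail worth noting: Boolean.getBoolean() reads a JVM system property, not an environment variable, so System.exit(status) is only skipped when something like -Dbeeline.system.exit=true was set. A standalone sketch of just that check (demo code, not from Hive):

    public class ExitFlagDemo {
      public static void main(String[] args) {
        // stands in for passing -Dbeeline.system.exit=true on the java command line
        System.setProperty("beeline.system.exit", "true");
        // the same test mainWithInputRedirection performs; prints true,
        // in which case BeeLine skips System.exit(status)
        System.out.println(Boolean.getBoolean("beeline.system.exit"));
      }
    }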

    Initializing the BeeLine Object

    public BeeLine() {
      this(true);
    }
    

    which delegates to this constructor:

    public BeeLine(boolean isBeeLine) {
      this.isBeeLine = isBeeLine;
      this.signalHandler = new SunSignalHandler(this);
      this.shutdownHook = new Runnable() {
        @Override
        public void run() {
          try {
            if (history != null) {
              history.setMaxSize(getOpts().getMaxHistoryRows());
              history.flush();
            }
          } catch (IOException e) {
            error(e);
          } finally {
            close();
          }
        }
      };
    }
    

    The BeeLine class holds these private fields:

    import jline.console.history.FileHistory;
    
      private FileHistory history;
      // Indicates if this instance of beeline is running in compatibility mode, or beeline mode
      private boolean isBeeLine = true;
    

    Clearly this Runnable exists to flush the command history to its file, log-style bookkeeping that needs no further attention.

    The Real Work Begins

    Stepping into the begin method:

    /**
     * Start accepting input from stdin, and dispatch it
     * to the appropriate {@link CommandHandler} until the
     * global variable exit is true.
     */
    public int begin(String[] args, InputStream inputStream) throws IOException {
      try {
        // load the options first, so we can override on the command line
        getOpts().load();
      } catch (Exception e) {
        // nothing
      }
    
      setupHistory();
    
      //add shutdown hook to cleanup the beeline for smooth exit
      addBeelineShutdownHook();
    
      //this method also initializes the consoleReader which is
      //needed by initArgs for certain execution paths
      ConsoleReader reader = initializeConsoleReader(inputStream);
      if (isBeeLine) {
        int code = initArgs(args);
        if (code != 0) {
          return code;
        }
      } else {
        int code = initArgsFromCliVars(args);
        if (code != 0 || exit) {
          return code;
        }
        defaultConnect(false);
      }
    
      if (getOpts().isHelpAsked()) {
        return 0;
      }
      if (getOpts().getScriptFile() != null) {
        return executeFile(getOpts().getScriptFile());
      }
      try {
        info(getApplicationTitle());
      } catch (Exception e) {
        // ignore
      }
      return execute(reader, false);
    }
    

    This begin method is where execution really starts: it loads options, reads the input stream, initializes arguments, connects, executes, and can run script files.

    Loading Configuration: load

    Jumping into BeeLineOpts.java:

    public void load() throws IOException {
      try (InputStream in = new FileInputStream(rcFile)) {
        load(in);
      }
    }
    

    One level deeper:

    public void load(InputStream fin) throws IOException {
      Properties p = new Properties();
      p.load(fin);
      loadProperties(p);
    }
    

    And again:

    public static final String PROPERTY_PREFIX = "beeline.";
    public static final String PROPERTY_NAME_EXIT = PROPERTY_PREFIX + "system.exit";
    
    public void loadProperties(Properties props) {
      for (Object element : props.keySet()) {
        String key = element.toString();
        if (key.equals(PROPERTY_NAME_EXIT)) {
          // fix for sf.net bug 879422
          continue;
        }
        if (key.startsWith(PROPERTY_PREFIX)) {
          set(key.substring(PROPERTY_PREFIX.length()),
              props.getProperty(key));
        }
      }
    }
    

    The loop skips the key "beeline.system.exit"; for every other key starting with "beeline.", it strips that prefix to form a new key and passes it, together with the property's value, into set:

    public void set(String key, String value) {
      set(key, value, false);
    }
    

    Which forwards to:

    public boolean set(String key, String value, boolean quiet) {
      try {
        beeLine.getReflector().invoke(this, "set" + key, new Object[] {value});
        return true;
      } catch (Exception e) {
        if (!quiet) {
          beeLine.error(beeLine.loc("error-setting", new Object[] {key, e}));
        }
        return false;
      }
    }
    

    This delegates to the Reflector:

    package org.apache.hive.beeline;
    
    class Reflector {
      public Object invoke(Object on, String method, Object[] args)
          throws InvocationTargetException, IllegalAccessException,
          ClassNotFoundException {
        return invoke(on, method, Arrays.asList(args));
      }
        
      public Object invoke(Object on, String method, List args)
          throws InvocationTargetException, IllegalAccessException,
          ClassNotFoundException {
        return invoke(on, on == null ? null : on.getClass(), method, args);
      }
        
      public Object invoke(Object on, Class defClass,
          String method, List args)
          throws InvocationTargetException, IllegalAccessException,
          ClassNotFoundException {
        Class c = defClass != null ? defClass : on.getClass();
        List<Method> candidateMethods = new LinkedList<Method>();
    
        Method[] m = c.getMethods();
        for (int i = 0; i < m.length; i++) {
          if (m[i].getName().equalsIgnoreCase(method)) {
            candidateMethods.add(m[i]);
          }
        }
    
        if (candidateMethods.size() == 0) {
          throw new IllegalArgumentException(beeLine.loc("no-method",
              new Object[] {method, c.getName()}));
        }
    
        for (Iterator<Method> i = candidateMethods.iterator(); i.hasNext();) {
          Method meth = i.next();
          Class[] ptypes = meth.getParameterTypes();
          if (!(ptypes.length == args.size())) {
            continue;
          }
    
          Object[] converted = convert(args, ptypes);
          if (converted == null) {
            continue;
          }
    
          if (!Modifier.isPublic(meth.getModifiers())) {
            continue;
          }
          return meth.invoke(on, converted);
        }
        return null;
      }    
    }
    

    This reflectively collects every method whose name equals "set<key>" (ignoring case) into a candidate list, then lands in Method.java:

    @CallerSensitive
    public Object invoke(Object obj, Object... args)
        throws IllegalAccessException, IllegalArgumentException,
           InvocationTargetException
    {
        if (!override) {
            if (!Reflection.quickCheckMemberAccess(clazz, modifiers)) {
                Class<?> caller = Reflection.getCallerClass();
                checkAccess(caller, clazz, obj, modifiers);
            }
        }
        MethodAccessor ma = methodAccessor;             // read volatile
        if (ma == null) {
            ma = acquireMethodAccessor();
        }
        return ma.invoke(obj, args);
    }
    

    In short, the loop walks the candidates and invokes the first public method whose parameter list fits.
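
    Putting loadProperties and the Reflector together: a key like "beeline.fetchsize" loses its prefix, and reflection then finds the matching setter by name, ignoring case. A minimal standalone sketch of that round trip (the class and property are illustrative, not Hive's):

    import java.lang.reflect.Method;

    public class MiniReflector {
      private int fetchSize;

      public void setFetchSize(int fetchSize) { this.fetchSize = fetchSize; }

      public static void main(String[] args) throws Exception {
        MiniReflector opts = new MiniReflector();
        String key = "beeline.fetchsize".substring("beeline.".length()); // "fetchsize"
        // like Reflector.invoke(this, "set" + key, value): match names case-insensitively
        for (Method m : opts.getClass().getMethods()) {
          if (m.getName().equalsIgnoreCase("set" + key) && m.getParameterTypes().length == 1) {
            m.invoke(opts, 500); // boxed to Integer, unboxed back to int by reflection
            break;
          }
        }
        System.out.println(opts.fetchSize); // 500
      }
    }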

    For example, BeeLine itself calls a number of these setters while parsing the command line (excerpts stitched together from connectUsingArgs, initArgs, and updateOptsForCli):

    Properties confProps = commandLine.getOptionProperties("hiveconf");
    for (String propKey : confProps.stringPropertyNames()) {
      setHiveConfVar(propKey, confProps.getProperty(propKey));
    }
    
    getOpts().setScriptFile(commandLine.getOptionValue("f"));
    
    if (commandLine.getOptionValues("i") != null) {
      getOpts().setInitFiles(commandLine.getOptionValues("i"));
    }
    
    dbName = commandLine.getOptionValue("database");
    getOpts().setVerbose(Boolean.parseBoolean(commandLine.getOptionValue("verbose")));
    getOpts().setSilent(Boolean.parseBoolean(commandLine.getOptionValue("silent")));
    
    int code = 0;
    if (cl.getOptionValues('e') != null) {
      commands = Arrays.asList(cl.getOptionValues('e'));
      opts.setAllowMultiLineCommand(false); //When using -e, command is always a single line
    }
    
    if (cl.hasOption("help")) {
      usage();
      getOpts().setHelpAsked(true);
      return true;
    }
    
    Properties hiveConfs = cl.getOptionProperties("hiveconf");
    for (String key : hiveConfs.stringPropertyNames()) {
      setHiveConfVar(key, hiveConfs.getProperty(key));
    }
    
    driver = cl.getOptionValue("d");
    auth = cl.getOptionValue("a");
    user = cl.getOptionValue("n");
    getOpts().setAuthType(auth);
    if (cl.hasOption("w")) {
      pass = obtainPasswordFromFile(cl.getOptionValue("w"));
    } else {
      if (beelineParser.isPasswordOptionSet) {
        pass = cl.getOptionValue("p");
      }
    }
    url = cl.getOptionValue("u");
    if ((url == null) && cl.hasOption("reconnect")){
      // If url was not specified with -u, but -r was present, use that.
      url = getOpts().getLastConnectedUrl();
    }
    getOpts().setInitFiles(cl.getOptionValues("i"));
    getOpts().setScriptFile(cl.getOptionValue("f"));
    
    public void updateOptsForCli() {
      getOpts().updateBeeLineOptsFromConf();
      getOpts().setShowHeader(false);
      getOpts().setEscapeCRLF(false);
      getOpts().setOutputFormat("dsv");
      getOpts().setDelimiterForDSV(' ');
      getOpts().setNullEmptyString(true);
    }
    
    setupHistory();
    

    Reading these getOptionValue calls also shows exactly why -u carries the connection URL, -n the username, and -p the password... it is all hard-coded. No mystery there.

    Setting Up History: setupHistory

      private void setupHistory() throws IOException {
        if (this.history != null) {
           return;
        }
        // ... (builds the jline FileHistory; omitted here)
      }
    

    This setupHistory method is not our focus.

    Graceful Exit: addBeelineShutdownHook

    //add shutdown hook to cleanup the beeline for smooth exit
    addBeelineShutdownHook();
    
      private void addBeelineShutdownHook() throws IOException {
        // add shutdown hook to flush the history to history file and it also close all open connections
        ShutdownHookManager.addShutdownHook(getShutdownHook());
      }
    

    Per its comment, this hook flushes the history to the history file and closes all open connections when beeline goes down, so that it exits smoothly. Also not our focus.
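
    For readers unfamiliar with the pattern, this is the standard JVM shutdown-hook idea; Hive routes it through its own ShutdownHookManager, so the sketch below is an analogy using plain JDK APIs, not the Hive code:

    public class HookDemo {
      public static void main(String[] args) {
        // registered once at startup; the JVM runs it on exit
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
          // BeeLine's hook: flush command history to its file, then close()
          System.out.println("flushing history, closing connections");
        }));
        System.out.println("working...");
      }
    }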

    Initializing the Console Reader: initializeConsoleReader

    ConsoleReader reader = initializeConsoleReader(inputStream);
    

    The method called here lives in BeeLine.java:

    public ConsoleReader initializeConsoleReader(InputStream inputStream) throws IOException {
      if (inputStream != null) {
        // ### NOTE: fix for sf.net bug 879425.
        // Working around an issue in jline-2.1.2, see https://github.com/jline/jline/issues/10
        // by appending a newline to the end of inputstream
        InputStream inputStreamAppendedNewline = new SequenceInputStream(inputStream,
            new ByteArrayInputStream((new String("\n")).getBytes()));
        consoleReader = new ConsoleReader(inputStreamAppendedNewline, getErrorStream());
        consoleReader.setCopyPasteDetection(true); // jline will detect if <tab> is regular character
      } else {
        consoleReader = new ConsoleReader(getInputStream(), getErrorStream());
      }
    
      //disable the expandEvents for the purpose of backward compatibility
      consoleReader.setExpandEvents(false);
    
      try {
        // now set the output for the history
        consoleReader.setHistory(this.history);
      } catch (Exception e) {
        handleException(e);
      }
    
      if (inputStream instanceof FileInputStream || inputStream instanceof FSDataInputStream) {
        // from script.. no need to load history and no need of completer, either
        return consoleReader;
      }
    
      consoleReader.addCompleter(new BeeLineCompleter(this));
      return consoleReader;
    }
    

    When main calls mainWithInputRedirection with null, it merely skips input redirection; either way the consoleReader is initialized. After the history is wired up, a FileInputStream or HDFS FSDataInputStream (i.e., input from a script) skips loading history and completers, while anything else gets a completer attached before returning. Still not the focus.

    Initializing Arguments: initArgs

    // Indicates if this instance of beeline is running in compatibility mode, or beeline mode
    private boolean isBeeLine = true;
    

    Per the field's comment, this flag distinguishes compatibility mode from beeline mode. The constructor was called with true at startup, so it is certainly true here and initArgs(args) executes:

    int initArgs(String[] args) {
      List<String> commands = Collections.emptyList();
    
      CommandLine cl;
      BeelineParser beelineParser;
    
      try {
        beelineParser = new BeelineParser();
        cl = beelineParser.parse(options, args);
      } catch (ParseException e1) {
        output(e1.getMessage());
        usage();
        return -1;
      }
    
      boolean connSuccessful = connectUsingArgs(beelineParser, cl);
      // checks if default hs2 connection configuration file is present
      // and uses it to connect if found
      // no-op if the file is not present
      if(!connSuccessful && !exit) {
        connSuccessful = defaultBeelineConnect(cl);
      }
    
      int code = 0;
      if (cl.getOptionValues('e') != null) {
        commands = Arrays.asList(cl.getOptionValues('e'));
        opts.setAllowMultiLineCommand(false); //When using -e, command is always a single line
    
      }
    
      if (!commands.isEmpty() && getOpts().getScriptFile() != null) {
        error("The '-e' and '-f' options cannot be specified simultaneously");
        return 1;
      } else if(!commands.isEmpty() && !connSuccessful) {
        error("Cannot run commands specified using -e. No current connection");
        return 1;
      }
      if (!commands.isEmpty()) {
        for (Iterator<String> i = commands.iterator(); i.hasNext();) {
          String command = i.next().toString();
          debug(loc("executing-command", command));
          if (!dispatch(command)) {
            code++;
          }
        }
        exit = true; // execute and exit
      }
      return code;
    }
    

    A BeelineParser is created here and parses the options and arguments into the CommandLine cl. Then connectUsingArgs connects to the Hive server using those arguments. If that fails, defaultBeelineConnect may fall back to the default HiveServer2 connection configuration file, the very common setup where, once ops has sorted out the environment, typing a bare beeline connects to HiveServer2 from config alone.

    Parsing Arguments

    user = cl.getOptionValue("n");
    

    Following calls like this one inside connectUsingArgs leads into commons-cli:

    package org.apache.commons.cli;
    
    public class CommandLine implements Serializable {
        public String getOptionValue(String opt) {
            String[] values = this.getOptionValues(opt);
            return values == null ? null : values[0];
        }
        
        public String[] getOptionValues(String opt) {
            List values = new ArrayList();
            Iterator it = this.options.iterator();
    
            while(true) {
                Option option;
                do {
                    if (!it.hasNext()) {
                        return values.isEmpty() ? null : (String[])((String[])values.toArray(new String[values.size()]));
                    }
    
                    option = (Option)it.next();
                } while(!opt.equals(option.getOpt()) && !opt.equals(option.getLongOpt()));
    
                values.addAll(option.getValuesList());
            }
        }    
    }    
    

    So this loop returns the first of the option's values when one exists, and null otherwise; any option that was never passed defaults to null.
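
    A standalone sketch of that behavior with plain commons-cli (options made up for the demo; assumes commons-cli 1.3+ for DefaultParser):

    import org.apache.commons.cli.*;

    public class CliDemo {
      public static void main(String[] args) throws ParseException {
        Options options = new Options();
        options.addOption("u", true, "JDBC url");  // true: the option takes a value
        options.addOption("n", true, "username");
        options.addOption("e", true, "query to run");

        CommandLine cl = new DefaultParser().parse(options,
            new String[] {"-u", "jdbc:hive2://host:10000/default", "-e", "select 1"});

        System.out.println(cl.getOptionValue("u")); // first value of -u
        System.out.println(cl.getOptionValue("n")); // null: -n was never passed
      }
    }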

    Tracking Down -e

    Per the comment, a command given with -e is always a single line, and opts.setAllowMultiLineCommand(false) plainly just flips a flag:

    class BeeLineOpts implements Completer {
        
      private boolean allowMultiLineCommand = true;
        
      public boolean isAllowMultiLineCommand() {
        return allowMultiLineCommand;
      }
    
      public void setAllowMultiLineCommand(boolean allowMultiLineCommand) {
        this.allowMultiLineCommand = allowMultiLineCommand;
      }
    }
    

    The values gathered from -e are stored in the commands list, whose entire scope is the initArgs method:

    package org.apache.hive.beeline;
    
    import org.apache.commons.cli.CommandLine;
    
    public class BeeLine implements Closeable {
      int initArgs(String[] args) {
        List<String> commands = Collections.emptyList();
    
        CommandLine cl;
        BeelineParser beelineParser;
    
        try {
          beelineParser = new BeelineParser();
          cl = beelineParser.parse(options, args);
        } catch (ParseException e1) {
          output(e1.getMessage());
          usage();
          return -1;
        }
    
        boolean connSuccessful = connectUsingArgs(beelineParser, cl);
        // checks if default hs2 connection configuration file is present
        // and uses it to connect if found
        // no-op if the file is not present
        if(!connSuccessful && !exit) {
          connSuccessful = defaultBeelineConnect(cl);
        }
    
        int code = 0;
        if (cl.getOptionValues('e') != null) {
          commands = Arrays.asList(cl.getOptionValues('e'));
          opts.setAllowMultiLineCommand(false); //When using -e, command is always a single line
    
        }
    
        if (!commands.isEmpty() && getOpts().getScriptFile() != null) {
          error("The '-e' and '-f' options cannot be specified simultaneously");
          return 1;
        } else if(!commands.isEmpty() && !connSuccessful) {
          error("Cannot run commands specified using -e. No current connection");
          return 1;
        }
        if (!commands.isEmpty()) {
          for (Iterator<String> i = commands.iterator(); i.hasNext();) {
            String command = i.next().toString();
            debug(loc("executing-command", command));
            if (!dispatch(command)) {
              code++;
            }
          }
          exit = true; // execute and exit
        }
        return code;
      }
    }
    

    So -e is consumed while initializing arguments. Its values form a list that dispatch(command) walks and executes one by one, which is how -e runs multiple SQL segments. Clearly -e and argument initialization are tightly coupled.

    Tracking Down -i and -f

    From these lines in connectUsingArgs in BeeLine.java:

    getOpts().setScriptFile(commandLine.getOptionValue("f"));
    
    if (commandLine.getOptionValues("i") != null) {
      getOpts().setInitFiles(commandLine.getOptionValues("i"));
    }
    

    we jump into:

    package org.apache.hive.beeline;

    class BeeLineOpts implements Completer {
      private String[] initFiles = null;
      private String scriptFile = null;

      public String[] getInitFiles() {
        return initFiles;
      }

      public void setInitFiles(String[] initFiles) {
        this.initFiles = initFiles;
      }

      public String getScriptFile() {
        return scriptFile;
      }

      public void setScriptFile(String scriptFile) {
        this.scriptFile = scriptFile;
      }
    }
    

    So -i records the files needed for initialization and -f the script file to run; both are used later.

    Tracking Down -u

    Starting from this line in BeeLine.java:

    url = cl.getOptionValue("u");
    

    the full scope of url in BeeLine.java is:

    /*
     * Connects using the command line arguments. There are two
     * possible ways to connect here 1. using the cmd line arguments like -u
     * or using !properties 
     */
    private boolean connectUsingArgs(BeelineParser beelineParser, CommandLine cl) {
      String driver = null, user = null, pass = "", url = null;
      String auth = null;
    
    
      if (cl.hasOption("help")) {
        usage();
        getOpts().setHelpAsked(true);
        return true;
      }
    
      Properties hiveVars = cl.getOptionProperties("hivevar");
      for (String key : hiveVars.stringPropertyNames()) {
        getOpts().getHiveVariables().put(key, hiveVars.getProperty(key));
      }
    
      Properties hiveConfs = cl.getOptionProperties("hiveconf");
      for (String key : hiveConfs.stringPropertyNames()) {
        setHiveConfVar(key, hiveConfs.getProperty(key));
      }
    
      driver = cl.getOptionValue("d");
      auth = cl.getOptionValue("a");
      user = cl.getOptionValue("n");
      getOpts().setAuthType(auth);
      if (cl.hasOption("w")) {
        pass = obtainPasswordFromFile(cl.getOptionValue("w"));
      } else {
        if (beelineParser.isPasswordOptionSet) {
          pass = cl.getOptionValue("p");
        }
      }
      url = cl.getOptionValue("u");
      if ((url == null) && cl.hasOption("reconnect")){
        // If url was not specified with -u, but -r was present, use that.
        url = getOpts().getLastConnectedUrl();
      }
      getOpts().setInitFiles(cl.getOptionValues("i"));
      getOpts().setScriptFile(cl.getOptionValue("f"));
    
    
      if (url != null) {
        // Specifying username/password/driver explicitly will override the values from the url;
        // make sure we don't override the values present in the url with empty values.
        if (user == null) {
          user = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER);
        }
        if (pass == null) {
          pass = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD);
        }
        if (driver == null) {
          driver = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.PROPERTY_DRIVER);
        }
    
        String com;
        String comForDebug;
        if(pass != null) {
          com = constructCmd(url, user, pass, driver, false);
          comForDebug = constructCmd(url, user, pass, driver, true);
        } else {
          com = constructCmdUrl(url, user, driver, false);
          comForDebug = constructCmdUrl(url, user, driver, true);
        }
        debug(comForDebug);
        if (!dispatch(com)) {
          exit = true;
          return false;
        }
        return true;
      }
      // load property file
      String propertyFile = cl.getOptionValue("property-file");
      if (propertyFile != null) {
        try {
          this.consoleReader = new ConsoleReader();
        } catch (IOException e) {
          handleException(e);
        }
        if (!dispatch("!properties " + propertyFile)) {
          exit = true;
          return false;
        }
      }
      return false;
    }
    

    When the url obtained from -u is non-null, a series of tricks follows: the utility class extracts the username, password, and driver in turn. If the password is non-null, constructCmd builds the command string com:

    private String constructCmd(String url, String user, String pass, String driver, boolean stripPasswd) {
      return new StringBuilder()
         .append("!connect ")
         .append(url)
         .append(" ")
         .append(user == null || user.length() == 0 ? "''" : user)
         .append(" ")
         .append(stripPasswd ? PASSWD_MASK : (pass.length() == 0 ? "''" : pass))
         .append(" ")
         .append((driver == null ? "" : driver))
         .toString();
    }
    

    Then dispatch(com) runs on the com string derived from -u (once, as there is no loop). From its content, it simply prepends the !connect prefix; needless to say, this is tied to connection initialization.

    But whenever url is non-null while the username, password, or driver is null (i.e., -n, -p, -d were not supplied), parsePropertyFromUrl is called:

    package org.apache.hive.jdbc;
    
    public class Utils {
      public static String parsePropertyFromUrl(final String url, final String key) {
        String[] tokens = url.split(";");
        for (String token : tokens) {
          if (token.trim().startsWith(key.trim() + "=")) {
            return token.trim().substring((key.trim() + "=").length());
          }
        }
        return null;
      }
    
    }
    

    Whether key is user, password, or driver, the routine is the same: split the URL on ";", and for any token starting with "key=", strip that prefix and return the value; otherwise return null. Obviously this can extract more than just user, password, and driver: anything in the ;k1=v1;k2=v2;k3=v3 format works.
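
    A self-contained sketch of the same key=value extraction (the URL is made up; the logic mirrors the method above):

    public class UrlPropDemo {
      static String parsePropertyFromUrl(String url, String key) {
        // same idea as org.apache.hive.jdbc.Utils: split on ';', match "key=" prefixes
        for (String token : url.split(";")) {
          if (token.trim().startsWith(key.trim() + "=")) {
            return token.trim().substring((key.trim() + "=").length());
          }
        }
        return null;
      }

      public static void main(String[] args) {
        String url = "jdbc:hive2://host:10000/default;principal=hive/_HOST@EXAMPLE.COM;user=alice";
        System.out.println(parsePropertyFromUrl(url, "user"));     // alice
        System.out.println(parsePropertyFromUrl(url, "password")); // null
      }
    }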

    If the password is recovered this way, constructCmd runs as above and all is well.

    When the url from -u is non-null but the password is null, which is by far the most common case (for example, with Kerberos authentication enabled), constructCmdUrl runs instead:

    /**
     * This is an internal method used to create !connect command when -p option is used without
     * providing the password on the command line. Connect command returned should be ; separated
     * key-value pairs along with the url. We cannot use space separated !connect url user [password]
     * [driver] here since both password and driver are optional and there would be no way to
     * distinguish if the last string is password or driver
     *
     * @param url connection url passed using -u argument on the command line
     * @param user username passed through command line
     * @param driver driver passed through command line -d option
     * @param stripPasswd when set to true generates a !connect command which strips the password for
     *          logging purposes
     * @return !connect command
     */
    private String constructCmdUrl(String url, String user, String driver,
        boolean stripPasswd)  {
      StringBuilder command = new StringBuilder("!connect ");
      command.append(url);
      //if the url does not have a database name add the trailing '/'
      if(isTrailingSlashNeeded(url)) {
        command.append('/');
      }
      command.append(';');
      // if the username is not already available in the URL add the one provided
      if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER) == null) {
        command.append(JdbcConnectionParams.AUTH_USER);
        command.append('=');
        command.append((user == null || user.length() == 0 ? "''" : user));
      }
      if (stripPasswd) {
        // if password is available in url it needs to be striped
        int startIndex = command.indexOf(JdbcConnectionParams.AUTH_PASSWD + "=")
            + JdbcConnectionParams.AUTH_PASSWD.length() + 2;
        if(startIndex != -1) {
          int endIndex = command.toString().indexOf(";", startIndex);
          command.replace(startIndex, (endIndex == -1 ? command.length() : endIndex),
            BeeLine.PASSWD_MASK);
        }
      }
      // if the driver is not already available in the URL add the one provided
      if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.PROPERTY_DRIVER) == null
          && driver != null) {
        command.append(';');
        command.append(JdbcConnectionParams.PROPERTY_DRIVER);
        command.append("=");
        command.append(driver);
      }
      return command.toString();
    }
    

    Clearly this method does nothing earth-shattering; it just normalizes the format of the !connect command string.

    Dispatching Commands: dispatch

    In Hive's beeline, both -e and -u funnel into dispatch, so this method clearly matters. Into BeeLine.java's dispatch:

    public static final String COMMAND_PREFIX = "!";
    /**
     * Dispatch the specified line to the appropriate {@link CommandHandler}.
     *
     * @param line
     *          the command-line to dispatch
     * @return true if the command was "successful"
     */
    boolean dispatch(String line) {
      if (line == null) {
        // exit
        exit = true;
        return true;
      }
    
      if (line.trim().length() == 0) {
        return true;
      }
    
      if (isComment(line)) {
        return true;
      }
    
      line = line.trim();
    
      // save it to the current script, if any
      if (scriptOutputFile != null) {
        scriptOutputFile.addLine(line);
      }
    
      if (isHelpRequest(line)) {
        line = "!help";
      }
    
      if (isBeeLine) {
        if (line.startsWith(COMMAND_PREFIX)) {
          // handle SQLLine command in beeline which starts with ! and does not end with ;
          return execCommandWithPrefix(line);
        } else {
          return commands.sql(line, getOpts().getEntireLineAsCommand());
        }
      } else {
        return commands.sql(line, getOpts().getEntireLineAsCommand());
      }
    }
    

    Here isBeeLine once again decides which branch is taken.

    The Prefixed Path: execCommandWithPrefix

    From the earlier analysis, isBeeLine is normally true, so a line starting with "!" goes to execCommandWithPrefix(line):

    /**
     * This method is used for executing commands beginning with !
     * @param line
     * @return
     */
    public boolean execCommandWithPrefix(String line) {
      Map<String, CommandHandler> cmdMap = new TreeMap<String, CommandHandler>();
      line = line.substring(1);
      for (int i = 0; i < commandHandlers.length; i++) {
        String match = commandHandlers[i].matches(line);
        if (match != null) {
          cmdMap.put(match, commandHandlers[i]);
        }
      }
    
      if (cmdMap.size() == 0) {
        return error(loc("unknown-command", line));
      }
      if (cmdMap.size() > 1) {
        // any exact match?
        CommandHandler handler = cmdMap.get(line);
        if (handler == null) {
          return error(loc("multiple-matches", cmdMap.keySet().toString()));
        }
        return handler.execute(line);
      }
      return cmdMap.values().iterator().next().execute(line);
    }
    

    The commandHandlers array being iterated is also defined in BeeLine.java:

    final CommandHandler[] commandHandlers = new CommandHandler[] {
        new ReflectiveCommandHandler(this, new String[] {"quit", "done", "exit"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"connect", "open"},
            new Completer[] {new StringsCompleter(getConnectionURLExamples())}),
        new ReflectiveCommandHandler(this, new String[] {"describe"},
            new Completer[] {new TableNameCompletor(this)}),
        new ReflectiveCommandHandler(this, new String[] {"indexes"},
            new Completer[] {new TableNameCompletor(this)}),
        new ReflectiveCommandHandler(this, new String[] {"primarykeys"},
            new Completer[] {new TableNameCompletor(this)}),
        new ReflectiveCommandHandler(this, new String[] {"exportedkeys"},
            new Completer[] {new TableNameCompletor(this)}),
        new ReflectiveCommandHandler(this, new String[] {"manual"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"importedkeys"},
            new Completer[] {new TableNameCompletor(this)}),
        new ReflectiveCommandHandler(this, new String[] {"procedures"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"tables"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"typeinfo"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"columns"},
            new Completer[] {new TableNameCompletor(this)}),
        new ReflectiveCommandHandler(this, new String[] {"reconnect"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"dropall"},
            new Completer[] {new TableNameCompletor(this)}),
        new ReflectiveCommandHandler(this, new String[] {"history"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"metadata"},
            new Completer[] {
                new StringsCompleter(getMetadataMethodNames())}),
        new ReflectiveCommandHandler(this, new String[] {"nativesql"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"dbinfo"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"rehash"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"verbose"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"run"},
            new Completer[] {new FileNameCompleter()}),
        new ReflectiveCommandHandler(this, new String[] {"batch"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"list"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"all"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"go", "#"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"script"},
            new Completer[] {new FileNameCompleter()}),
        new ReflectiveCommandHandler(this, new String[] {"record"},
            new Completer[] {new FileNameCompleter()}),
        new ReflectiveCommandHandler(this, new String[] {"brief"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"close"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"closeall"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"isolation"},
            new Completer[] {new StringsCompleter(getIsolationLevels())}),
        new ReflectiveCommandHandler(this, new String[] {"outputformat"},
            new Completer[] {new StringsCompleter(
                formats.keySet().toArray(new String[0]))}),
        new ReflectiveCommandHandler(this, new String[] {"autocommit"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"commit"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"properties"},
            new Completer[] {new FileNameCompleter()}),
        new ReflectiveCommandHandler(this, new String[] {"rollback"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"help", "?"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"set"},
            getOpts().optionCompleters()),
        new ReflectiveCommandHandler(this, new String[] {"save"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"scan"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"sql"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"sh"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"call"},
            null),
        new ReflectiveCommandHandler(this, new String[] {"nullemptystring"},
            new Completer[] {new BooleanCompleter()}),
        new ReflectiveCommandHandler(this, new String[]{"addlocaldriverjar"},
            null),
        new ReflectiveCommandHandler(this, new String[]{"addlocaldrivername"},
            null),
        new ReflectiveCommandHandler(this, new String[]{"delimiter"},
            null)
    };
    

    Clearly !connect gets captured by a handler. The TreeMap cmdMap deduplicates the matches: no match means an unknown command, multiple matches require an exact name, and otherwise the single matching handler's execute(line) runs.
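
    The matching itself is by prefix: matches() in AbstractCommandHandler (shown further below) accepts any handler name that starts with the line's first token, so "!conn ..." still finds connect. A standalone sketch of that idea (names are made up):

    import java.util.Map;
    import java.util.TreeMap;

    public class PrefixMatchDemo {
      public static void main(String[] args) {
        String line = "conn jdbc:hive2://host:10000/default"; // "!connect ..." minus the "!"
        String[] names = {"connect", "open"};                 // one handler's aliases
        Map<String, String> cmdMap = new TreeMap<>();
        String firstToken = line.split("\\s+")[0];            // "conn"
        for (String name : names) {
          if (name.startsWith(firstToken)) {                  // the same test matches() uses
            cmdMap.put(name, "ReflectiveCommandHandler[connect]");
          }
        }
        System.out.println(cmdMap); // {connect=ReflectiveCommandHandler[connect]}
      }
    }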

    The CommandHandler here is an interface:

    package org.apache.hive.beeline;
    
    import jline.console.completer.Completer;
    
    /**
     * A generic command to be executed. Execution of the command
     * should be dispatched to the {@link #execute(java.lang.String)} method after determining that
     * the command is appropriate with
     * the {@link #matches(java.lang.String)} method.
     *
     */
    interface CommandHandler {
      /**
       * @return the name of the command
       */
      public String getName();
    
    
      /**
       * @return all the possible names of this command.
       */
      public String[] getNames();
    
    
      /**
       * @return the short help description for this command.
       */
      public String getHelpText();
    
    
      /**
       * Check to see if the specified string can be dispatched to this
       * command.
       *
       * @param line
       *          the command line to check.
       * @return the command string that matches, or null if it no match
       */
      public String matches(String line);
    
    
      /**
       * Execute the specified command.
       *
       * @param line
       *          the full command line to execute.
       */
      public boolean execute(String line);
    
    
      /**
       * Returns the completors that can handle parameters.
       */
      public Completer[] getParameterCompleters();
    
      /**
       * Returns exception thrown for last command
       * @return
       */
      public Throwable getLastException();
    }
    

    Looking at the inheritance tree:

    [Figure: inheritance tree of CommandHandler]

    It has a single abstract implementation, AbstractCommandHandler:

    package org.apache.hive.beeline;
    
    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;
    
    import jline.console.completer.Completer;
    import jline.console.completer.NullCompleter;
    
    /**
     * An abstract implementation of CommandHandler.
     *
     */
    public abstract class AbstractCommandHandler implements CommandHandler {
      private final BeeLine beeLine;
      private final String name;
      private final String[] names;
      private final String helpText;
      private Completer[] parameterCompleters = new Completer[0];
    
      protected transient Throwable lastException;
    
      public AbstractCommandHandler(BeeLine beeLine, String[] names, String helpText,
                                    Completer[] completors) {
        this.beeLine = beeLine;
        name = names[0];
        this.names = names;
        this.helpText = helpText;
        if (completors == null || completors.length == 0) {
          parameterCompleters = new Completer[] { new NullCompleter() };
        } else {
          List<Completer> c = new LinkedList<Completer>(Arrays.asList(completors));
          c.add(new NullCompleter());
          parameterCompleters = c.toArray(new Completer[0]);
        }
      }
    
      @Override
      public String getHelpText() {
        return helpText;
      }
    
    
      @Override
      public String getName() {
        return name;
      }
    
    
      @Override
      public String[] getNames() {
        return names;
      }
    
    
      @Override
      public String matches(String line) {
        if (line == null || line.length() == 0) {
          return null;
        }
    
        String[] parts = beeLine.split(line);
        if (parts == null || parts.length == 0) {
          return null;
        }
    
        for (String name2 : names) {
          if (name2.startsWith(parts[0])) {
            return name2;
          }
        }
        return null;
      }
    
      public void setParameterCompleters(Completer[] parameterCompleters) {
        this.parameterCompleters = parameterCompleters;
      }
    
      @Override
      public Completer[] getParameterCompleters() {
        return parameterCompleters;
      }
    
      @Override
      public Throwable getLastException() {
        return lastException;
      }
    }
    

    and a single reflection-based concrete subclass, ReflectiveCommandHandler:

    package org.apache.hive.beeline;
    
    import jline.console.completer.Completer;
    
    import org.apache.hadoop.fs.shell.Command;
    
    /**
     * A {@link Command} implementation that uses reflection to
     * determine the method to dispatch the command.
     *
     */
    public class ReflectiveCommandHandler extends AbstractCommandHandler {
      private final BeeLine beeLine;
    
      /**
       * @param beeLine
       * @param cmds      'cmds' is an array of alternative names for the same command. And that the
       *                  first one is always chosen for display purposes and to lookup help
       *                  documentation from BeeLine.properties file.
       * @param completer
       */
      public ReflectiveCommandHandler(BeeLine beeLine, String[] cmds, Completer[] completer) {
        super(beeLine, cmds, beeLine.loc("help-" + cmds[0]), completer);
        this.beeLine = beeLine;
      }
    
      public boolean execute(String line) {
        lastException = null;
        ClientHook hook = ClientCommandHookFactory.get().getHook(beeLine, line);
    
        try {
          Object ob = beeLine.getCommands().getClass().getMethod(getName(),
              new Class[] {String.class})
              .invoke(beeLine.getCommands(), new Object[] {line});
    
          boolean result = (ob != null && ob instanceof Boolean && ((Boolean) ob).booleanValue());
    
          if (hook != null && result) {
            hook.postHook(beeLine);
          }
    
          return result;
        } catch (Throwable e) {
          lastException = e;
          return beeLine.error(e);
        }
      }
    }
    

    How the prefix matching works is not the point. The point is that execute reflectively fetches the commands object and uses the handler's name, i.e. the first value of the string array passed to ReflectiveCommandHandler's constructor (connect, for example), to look up and invoke the corresponding method on the Commands class. Sticking with connect as the example:

    package org.apache.hive.beeline;
    
    public class Commands {
    public boolean connect(String line) throws Exception {
      String example = "Usage: connect    [driver]"
          + BeeLine.getSeparator();
    
      String[] parts = beeLine.split(line);
      if (parts == null) {
        return false;
      }
    
      if (parts.length < 2) {
        return beeLine.error(example);
      }
    
      String url = parts.length < 2 ? null : parts[1];
      String user = parts.length < 3 ? null : parts[2];
      String pass = parts.length < 4 ? null : parts[3];
      String driver = parts.length < 5 ? null : parts[4];
    
      Properties props = new Properties();
      if (url != null) {
        String saveUrl = getUrlToUse(url);
        props.setProperty(JdbcConnectionParams.PROPERTY_URL, saveUrl);
      }
    
      String value = null;
      if (driver != null) {
        props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, driver);
      } else {
        value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.PROPERTY_DRIVER);
        if (value != null) {
          props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, value);
        }
      }
    
      if (user != null) {
        props.setProperty(JdbcConnectionParams.AUTH_USER, user);
      } else {
        value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER);
        if (value != null) {
          props.setProperty(JdbcConnectionParams.AUTH_USER, value);
        }
      }
    
      if (pass != null) {
        props.setProperty(JdbcConnectionParams.AUTH_PASSWD, pass);
      } else {
        value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD);
        if (value != null) {
          props.setProperty(JdbcConnectionParams.AUTH_PASSWD, value);
        }
      }
    
      value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_TYPE);
      if (value != null) {
        props.setProperty(JdbcConnectionParams.AUTH_TYPE, value);
      }
      return connect(props);
    }
    }
    

    So a matched !connect ends up in this connect method, and other commands follow the same pattern, no need to repeat it. In daily use, !connect and !quit are the ones you see most.
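
    The reflective dispatch itself boils down to Class.getMethod plus Method.invoke. A standalone sketch (the nested Commands class is a stand-in, not Hive's):

    import java.lang.reflect.Method;

    public class DispatchDemo {
      // stand-in for org.apache.hive.beeline.Commands
      public static class Commands {
        public boolean connect(String line) {
          System.out.println("connecting with: " + line);
          return true;
        }
      }

      public static void main(String[] args) throws Exception {
        Commands commands = new Commands();
        // as in ReflectiveCommandHandler.execute(): the method name is the handler name
        Method m = commands.getClass().getMethod("connect", String.class);
        Object ob = m.invoke(commands, "connect jdbc:hive2://host:10000/default");
        System.out.println(ob); // true, so a non-null ClientHook's postHook would fire
      }
    }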

    The Unprefixed Path: commands.sql

    A line not starting with "!" goes to commands.sql(line, getOpts().getEntireLineAsCommand()):

    package org.apache.hive.beeline;
    public class Commands {
      public boolean sql(String line, boolean entireLineAsCommand) {
        return execute(line, false, entireLineAsCommand);
      }
    
      private boolean execute(String line, boolean call, boolean entireLineAsCommand) {
        if (line == null || line.length() == 0) {
          return false; // ???
        }
    
        // ### FIXME: doing the multi-line handling down here means
        // higher-level logic never sees the extra lines. So,
        // for example, if a script is being saved, it won't include
        // the continuation lines! This is logged as sf.net
        // bug 879518.
    
        // use multiple lines for statements not terminated by the delimiter
        try {
          line = handleMultiLineCmd(line);
        } catch (Exception e) {
          beeLine.handleException(e);
        }
    
        line = line.trim();
        List<String> cmdList = getCmdList(line, entireLineAsCommand);
        for (int i = 0; i < cmdList.size(); i++) {
          String sql = cmdList.get(i).trim();
          if (sql.length() != 0) {
            if (!executeInternal(sql, call)) {
              return false;
            }
          }
        }
        return true;
      }    
    }
    

    The normal case clearly ends in executeInternal(sql, call) (in Commands.java, with call=false):

    // Return false only occurred error when execution the sql and the sql should follow the rules
    // of beeline.
    private boolean executeInternal(String sql, boolean call) {
      if (!beeLine.isBeeLine()) {
        sql = cliToBeelineCmd(sql);
      }
    
      if (sql == null || sql.length() == 0) {
        return true;
      }
    
      if (beeLine.isComment(sql)) {
        //skip this and rest cmds in the line
        return true;
      }
    
      // is source CMD
      if (isSourceCMD(sql)) {
        return sourceFile(sql);
      }
    
      if (sql.startsWith(BeeLine.COMMAND_PREFIX)) {
        return beeLine.execCommandWithPrefix(sql);
      }
    
      String prefix = call ? "call" : "sql";
    
      if (sql.startsWith(prefix)) {
        sql = sql.substring(prefix.length());
      }
    
      // batch statements?
      if (beeLine.getBatch() != null) {
        beeLine.getBatch().add(sql);
        return true;
      }
    
      if (!(beeLine.assertConnection())) {
        return false;
      }
    
      ClientHook hook = ClientCommandHookFactory.get().getHook(beeLine, sql);
    
      try {
        Statement stmnt = null;
        boolean hasResults;
        Thread logThread = null;
    
        try {
          long start = System.currentTimeMillis();
    
          if (call) {
            stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql);
            hasResults = ((CallableStatement) stmnt).execute();
          } else {
            stmnt = beeLine.createStatement();
            // In test mode we want the operation logs regardless of the settings
            if (!beeLine.isTestMode() && beeLine.getOpts().isSilent()) {
              hasResults = stmnt.execute(sql);
            } else {
              InPlaceUpdateStream.EventNotifier eventNotifier =
                  new InPlaceUpdateStream.EventNotifier();
              logThread = new Thread(createLogRunnable(stmnt, eventNotifier));
              logThread.setDaemon(true);
              logThread.start();
              if (stmnt instanceof HiveStatement) {
                HiveStatement hiveStatement = (HiveStatement) stmnt;
                hiveStatement.setInPlaceUpdateStream(
                    new BeelineInPlaceUpdateStream(
                        beeLine.getErrorStream(),
                        eventNotifier
                    ));
              }
              hasResults = stmnt.execute(sql);
              logThread.interrupt();
              logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
            }
          }
    
          beeLine.showWarnings();
    
          if (hasResults) {
            OutputFile outputFile = beeLine.getRecordOutputFile();
            if (beeLine.isTestMode() && outputFile != null && outputFile.isActiveConverter()) {
              outputFile.fetchStarted();
              if (!sql.trim().toLowerCase().startsWith("explain")) {
                outputFile.foundQuery(true);
              } else {
                outputFile.foundQuery(false);
              }
            }
            do {
              ResultSet rs = stmnt.getResultSet();
              try {
                int count = beeLine.print(rs);
                long end = System.currentTimeMillis();
    
                beeLine.info(
                    beeLine.loc("rows-selected", count) + " " + beeLine.locElapsedTime(end - start));
              } finally {
                if (logThread != null) {
                  logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
                  showRemainingLogsIfAny(stmnt);
                  logThread = null;
                }
                rs.close();
              }
            } while (BeeLine.getMoreResults(stmnt));
            if (beeLine.isTestMode() && outputFile != null && outputFile.isActiveConverter()) {
              outputFile.fetchFinished();
            }
          } else {
            int count = stmnt.getUpdateCount();
            long end = System.currentTimeMillis();
            beeLine.info(
                beeLine.loc("rows-affected", count) + " " + beeLine.locElapsedTime(end - start));
          }
        } finally {
          if (logThread != null) {
            if (!logThread.isInterrupted()) {
              logThread.interrupt();
            }
            logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
            showRemainingLogsIfAny(stmnt);
          }
          if (stmnt != null) {
            stmnt.close();
          }
        }
      } catch (Exception e) {
        return beeLine.error(e);
      }
      beeLine.showWarnings();
      if (hook != null) {
        hook.postHook(beeLine);
      }
      return true;
    }
    

    Here we meet the familiar Statement and a sequence of operations much like plain JDBC. It also spins up a log thread, joined with a 10000 ms timeout (DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT), to stream operation logs; its exact workings need not concern us for now.

    When call=false, the code takes one of two paths depending on test mode and the silent option, but both end at stmnt.execute, the familiar execute method of JDBC's Statement interface. When the statement returns a result set, the rows are printed and counted, and in test mode the record converter additionally branches on whether the query starts with explain. Finally the statement is closed, warnings are shown, and the ClientHook's postHook is invoked.
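
    Stripped of the log thread, test-mode handling, and hooks, the core is the standard JDBC pattern. A minimal sketch (assuming the caller supplies an already-open java.sql.Connection):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    // Minimal sketch of what executeInternal boils down to
    class ExecuteSketch {
      static void run(Connection conn, String sql) throws SQLException {
        try (Statement st = conn.createStatement()) {
          boolean hasResults = st.execute(sql);
          if (hasResults) {
            try (ResultSet rs = st.getResultSet()) {
              while (rs.next()) {
                System.out.println(rs.getString(1)); // Beeline's print(rs) renders full rows instead
              }
            }
          } else {
            System.out.println(st.getUpdateCount() + " rows affected");
          }
        }
      }
    }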

    Getting the connection for commands without a prefix

    When call=false, this line runs:

    stmnt = beeLine.createStatement();
    

    which yields the JDBC Statement. Jumping into BeeLine.java:

    Statement createStatement() throws SQLException {
      Statement stmnt = getDatabaseConnection().getConnection().createStatement();
      if (getOpts().timeout > -1) {
        stmnt.setQueryTimeout(getOpts().timeout);
      }
      if (signalHandler != null) {
        signalHandler.setStatement(stmnt);
      }
      return stmnt;
    }
    

    First, the getDatabaseConnection method:

    private final DatabaseConnections connections = new DatabaseConnections();
    
    DatabaseConnection getDatabaseConnection() {
      return getDatabaseConnections().current();
    }
    
    DatabaseConnections getDatabaseConnections() {
      return connections;
    }
    

    Clearly this pulls the current database connection from a pool-like holder class:

    package org.apache.hive.beeline;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    class DatabaseConnections {
      private final List<DatabaseConnection> connections = new ArrayList<DatabaseConnection>();
      private int index = -1;
    
      public DatabaseConnection current() {
        if (index != -1) {
          return connections.get(index);
        }
        return null;
      }
    
      public int size() {
        return connections.size();
      }
    
      public Iterator<DatabaseConnection> iterator() {
        return connections.iterator();
      }
    
      public void remove() {
        if (index != -1) {
          connections.remove(index);
        }
        while (index >= connections.size()) {
          index--;
        }
      }
    
      public void setConnection(DatabaseConnection connection) {
        if (connections.indexOf(connection) == -1) {
          connections.add(connection);
        }
        index = connections.indexOf(connection);
      }
    
      public int getIndex() {
        return index;
      }
    
    
      public boolean setIndex(int index) {
        if (index < 0 || index >= connections.size()) {
          return false;
        }
        this.index = index;
        return true;
      }
    }
    

    And the connection object itself, DatabaseConnection:

    package org.apache.hive.beeline;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.hive.jdbc.HiveConnection;
    
    import jline.console.completer.ArgumentCompleter;
    import jline.console.completer.Completer;
    
    class DatabaseConnection {
      private final String driver;
      private final String url;
      private final Properties info;
      // ... other fields (beeLine, connection, HIVE_VAR_PREFIX, HIVE_CONF_PREFIX, ...) elided ...
    
      public DatabaseConnection(BeeLine beeLine, String driver, String url,
           Properties info) throws SQLException {
        this.beeLine = beeLine;
        this.driver = driver;
        this.url = url;
        this.info = info;
      }
        
        
      public Connection getConnection() throws SQLException {
        if (connection != null) {
          return connection;
        }
        connect();
        return connection;
      }
        
      /**
       * Connection to the specified data source.
       */
      boolean connect() throws SQLException {
        try {
          if (driver != null && driver.length() != 0) {
            Class.forName(driver);
          }
        } catch (ClassNotFoundException cnfe) {
          return beeLine.error(cnfe);
        }
    
        boolean isDriverRegistered = false;
        try {
          isDriverRegistered = DriverManager.getDriver(getUrl()) != null;
        } catch (Exception e) {
        }
    
        try {
          close();
        } catch (Exception e) {
          return beeLine.error(e);
        }
    
        Map<String, String> hiveVars = beeLine.getOpts().getHiveVariables();
        if (hiveVars != null){
          for (Map.Entry<String, String> var : hiveVars.entrySet()) {
            info.put(HIVE_VAR_PREFIX + var.getKey(), var.getValue());
          }
        }
    
        Map<String, String> hiveConfVars = beeLine.getOpts().getHiveConfVariables();
        if (hiveConfVars != null){
          for (Map.Entry<String, String> var : hiveConfVars.entrySet()) {
            info.put(HIVE_CONF_PREFIX + var.getKey(), var.getValue());
          }
        }
    
        if (isDriverRegistered) {
          // if the driver registered in the driver manager, get the connection via the driver manager
          setConnection(DriverManager.getConnection(getUrl(), info));
        } else {
          beeLine.debug("Use the driver from local added jar file.");
          setConnection(getConnectionFromLocalDriver(getUrl(), info));
        }
        setDatabaseMetaData(getConnection().getMetaData());
    
        try {
          beeLine.info(beeLine.loc("connected", new Object[] {
              getDatabaseMetaData().getDatabaseProductName(),
              getDatabaseMetaData().getDatabaseProductVersion()}));
        } catch (Exception e) {
          beeLine.handleException(e);
        }
    
        try {
          beeLine.info(beeLine.loc("driver", new Object[] {
              getDatabaseMetaData().getDriverName(),
              getDatabaseMetaData().getDriverVersion()}));
        } catch (Exception e) {
          beeLine.handleException(e);
        }
    
        try {
          getConnection().setAutoCommit(beeLine.getOpts().getAutoCommit());
          // TODO: Setting autocommit should not generate an exception as long as it is set to false
          // beeLine.autocommitStatus(getConnection());
        } catch (Exception e) {
          beeLine.handleException(e);
        }
    
        try {
          beeLine.getCommands().isolation("isolation: " + beeLine.getOpts().getIsolation());
        } catch (Exception e) {
          beeLine.handleException(e);
        }
    
        return true;
      }    
    }
    

    As getConnection shows, a null cached connection triggers connect() automatically, and connect() is the familiar JDBC routine: load the driver class, then create the connection object. So an instance of DatabaseConnection always ends up holding a live connection, with driver, url, and the rest supplied through its constructor. In short: plain, familiar JDBC.
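
    For example, Hive variables and confs passed on the Beeline command line are injected into this info Properties object with the hivevar:/hiveconf: prefixes seen in connect() above (host and realm below are placeholders):

    beeline -u "jdbc:hive2://hs2-host:10000/default;principal=hive/_HOST@EXAMPLE.COM" \
            --hivevar run_date=2024-01-01 \
            --hiveconf hive.exec.parallel=true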

    Executing a script file: executeFile

    Based on the parsed options, if a script file was supplied, executeFile(getOpts().getScriptFile()) runs:

    private int executeFile(String fileName) {
      InputStream fileStream = null;
      try {
        if (!isBeeLine) {
          org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(fileName);
          FileSystem fs;
          HiveConf conf = getCommands().getHiveConf(true);
          if (!path.toUri().isAbsolute()) {
            fs = FileSystem.getLocal(conf);
            path = fs.makeQualified(path);
          } else {
            fs = FileSystem.get(path.toUri(), conf);
          }
          fileStream = fs.open(path);
        } else {
          fileStream = new FileInputStream(fileName);
        }
        return execute(initializeConsoleReader(fileStream), !getOpts().getForce());
      } catch (Throwable t) {
        handleException(t);
        return ERRNO_OTHER;
      } finally {
        IOUtils.closeStream(fileStream);
      }
    }
    

    In other words, it opens a stream over the file and hands it to execute for the actual work.

    The final execute loop

    Ultimately execute(reader, false) runs. Per the earlier analysis, the history-file and logging machinery around it is not the focus; on normal completion Beeline exits. If a script file was passed with -f, everything finishes inside this method before a normal exit.

    private int execute(ConsoleReader reader, boolean exitOnError) {
      int lastExecutionResult = ERRNO_OK;
      Character mask = (System.getProperty("jline.terminal", "").equals("jline.UnsupportedTerminal")) ? null
                         : ConsoleReader.NULL_MASK;
    
      while (!exit) {
        try {
          // Execute one instruction; terminate on executing a script if there is an error
          // in silent mode, prevent the query and prompt being echoed back to terminal
          String line = (getOpts().isSilent() && getOpts().getScriptFile() != null) ? reader
              .readLine(null, mask) : reader.readLine(getPrompt());
    
          // trim line
          if (line != null) {
            line = line.trim();
          }
    
          if (!dispatch(line)) {
            lastExecutionResult = ERRNO_OTHER;
            if (exitOnError) {
              break;
            }
          } else if (line != null) {
            lastExecutionResult = ERRNO_OK;
          }
    
        } catch (Throwable t) {
          handleException(t);
          return ERRNO_OTHER;
        }
      }
      return lastExecutionResult;
    }
    

    A typical big-data platform also passes SQL with -e; the file-based variant needs no further elaboration.

    Connecting to Hive over JDBC

    The source makes it plain that Beeline talks to Hive over JDBC: parameters come in via -u, -e, and friends, and the SQL is handed to a Statement. The connection is created from the URL, and JDBC famously loads its driver by reflection:

    /**
     * Attempts to establish a connection to the given database URL.
     * The DriverManager attempts to select an appropriate driver from
     * the set of registered JDBC drivers.
     *
     * Note: If a property is specified as part of the {@code url} and
     * is also specified in the {@code Properties} object, it is
     * implementation-defined as to which value will take precedence.
     * For maximum portability, an application should only specify a
     * property once.
     *
     * @param url a database url of the form
     *          jdbc:subprotocol:subname
     * @param info a list of arbitrary string tag/value pairs as
     *          connection arguments; normally at least a "user" and
     *          "password" property should be included
     * @return a Connection to the URL
     * @exception SQLException if a database access error occurs or the url is
     *          {@code null}
     * @throws SQLTimeoutException when the driver has determined that the
     *          timeout value specified by the {@code setLoginTimeout} method
     *          has been exceeded and has at least tried to cancel the
     *          current database connection attempt
     */
    @CallerSensitive
    public static Connection getConnection(String url,
        java.util.Properties info) throws SQLException {
      return (getConnection(url, info, Reflection.getCallerClass()));
    }


    Next, let's look at the driver class that reflection loads and how the URL carries configuration into it.

    Locating the driver class

    Hive's JDBC driver class is the familiar org.apache.hive.jdbc.HiveDriver:

    package org.apache.hive.jdbc;
    
    import java.io.IOException;
    import java.net.URL;
    import java.sql.Connection;
    import java.sql.Driver;
    import java.sql.DriverPropertyInfo;
    import java.sql.SQLException;
    import java.sql.SQLFeatureNotSupportedException;
    import java.util.Properties;
    import java.util.jar.Attributes;
    import java.util.jar.Manifest;
    import java.util.logging.Logger;
    import java.util.regex.Pattern;
    
    import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
    
    
    /**
     * HiveDriver.
     *
     */
    public class HiveDriver implements Driver {
      static {
        try {
          java.sql.DriverManager.registerDriver(new HiveDriver());
        } catch (SQLException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
    
      /**
       * Is this driver JDBC compliant?
       */
      private static final boolean JDBC_COMPLIANT = false;
    
      /**
       * Property key for the database name.
       */
      private static final String DBNAME_PROPERTY_KEY = "DBNAME";
    
      /**
       * Property key for the Hive Server2 host.
       */
      private static final String HOST_PROPERTY_KEY = "HOST";
    
      /**
       * Property key for the Hive Server2 port.
       */
      private static final String PORT_PROPERTY_KEY = "PORT";
    
    
      /**
       *
       */
      public HiveDriver() {
        // TODO Auto-generated constructor stub
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
          security.checkWrite("foobah");
        }
      }
    
      /**
       * Checks whether a given url is in a valid format.
       *
       * The current uri format is: jdbc:hive://[host[:port]]
       *
       * jdbc:hive:// - run in embedded mode jdbc:hive://localhost - connect to
       * localhost default port (10000) jdbc:hive://localhost:5050 - connect to
       * localhost port 5050
       *
       * TODO: - write a better regex. - decide on uri format
       */
    
      @Override
      public boolean acceptsURL(String url) throws SQLException {
        return Pattern.matches(Utils.URL_PREFIX + ".*", url);
      }
    
      /*
       * As per JDBC 3.0 Spec (section 9.2)
       * "If the Driver implementation understands the URL, it will return a Connection object;
       * otherwise it returns null"
       */
      @Override
      public Connection connect(String url, Properties info) throws SQLException {
        return acceptsURL(url) ? new HiveConnection(url, info) : null;
      }
    
      /**
       * Package scoped access to the Driver's Major Version
       * @return The Major version number of the driver. -1 if it cannot be determined from the
       * manifest.mf file.
       */
      static int getMajorDriverVersion() {
        int version = -1;
        try {
          String fullVersion = HiveDriver.fetchManifestAttribute(
              Attributes.Name.IMPLEMENTATION_VERSION);
          String[] tokens = fullVersion.split("\\."); //$NON-NLS-1$
    
          if(tokens != null && tokens.length > 0 && tokens[0] != null) {
            version = Integer.parseInt(tokens[0]);
          }
        } catch (Exception e) {
          // Possible reasons to end up here:
          // - Unable to read version from manifest.mf
          // - Version string is not in the proper X.x.xxx format
          version = -1;
        }
        return version;
      }
    
      /**
       * Package scoped access to the Driver's Minor Version
       * @return The Minor version number of the driver. -1 if it cannot be determined from the
       * manifest.mf file.
       */
      static int getMinorDriverVersion() {
        int version = -1;
        try {
          String fullVersion = HiveDriver.fetchManifestAttribute(
              Attributes.Name.IMPLEMENTATION_VERSION);
          String[] tokens = fullVersion.split("\\."); //$NON-NLS-1$
    
          if(tokens != null && tokens.length > 1 && tokens[1] != null) {
            version = Integer.parseInt(tokens[1]);
          }
        } catch (Exception e) {
          // Possible reasons to end up here:
          // - Unable to read version from manifest.mf
          // - Version string is not in the proper X.x.xxx format
          version = -1;
        }
        return version;
      }
    
      /**
       * Returns the major version of this driver.
       */
      @Override
      public int getMajorVersion() {
        return HiveDriver.getMajorDriverVersion();
      }
    
      /**
       * Returns the minor version of this driver.
       */
      @Override
      public int getMinorVersion() {
        return HiveDriver.getMinorDriverVersion();
      }
    
      public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        // JDK 1.7
        throw new SQLFeatureNotSupportedException("Method not supported");
      }
    
      @Override
      public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
        if (info == null) {
          info = new Properties();
        }
    
        if ((url != null) && url.startsWith(Utils.URL_PREFIX)) {
          info = parseURLforPropertyInfo(url, info);
        }
    
        DriverPropertyInfo hostProp = new DriverPropertyInfo(HOST_PROPERTY_KEY,
            info.getProperty(HOST_PROPERTY_KEY, ""));
        hostProp.required = false;
        hostProp.description = "Hostname of Hive Server2";
    
        DriverPropertyInfo portProp = new DriverPropertyInfo(PORT_PROPERTY_KEY,
            info.getProperty(PORT_PROPERTY_KEY, ""));
        portProp.required = false;
        portProp.description = "Port number of Hive Server2";
    
        DriverPropertyInfo dbProp = new DriverPropertyInfo(DBNAME_PROPERTY_KEY,
            info.getProperty(DBNAME_PROPERTY_KEY, "default"));
        dbProp.required = false;
        dbProp.description = "Database name";
    
        DriverPropertyInfo[] dpi = new DriverPropertyInfo[3];
    
        dpi[0] = hostProp;
        dpi[1] = portProp;
        dpi[2] = dbProp;
    
        return dpi;
      }
    
      /**
       * Returns whether the driver is JDBC compliant.
       */
      @Override
      public boolean jdbcCompliant() {
        return JDBC_COMPLIANT;
      }
    
      /**
       * Takes a url in the form of jdbc:hive://[hostname]:[port]/[db_name] and
       * parses it. Everything after jdbc:hive// is optional.
       *
       * The output from Utils.parseUrl() is massaged for the needs of getPropertyInfo
       * @param url
       * @param defaults
       * @return
       * @throws java.sql.SQLException
       */
      private Properties parseURLforPropertyInfo(String url, Properties defaults) throws SQLException {
        Properties urlProps = (defaults != null) ? new Properties(defaults)
            : new Properties();
    
        if (url == null || !url.startsWith(Utils.URL_PREFIX)) {
          throw new SQLException("Invalid connection url: " + url);
        }
    
        JdbcConnectionParams params = null;
        try {
          params = Utils.parseURL(url, defaults);
        } catch (ZooKeeperHiveClientException e) {
          throw new SQLException(e);
        }
        String host = params.getHost();
        if (host == null){
          host = "";
        }
        String port = Integer.toString(params.getPort());
        if(host.equals("")){
          port = "";
        }
        else if(port.equals("0") || port.equals("-1")){
          port = Utils.DEFAULT_PORT;
        }
        String db = params.getDbName();
        urlProps.put(HOST_PROPERTY_KEY, host);
        urlProps.put(PORT_PROPERTY_KEY, port);
        urlProps.put(DBNAME_PROPERTY_KEY, db);
    
        return urlProps;
      }
    
      /**
       * Lazy-load manifest attributes as needed.
       */
      private static Attributes manifestAttributes = null;
    
      /**
       * Loads the manifest attributes from the jar.
       *
       * @throws java.net.MalformedURLException
       * @throws IOException
       */
      private static synchronized void loadManifestAttributes() throws IOException {
        if (manifestAttributes != null) {
          return;
        }
        Class<?> clazz = HiveDriver.class;
        String classContainer = clazz.getProtectionDomain().getCodeSource()
            .getLocation().toString();
        URL manifestUrl = new URL("jar:" + classContainer
            + "!/META-INF/MANIFEST.MF");
        Manifest manifest = new Manifest(manifestUrl.openStream());
        manifestAttributes = manifest.getMainAttributes();
      }
    
      /**
       * Package scoped to allow manifest fetching from other HiveDriver classes
       * Helper to initialize attributes and return one.
       *
       * @param attributeName
       * @return
       * @throws SQLException
       */
      static String fetchManifestAttribute(Attributes.Name attributeName)
          throws SQLException {
        try {
          loadManifestAttributes();
        } catch (IOException e) {
          throw new SQLException("Couldn't load manifest attributes.", e);
        }
        return manifestAttributes.getValue(attributeName);
      }
    }
    

    What acceptsURL matches is:

    package org.apache.hive.jdbc;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class Utils {
      static final Logger LOG = LoggerFactory.getLogger(Utils.class.getName());
      /**
       * The required prefix for the connection URL.
       */
      public static final String URL_PREFIX = "jdbc:hive2://";
    }
    

    类似"jdbc:hive2://"这种开头的url。

    You can also see parseURLforPropertyInfo, which parses connection properties out of the URL.

    The constructor invoked by the static registration block:

    public HiveDriver() {
      // TODO Auto-generated constructor stub
      SecurityManager security = System.getSecurityManager();
      if (security != null) {
        security.checkWrite("foobah");
      }
    }
    

    It appears to do nothing beyond a SecurityManager check.

    Walking through the connection steps

    HiveDriver's connect method shows:

    /*
     * As per JDBC 3.0 Spec (section 9.2)
     * "If the Driver implementation understands the URL, it will return a Connection object;
     * otherwise it returns null"
     */
    @Override
    public Connection connect(String url, Properties info) throws SQLException {
      return acceptsURL(url) ? new HiveConnection(url, info) : null;
    }
    

    Jumping in:

    package org.apache.hive.jdbc;
    public class HiveConnection implements java.sql.Connection {
      public HiveConnection(String uri, Properties info) throws SQLException {
        setupLoginTimeout();
        try {
          connParams = Utils.parseURL(uri, info);
        } catch (ZooKeeperHiveClientException e) {
          throw new SQLException(e);
        }
        jdbcUriString = connParams.getJdbcUriString();
    // JDBC URL: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
    // each list: <key1>=<val1>;<key2>=<val2> and so on
        // sess_var_list -> sessConfMap
        // hive_conf_list -> hiveConfMap
        // hive_var_list -> hiveVarMap
        host = Utils.getCanonicalHostName(connParams.getHost());
        port = connParams.getPort();
        sessConfMap = connParams.getSessionVars();
        isEmbeddedMode = connParams.isEmbeddedMode();
    
        if (sessConfMap.containsKey(JdbcConnectionParams.FETCH_SIZE)) {
          fetchSize = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.FETCH_SIZE));
        }
        if (sessConfMap.containsKey(JdbcConnectionParams.INIT_FILE)) {
          initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE);
        }
        wmPool = sessConfMap.get(JdbcConnectionParams.WM_POOL);
        for (String application : JdbcConnectionParams.APPLICATION) {
          wmApp = sessConfMap.get(application);
          if (wmApp != null) break;
        }
    
        // add supported protocols
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2);
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V3);
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V4);
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5);
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8);
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9);
        supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10);
    
        if (isEmbeddedMode) {
          EmbeddedThriftBinaryCLIService embeddedClient = new EmbeddedThriftBinaryCLIService();
          embeddedClient.init(null, connParams.getHiveConfs());
          client = embeddedClient;
          connParams.getHiveConfs().clear();
          // open client session
          openSession();
          executeInitSql();
        } else {
          int maxRetries = 1;
          try {
            String strRetries = sessConfMap.get(JdbcConnectionParams.RETRIES);
            if (StringUtils.isNotBlank(strRetries)) {
              maxRetries = Integer.parseInt(strRetries);
            }
          } catch(NumberFormatException e) { // Ignore the exception
          }
    
          for (int numRetries = 0;;) {
            try {
              // open the client transport
              openTransport();
              // set up the client
              client = new TCLIService.Client(new TBinaryProtocol(transport));
              // open client session
              openSession();
              executeInitSql();
    
              break;
            } catch (Exception e) {
              LOG.warn("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort());
              String errMsg = null;
              String warnMsg = "Could not open client transport with JDBC Uri: " + jdbcUriString + ": ";
              if (ZooKeeperHiveClientHelper.isZkDynamicDiscoveryMode(sessConfMap)) {
                errMsg = "Could not open client transport for any of the Server URI's in ZooKeeper: ";
                // Try next available server in zookeeper, or retry all the servers again if retry is enabled
                while(!Utils.updateConnParamsFromZooKeeper(connParams) && ++numRetries < maxRetries) {
                  connParams.getRejectedHostZnodePaths().clear();
                }
                // Update with new values
                jdbcUriString = connParams.getJdbcUriString();
                host = Utils.getCanonicalHostName(connParams.getHost());
                port = connParams.getPort();
              } else {
                errMsg = warnMsg;
                ++numRetries;
              }
    
              if (numRetries >= maxRetries) {
                throw new SQLException(errMsg + e.getMessage(), " 08S01", e);
              } else {
                LOG.warn(warnMsg + e.getMessage() + " Retrying " + numRetries + " of " + maxRetries);
              }
            }
          }
        }
    
        // Wrap the client with a thread-safe proxy to serialize the RPC calls
        client = newSynchronizedClient(client);
      }    
    }    
    

    This is clearly where the connection actually gets established.

    Here we can see the correct format of Hive's JDBC URL:

    jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
    

    And the comments spell out that each list is key=value pairs separated by ";".
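
    For instance, a URL of the following shape (all values are placeholders) sets a session variable, a Hive conf, and a Hive var in one go:

    jdbc:hive2://hs2-host:10000/default;principal=hive/_HOST@EXAMPLE.COM?hive.exec.parallel=true#run_date=2024-01-01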

    We will of course not be using the embedded mode with the default Derby metastore, so let's follow the normal remote path. After the fetch size, retry count, and similar settings are resolved, this loop runs (exceeding the maximum retries throws an exception and exits):

    // open the client transport
    openTransport();
    // set up the client
    client = new TCLIService.Client(new TBinaryProtocol(transport));
    // open client session
    openSession();
    executeInitSql();
    

    A four-step strategy, then.

    Opening the transport: openTransport

    Jumping into this method:

    private void openTransport() throws Exception {
      assumeSubject =
          JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
              .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
      transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
      if (!transport.isOpen()) {
        transport.open();
      }
      logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort());
    }
    

    A familiar ternary: depending on the transport mode, it creates either an HTTP transport or a binary transport (an example HTTP-mode URL follows the listing below). The secured binary transport supports more than just Kerberos:

    /**
     * Create transport per the connection options
     * Supported transport options are:
     *   - SASL based transports over
     *      + Kerberos
     *      + Delegation token
     *      + SSL
     *      + non-SSL
     *   - Raw (non-SASL) socket
     *
     *   Kerberos and Delegation token supports SASL QOP configurations
     * @throws SQLException, TTransportException
     */
    private TTransport createBinaryTransport() throws SQLException, TTransportException {
      try {
        TTransport socketTransport = createUnderlyingTransport();
        // handle secure connection if specified
        if (!JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))) {
          // If Kerberos
          Map<String, String> saslProps = new HashMap<String, String>();
          SaslQOP saslQOP = SaslQOP.AUTH;
          if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_QOP)) {
            try {
              saslQOP = SaslQOP.fromString(sessConfMap.get(JdbcConnectionParams.AUTH_QOP));
            } catch (IllegalArgumentException e) {
              throw new SQLException("Invalid " + JdbcConnectionParams.AUTH_QOP +
                  " parameter. " + e.getMessage(), "42000", e);
            }
            saslProps.put(Sasl.QOP, saslQOP.toString());
          } else {
            // If the client did not specify qop then just negotiate the one supported by server
            saslProps.put(Sasl.QOP, "auth-conf,auth-int,auth");
          }
          saslProps.put(Sasl.SERVER_AUTH, "true");
          if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL)) {
            transport = KerberosSaslHelper.getKerberosTransport(
                sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host,
                socketTransport, saslProps, assumeSubject);
          } else {
            // If there's a delegation token available then use token based connection
            String tokenStr = getClientDelegationToken(sessConfMap);
            if (tokenStr != null) {
              transport = KerberosSaslHelper.getTokenTransport(tokenStr,
                  host, socketTransport, saslProps);
            } else {
              // we are using PLAIN Sasl connection with user/password
              String userName = getUserName();
              String passwd = getPassword();
              // Overlay the SASL transport on top of the base socket transport (SSL or non-SSL)
              transport = PlainSaslHelper.getPlainTransport(userName, passwd, socketTransport);
            }
          }
        } else {
          // Raw socket connection (non-sasl)
          transport = socketTransport;
        }
      } catch (SaslException e) {
        throw new SQLException("Could not create secure connection to "
            + jdbcUriString + ": " + e.getMessage(), " 08S01", e);
      }
      return transport;
    }
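
    As for the HTTP branch mentioned above: it is selected by session variables in the URL. A URL shaped like the following (a sketch; host and path are placeholders, and the HTTP port is commonly 10001) would route through createHttpTransport instead:

    jdbc:hive2://hs2-host:10001/default;transportMode=http;httpPath=cliservice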
    

    But when the JDBC URL carries a principal:

    if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL)) {
      transport = KerberosSaslHelper.getKerberosTransport(
          sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host,
          socketTransport, saslProps, assumeSubject);
    }
    

    it unsurprisingly takes the Kerberos route:

    package org.apache.hive.service.auth;
    
    import java.io.IOException;
    import java.util.Map;
    
    import javax.security.sasl.SaslException;
    
    import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
    import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server;
    import org.apache.hive.service.cli.thrift.ThriftCLIService;
    import org.apache.hive.service.rpc.thrift.TCLIService;
    import org.apache.hive.service.rpc.thrift.TCLIService.Iface;
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.TProcessorFactory;
    import org.apache.thrift.transport.TSaslClientTransport;
    import org.apache.thrift.transport.TTransport;
    
    public final class KerberosSaslHelper {
      public static TTransport getKerberosTransport(String principal, String host,
        TTransport underlyingTransport, Map<String, String> saslProps, boolean assumeSubject)
        throws SaslException {
        try {
          String[] names = principal.split("[/@]");
          if (names.length != 3) {
            throw new IllegalArgumentException("Kerberos principal should have 3 parts: " + principal);
          }
    
          if (assumeSubject) {
            return createSubjectAssumedTransport(principal, underlyingTransport, saslProps);
          } else {
            HadoopThriftAuthBridge.Client authBridge =
              HadoopThriftAuthBridge.getBridge().createClientWithConf("kerberos");
            return authBridge.createClientTransport(principal, host, "KERBEROS", null,
                                                    underlyingTransport, saslProps);
          }
        } catch (IOException e) {
          throw new SaslException("Failed to open client transport", e);
        }
      }
    }    
    

    Note the hard-coded split on "/" and "@": if the principal does not split into exactly three parts, an exception is thrown.
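
    A quick demonstration of that constraint (hostnames and realm are placeholders):

    // The split("[/@]") check from getKerberosTransport, in isolation
    public class PrincipalSplitDemo {
      public static void main(String[] args) {
        String ok  = "hive/hs2-host.example.com@EXAMPLE.COM";
        String bad = "hive@EXAMPLE.COM"; // missing the host part
        System.out.println(ok.split("[/@]").length);  // 3 -> accepted
        System.out.println(bad.split("[/@]").length); // 2 -> IllegalArgumentException in getKerberosTransport
      }
    }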

    The rest is covered in an earlier post: https://lizhiyong.blog.csdn.net/article/details/124184528

    That post analyzes how Hive2 and Hive3 differ when building the Kerberos transport object; for brevity it is not repeated here.

    Setting up the client

    The transport from the previous step is wrapped in a TBinaryProtocol and handed to the TCLIService.Client constructor, which simply returns a new Thrift client object. Not the focus here.

    Opening the session: openSession

    private void openSession() throws SQLException {
      TOpenSessionReq openReq = new TOpenSessionReq();
    
      Map<String, String> openConf = new HashMap<String, String>();
      // for remote JDBC client, try to set the conf var using 'set foo=bar'
      for (Entry<String, String> hiveConf : connParams.getHiveConfs().entrySet()) {
        openConf.put("set:hiveconf:" + hiveConf.getKey(), hiveConf.getValue());
      }
      // For remote JDBC client, try to set the hive var using 'set hivevar:key=value'
      for (Entry<String, String> hiveVar : connParams.getHiveVars().entrySet()) {
        openConf.put("set:hivevar:" + hiveVar.getKey(), hiveVar.getValue());
      }
      // switch the database
      openConf.put("use:database", connParams.getDbName());
      // set the fetchSize
      openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size",
        Integer.toString(fetchSize));
      if (wmPool != null) {
        openConf.put("set:hivevar:wmpool", wmPool);
      }
      if (wmApp != null) {
        openConf.put("set:hivevar:wmapp", wmApp);
      }
    
      // set the session configuration
      Map<String, String> sessVars = connParams.getSessionVars();
      if (sessVars.containsKey(HiveAuthConstants.HS2_PROXY_USER)) {
        openConf.put(HiveAuthConstants.HS2_PROXY_USER,
            sessVars.get(HiveAuthConstants.HS2_PROXY_USER));
      }
      openReq.setConfiguration(openConf);
    
      // Store the user name in the open request in case no non-sasl authentication
      if (JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))) {
        openReq.setUsername(sessConfMap.get(JdbcConnectionParams.AUTH_USER));
        openReq.setPassword(sessConfMap.get(JdbcConnectionParams.AUTH_PASSWD));
      }
    
      try {
        TOpenSessionResp openResp = client.OpenSession(openReq);
    
        // validate connection
        Utils.verifySuccess(openResp.getStatus());
        if (!supportedProtocols.contains(openResp.getServerProtocolVersion())) {
          throw new TException("Unsupported Hive2 protocol");
        }
        protocol = openResp.getServerProtocolVersion();
        sessHandle = openResp.getSessionHandle();
    
        // Update fetchSize if modified by server
        String serverFetchSize =
          openResp.getConfiguration().get("hive.server2.thrift.resultset.default.fetch.size");
        if (serverFetchSize != null) {
          fetchSize = Integer.parseInt(serverFetchSize);
        }
      } catch (TException e) {
        LOG.error("Error opening session", e);
        throw new SQLException("Could not establish connection to "
            + jdbcUriString + ": " + e.getMessage(), " 08S01", e);
      }
      isClosed = false;
    }
    

    Driven by the proxy user, authentication type, and parsed conf/var maps, this method assembles the initial configuration entries, sends the OpenSession request, and records the returned protocol version and session handle.
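
    Concretely, for a hypothetical URL jdbc:hive2://hs2-host:10000/default?hive.exec.parallel=true#run_date=2024-01-01, the map packed into the OpenSession request would carry entries like these (a sketch with assumed values):

    import java.util.HashMap;
    import java.util.Map;
    
    class OpenConfSketch {
      static Map<String, String> build() {
        Map<String, String> openConf = new HashMap<>();
        openConf.put("set:hiveconf:hive.exec.parallel", "true"); // from ?hive_conf_list
        openConf.put("set:hivevar:run_date", "2024-01-01");      // from #hive_var_list
        openConf.put("use:database", "default");                 // dbName from the URL path
        openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size", "1000"); // fetchSize
        return openConf;
      }
    }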

    Running the init SQL: executeInitSql

    private void executeInitSql() throws SQLException {
      if (initFile != null) {
        try {
          List<String> sqlList = parseInitFile(initFile);
          Statement st = createStatement();
          for(String sql : sqlList) {
            boolean hasResult = st.execute(sql);
            if (hasResult) {
              ResultSet rs = st.getResultSet();
              while (rs.next()) {
                System.out.println(rs.getString(1));
              }
            }
          }
        } catch(Exception e) {
          LOG.error("Failed to execute initial SQL");
          throw new SQLException(e.getMessage());
        }
      }
    }
    

    It simply pre-executes the SQL parsed out of the init file, if one was configured. Not the focus.
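
    The init file comes from the initFile session variable in the URL (the path is a placeholder):

    jdbc:hive2://hs2-host:10000/default;initFile=/etc/hive/init.sql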

    The Hive data source: HiveDataSource

    package org.apache.hive.jdbc;
    
    import java.io.PrintWriter;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.SQLFeatureNotSupportedException;
    import java.util.logging.Logger;
    
    import javax.sql.DataSource;
    
    /**
     * HiveDataSource.
     *
     */
    public class HiveDataSource implements DataSource {
    
      /**
       *
       */
      public HiveDataSource() {
        // TODO Auto-generated constructor stub
      }
    
      /*
       * (non-Javadoc)
       *
       * @see javax.sql.DataSource#getConnection()
       */
    
      @Override
      public Connection getConnection() throws SQLException {
        return getConnection("", "");
      }
    
      /*
       * (non-Javadoc)
       *
       * @see javax.sql.DataSource#getConnection(java.lang.String, java.lang.String)
       */
    
      @Override
      public Connection getConnection(String username, String password)
          throws SQLException {
        try {
          return new HiveConnection("", null);
        } catch (Exception ex) {
          throw new SQLException("Error in getting HiveConnection",ex);
        }
      }
    
      /*
       * (non-Javadoc)
       *
       * @see javax.sql.CommonDataSource#getLogWriter()
       */
    
      @Override
      public PrintWriter getLogWriter() throws SQLException {
        // TODO Auto-generated method stub
        throw new SQLFeatureNotSupportedException("Method not supported");
      }
    
      /*
       * (non-Javadoc)
       *
       * @see javax.sql.CommonDataSource#getLoginTimeout()
       */
    
      @Override
      public int getLoginTimeout() throws SQLException {
        // TODO Auto-generated method stub
        throw new SQLFeatureNotSupportedException("Method not supported");
      }
    
      public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        // JDK 1.7
        throw new SQLFeatureNotSupportedException("Method not supported");
      }
    
      /*
       * (non-Javadoc)
       *
       * @see javax.sql.CommonDataSource#setLogWriter(java.io.PrintWriter)
       */
    
      @Override
      public void setLogWriter(PrintWriter arg0) throws SQLException {
        // TODO Auto-generated method stub
        throw new SQLFeatureNotSupportedException("Method not supported");
      }
    
      /*
       * (non-Javadoc)
       *
       * @see javax.sql.CommonDataSource#setLoginTimeout(int)
       */
    
      @Override
      public void setLoginTimeout(int arg0) throws SQLException {
        // TODO Auto-generated method stub
        throw new SQLFeatureNotSupportedException("Method not supported");
      }
    
      /*
       * (non-Javadoc)
       *
       * @see java.sql.Wrapper#isWrapperFor(java.lang.Class)
       */
    
      @Override
      public boolean isWrapperFor(Class<?> arg0) throws SQLException {
        // TODO Auto-generated method stub
        throw new SQLFeatureNotSupportedException("Method not supported");
      }
    
      /*
       * (non-Javadoc)
       *
       * @see java.sql.Wrapper#unwrap(java.lang.Class)
       */
    
      @Override
      public <T> T unwrap(Class<T> arg0) throws SQLException {
        // TODO Auto-generated method stub
        throw new SQLFeatureNotSupportedException("Method not supported");
      }
    
    }
    

    Once the JDBC connect path is understood, it is also clear how Spark, Flink, Hudi, Spring, and similar components obtain Hive connections: a connection built through this HiveDataSource is still JDBC underneath, no different from writing it by hand.
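
    A minimal hand-written equivalent of what Beeline and HiveDataSource do underneath (host, port, and credentials are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    
    public class HiveJdbcDemo {
      public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver"); // triggers the static registerDriver block
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://hs2-host:10000/default", "user", "");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("show databases")) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      }
    }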

    The JDBC parameter class: JdbcConnectionParams

    From the source:

    package org.apache.hive.jdbc;
    
    public class Utils {
      public static class JdbcConnectionParams {
        // Note on client side parameter naming convention:
        // Prefer using a shorter camelCase param name instead of using the same name as the
        // corresponding
        // HiveServer2 config.
    // For a jdbc url: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list,
        // client side params are specified in sess_var_list
    
        // Client param names:
    
        // Retry setting
        static final String RETRIES = "retries";
    
        public static final String AUTH_TYPE = "auth";
        // We're deprecating this variable's name.
        public static final String AUTH_QOP_DEPRECATED = "sasl.qop";
        public static final String AUTH_QOP = "saslQop";
        public static final String AUTH_SIMPLE = "noSasl";
        public static final String AUTH_TOKEN = "delegationToken";
        public static final String AUTH_USER = "user";
        public static final String AUTH_PRINCIPAL = "principal";
        public static final String AUTH_PASSWD = "password";
        public static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
        public static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
        public static final String ANONYMOUS_USER = "anonymous";
        public static final String ANONYMOUS_PASSWD = "anonymous";
        public static final String USE_SSL = "ssl";
        public static final String SSL_TRUST_STORE = "sslTrustStore";
        public static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
        // We're deprecating the name and placement of this in the parsed map (from hive conf vars to
        // hive session vars).
        static final String TRANSPORT_MODE_DEPRECATED = "hive.server2.transport.mode";
        public static final String TRANSPORT_MODE = "transportMode";
        // We're deprecating the name and placement of this in the parsed map (from hive conf vars to
        // hive session vars).
        static final String HTTP_PATH_DEPRECATED = "hive.server2.thrift.http.path";
        public static final String HTTP_PATH = "httpPath";
        public static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
        public static final String PROPERTY_DRIVER        = "driver";
        public static final String PROPERTY_URL           = "url";
        // Don't use dynamic service discovery
        static final String SERVICE_DISCOVERY_MODE_NONE = "none";
        // Use ZooKeeper for indirection while using dynamic service discovery
        public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
        public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA = "zooKeeperHA";
        static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
        // Default namespace value on ZooKeeper.
        // This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
        static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
        static final String ZOOKEEPER_ACTIVE_PASSIVE_HA_DEFAULT_NAMESPACE = "hs2ActivePassiveHA";
        static final String COOKIE_AUTH = "cookieAuth";
        static final String COOKIE_AUTH_FALSE = "false";
        static final String COOKIE_NAME = "cookieName";
        // The default value of the cookie name when CookieAuth=true
        static final String DEFAULT_COOKIE_NAMES_HS2 = "hive.server2.auth";
        // The http header prefix for additional headers which have to be appended to the request
        static final String HTTP_HEADER_PREFIX = "http.header.";
        // Set the fetchSize
        static final String FETCH_SIZE = "fetchSize";
        static final String INIT_FILE = "initFile";
        static final String WM_POOL = "wmPool";
        // Cookie prefix
        static final String HTTP_COOKIE_PREFIX = "http.cookie.";
    
        // We support ways to specify application name modeled after some existing DBs, since
        // there's no standard approach.
        // MSSQL: applicationName https://docs.microsoft.com/en-us/sql/connect/jdbc/building-the-connection-url
        // Postgres 9~: ApplicationName https://jdbc.postgresql.org/documentation/91/connect.html
        // Note: various ODBC names used include "Application Name", "APP", etc. Add those?
        static final String[] APPLICATION = new String[] { "applicationName", "ApplicationName" };
    
        // --------------- Begin 2 way ssl options -------------------------
        // Use two way ssl. This param will take effect only when ssl=true
        static final String USE_TWO_WAY_SSL = "twoWay";
        static final String TRUE = "true";
        static final String SSL_KEY_STORE = "sslKeyStore";
        static final String SSL_KEY_STORE_PASSWORD = "keyStorePassword";
        static final String SSL_KEY_STORE_TYPE = "JKS";
        static final String SUNX509_ALGORITHM_STRING = "SunX509";
        static final String SUNJSSE_ALGORITHM_STRING = "SunJSSE";
       // --------------- End 2 way ssl options ----------------------------
    
        // Non-configurable params:
        // Currently supports JKS keystore format
        static final String SSL_TRUST_STORE_TYPE = "JKS";
    
        private static final String HIVE_VAR_PREFIX = "hivevar:";
        private static final String HIVE_CONF_PREFIX = "hiveconf:";
        private String host = null;
        private int port = 0;
        private String jdbcUriString;
        private String dbName = DEFAULT_DATABASE;
        private Map<String,String> hiveConfs = new LinkedHashMap<String,String>();
        private Map<String,String> hiveVars = new LinkedHashMap<String,String>();
        private Map<String,String> sessionVars = new LinkedHashMap<String,String>();
        private boolean isEmbeddedMode = false;
        private String suppliedURLAuthority;
        private String zooKeeperEnsemble = null;
        private String currentHostZnodePath;
        private final List<String> rejectedHostZnodePaths = new ArrayList<String>();
    
        public JdbcConnectionParams() {
        }
    
        public JdbcConnectionParams(JdbcConnectionParams params) {
          this.host = params.host;
          this.port = params.port;
          this.jdbcUriString = params.jdbcUriString;
          this.dbName = params.dbName;
          this.hiveConfs.putAll(params.hiveConfs);
          this.hiveVars.putAll(params.hiveVars);
          this.sessionVars.putAll(params.sessionVars);
          this.isEmbeddedMode = params.isEmbeddedMode;
          this.suppliedURLAuthority = params.suppliedURLAuthority;
          this.zooKeeperEnsemble = params.zooKeeperEnsemble;
          this.currentHostZnodePath = params.currentHostZnodePath;
          // Note: upstream 3.1.2 reads addAll(rejectedHostZnodePaths), which adds the
          // (empty) field to itself, a no-op; copying from params is presumably intended:
          this.rejectedHostZnodePaths.addAll(params.rejectedHostZnodePaths);
        }
    
        public String getHost() {
          return host;
        }
    
        public int getPort() {
          return port;
        }
    
        public String getJdbcUriString() {
          return jdbcUriString;
        }
    
        public String getDbName() {
          return dbName;
        }
    
        public Map<String, String> getHiveConfs() {
          return hiveConfs;
        }
    
        public Map<String, String> getHiveVars() {
          return hiveVars;
        }
    
        public boolean isEmbeddedMode() {
          return isEmbeddedMode;
        }
    
        public Map<String, String> getSessionVars() {
          return sessionVars;
        }
    
        public String getSuppliedURLAuthority() {
          return suppliedURLAuthority;
        }
    
        public String getZooKeeperEnsemble() {
          return zooKeeperEnsemble;
        }
    
        public List<String> getRejectedHostZnodePaths() {
          return rejectedHostZnodePaths;
        }
    
        public String getCurrentHostZnodePath() {
          return currentHostZnodePath;
        }
    
        public void setHost(String host) {
          this.host = host;
        }
    
        public void setPort(int port) {
          this.port = port;
        }
    
        public void setJdbcUriString(String jdbcUriString) {
          this.jdbcUriString = jdbcUriString;
        }
    
        public void setDbName(String dbName) {
          this.dbName = dbName;
        }
    
        public void setHiveConfs(Map<String, String> hiveConfs) {
          this.hiveConfs = hiveConfs;
        }
    
        public void setHiveVars(Map<String, String> hiveVars) {
          this.hiveVars = hiveVars;
        }
    
        public void setEmbeddedMode(boolean embeddedMode) {
          this.isEmbeddedMode = embeddedMode;
        }
    
        public void setSessionVars(Map<String, String> sessionVars) {
          this.sessionVars = sessionVars;
        }
    
        public void setSuppliedURLAuthority(String suppliedURLAuthority) {
          this.suppliedURLAuthority = suppliedURLAuthority;
        }
    
        public void setZooKeeperEnsemble(String zooKeeperEnsemble) {
          this.zooKeeperEnsemble = zooKeeperEnsemble;
        }
    
        public void setCurrentHostZnodePath(String currentHostZnodePath) {
          this.currentHostZnodePath = currentHostZnodePath;
        }
      }    
    }    
    

    A large number of configuration keys are defined here.

    According to the comments, sess_var_list holds the client-side settings; alongside it there are hive_conf_list and hive_var_list. As the fields above show, all three are stored as plain LinkedHashMaps.
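
    To make the three maps concrete: the HiveServer2 JDBC URL format documented on the Confluence page is jdbc:hive2://<host>:<port>/<db>;sess_var_list?hive_conf_list#hive_var_list. Below is a minimal sketch of which URL segment feeds which map; the host, principal and values are placeholders for illustration, not anything from the source:

    // Sketch only: a hypothetical URL showing which segment lands in which map.
    String url = "jdbc:hive2://hs2-host:10000/default"
        + ";principal=hive/_HOST@EXAMPLE.COM;transportMode=binary"      // ';' list -> sessionVars
        + "?hive.exec.parallel=true;hive.exec.parallel.thread.number=8" // '?' list -> hiveConfs
        + "#env=prod;batch_date=20220901";                              // '#' list -> hiveVars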

    Tunable Parameters

    In fact, according to the official manual, running the following in beeline:

    set -v;
    

    returns every Hadoop- and Hive-related configuration entry. Folks running Hive on Tez on CDP7 should mind the trap: the Tez keys are not all there. Keys of the form hive.xx.tez.xx will always appear, because they are Hive's own Tez-related settings; keys of the form tez.xx never will, because those belong to Tez itself. You can still query and set them in beeline:

    0: jdbc:hive2://192.168.88.101:10000/default> set tez.am.resource.memory.mb;
    +-----------------------------------------+
    |                   set                   |
    +-----------------------------------------+
    | tez.am.resource.memory.mb is undefined  |
    +-----------------------------------------+
    1 row selected (0.098 seconds)
    0: jdbc:hive2://192.168.88.101:10000/default>
    

    But entries like these never show up in the output of set -v; try it yourself.
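
    For completeness, a hedged sketch of doing the same over JDBC instead of the interactive shell. The host, the credentials and the 2048 MB value are placeholders, and this assumes tez.am.resource.memory.mb is not on HiveServer2's restricted list (see the Configuration Properties page linked above):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class TezParamDemo {
      public static void main(String[] args) throws Exception {
        String url = "jdbc:hive2://192.168.88.101:10000/default"; // placeholder host
        try (Connection conn = DriverManager.getConnection(url, "hive", ""); // placeholder credentials
             Statement stmt = conn.createStatement()) {
          // tez.* keys never appear in `set -v`, but a plain `set` still
          // registers them for this session and they reach the Tez AM.
          stmt.execute("set tez.am.resource.memory.mb=2048");
          stmt.execute("select 1");
        }
      }
    }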

    All Tunable Hadoop and Hive Parameters

    Well, the output of set -v is far too long to inline; see the appendix for the full list: https://lizhiyong.blog.csdn.net/article/details/126634922

    Summary

    With the official docs and a dig through the source, Beeline's execution flow and the JDBC connection flow are easy to understand. The usage patterns we take for granted are simply hard-coded in the source; make a habit of reading source code and you will rarely need to ask anyone for help.

    Beeline's execution flow in a nutshell:

    The main method initializes the BeeLine object → invokes its single entry method →
    the load method reads configuration → the setupHistory method starts command history →
    the addBeelineShutdownHook method enables graceful exit →
    the initializeConsoleReader method initializes the console reader → the initArgs method parses the arguments →
    the dispatch method routes each command → the executeFile method runs a script file [not always executed] →
    the execute method does the final work [with a script passed via -f, it in turn invokes dispatch internally]
    

    At runtime the core is the dispatch method. In a nutshell:

    Check whether the input starts with "!"
    --> starts with "!": the prefixed path, execCommandWithPrefix, matches the keyword after "!" and invokes the same-named method via reflection
    --> no "!" prefix: the unprefixed path, commands.sql, follows the standard JDBC routine: get a connection, get a statement, execute the SQL for a ResultSet, display the results, release the resources
    
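    As a condensed paraphrase (not the literal BeeLine source), the branching looks roughly like this:

    // Rough paraphrase of the dispatch branching; the method names follow the
    // source, but the body is simplified for illustration.
    boolean dispatch(String line) {
      if (line.startsWith("!")) {
        // "!connect", "!quit", ...: match the keyword after "!" and
        // invoke the same-named Commands method reflectively.
        return execCommandWithPrefix(line);
      }
      // Plain SQL: connection -> statement -> execute -> show ResultSet -> clean up.
      return commands.sql(line, getOpts().getEntireLineAsCommand());
    }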

    The JDBC connection to Hive, also in a nutshell:

    HiveDriver accepts the URL, and the driver class is loaded via reflection →
    HiveConnection parses the URL parameters → iterates within the maximum retry count to obtain a connection object [non-embedded mode]
    
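    Seen from the application side, that whole chain is triggered by a few lines of standard JDBC; the host and the query below are placeholders:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class HiveJdbcDemo {
      public static void main(String[] args) throws Exception {
        // Explicit load of the driver; JDBC 4+ would also discover it automatically.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // acceptsURL() matches the "jdbc:hive2://" prefix, then HiveConnection
        // parses the URL and retries the transport up to the retry limit.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:hive2://hs2-host:10000/default"); // placeholder host
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select current_database()")) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      }
    }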

    And the steps inside that connection loop, in a nutshell:

    The openTransport method obtains the transport object [HTTP or binary; the binary path additionally supports Kerberos and other auth modes] →
    the client object is set up → the openSession method opens the session [at this point proxying, authentication and configuration determine the fetch size and many other parameters] →
    executeInitSql runs the init script [only if a file was passed via -i]
    
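    Note that -i is not the only trigger: the initFile session variable quoted in JdbcConnectionParams above serves the same purpose directly in the URL. A one-line sketch with a placeholder path:

    // Statements in /tmp/init.sql run right after openSession, before any user SQL.
    String url = "jdbc:hive2://hs2-host:10000/default;initFile=/tmp/init.sql";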

    The HiveDataSource commonly used by Spark, Flink, Hudi, Spring and the like also connects to Hive through this same JDBC path underneath. And you can run set -v in beeline to list every settable Hadoop and Hive parameter [including the Hive-side tez parameters, but not the Tez engine's own parameters].

    Hive may be old, but there is still plenty in it worth learning from.
