The Beeline Execution Process in Hive 3.1.2
Because Alibaba Cloud's DataPhin platform cannot recognize tables that were not created through DataPhin, I had no choice but to use the beeline SQL client to import data from ordinary Hive tables into DataPhin's Hive tables:
beeline -u "jdbc:hive2://<hive-host>:10000/default;principal=hive/<host>@<realm>" -e "
insert overwrite table db1.tb1
select
col1
from
db2.tb2
;
"
Partitioned tables are supported as well. Because this kept throwing errors, I dug into the source code to trace beeline's execution flow, hoping to find directions for optimization and, along the way, some tunable parameters.
Beeline usage is documented on the official Confluence: https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients
The official Confluence is also very thorough on Hive configuration properties: https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-RestrictedListandWhitelist
It is equally thorough on the Hive on Tez parameters used by CDP7: https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-Tez
The official site already tells you which parameters to tune in which situations; there is no need to go begging around like the shallow SQL Boys do.
I am using Apache Hive 3.1.2. The IDE is IntelliJ IDEA; whether Maven is configured hardly matters, since we will not be examining Calcite's AST handling in detail.
package org.apache.hive.beeline;
/**
* A console SQL shell with command completion.
*
* TODO:
*
* - User-friendly connection prompts
* - Page results
* - Handle binary data (blob fields)
* - Implement command aliases
* - Stored procedure execution
* - Binding parameters to prepared statements
* - Scripting language
* - XA transactions
*
*
*/
@SuppressWarnings("static-access")
public class BeeLine implements Closeable {
/**
* Starts the program.
*/
public static void main(String[] args) throws IOException {
mainWithInputRedirection(args, null);
}
}
The BeeLine class sits right in the beeline module and is launched directly from its main method, plain and simple. The main method contains just one call:
/**
* Starts the program with redirected input. For redirected output,
* setOutputStream() and setErrorStream can be used.
* Exits with 0 on success, 1 on invalid arguments, and 2 on any other error
*
* @param args
* same as main()
*
* @param inputStream
* redirected input, or null to use standard input
*/
public static void mainWithInputRedirection(String[] args, InputStream inputStream)
throws IOException {
BeeLine beeLine = new BeeLine();
try {
int status = beeLine.begin(args, inputStream);
if (!Boolean.getBoolean(BeeLineOpts.PROPERTY_NAME_EXIT)) {
System.exit(status);
}
} finally {
beeLine.close();
}
}
This matches the familiar convention of exit code 0 on success and 1 on failure. And from the null passed in main, we can tell it uses the standard input mentioned in the comment.
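As an aside, Boolean.getBoolean reads a JVM system property rather than a variable, which is easy to misread. A minimal runnable sketch (not Hive source; the property name matches the PROPERTY_NAME_EXIT constant shown later):
public class ExitGuardDemo {
  public static void main(String[] args) {
    // simulate launching with -Dbeeline.system.exit=true
    System.setProperty("beeline.system.exit", "true");
    int status = 0;
    if (!Boolean.getBoolean("beeline.system.exit")) {
      System.exit(status); // skipped here because the property is set
    }
    System.out.println("System.exit suppressed, status=" + status);
  }
}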
public BeeLine() {
this(true);
}
which calls this constructor:
public BeeLine(boolean isBeeLine) {
this.isBeeLine = isBeeLine;
this.signalHandler = new SunSignalHandler(this);
this.shutdownHook = new Runnable() {
@Override
public void run() {
try {
if (history != null) {
history.setMaxSize(getOpts().getMaxHistoryRows());
history.flush();
}
} catch (IOException e) {
error(e);
} finally {
close();
}
}
};
}
The BeeLine class holds these private fields:
import jline.console.history.FileHistory;
private FileHistory history;
// Indicates if this instance of beeline is running in compatibility mode, or beeline mode
private boolean isBeeLine = true;
Clearly this Runnable, registered later as a shutdown hook, just flushes the command history (and similar log-like state), so it needs no special attention.
Jump into the begin method:
/**
 * Start accepting input from stdin, and dispatch it
 * to the appropriate {@link CommandHandler} until the
 * global variable exit is true.
 */
public int begin(String[] args, InputStream inputStream) throws IOException {
try {
// load the options first, so we can override on the command line
getOpts().load();
} catch (Exception e) {
// nothing
}
setupHistory();
//add shutdown hook to cleanup the beeline for smooth exit
addBeelineShutdownHook();
//this method also initializes the consoleReader which is
//needed by initArgs for certain execution paths
ConsoleReader reader = initializeConsoleReader(inputStream);
if (isBeeLine) {
int code = initArgs(args);
if (code != 0) {
return code;
}
} else {
int code = initArgsFromCliVars(args);
if (code != 0 || exit) {
return code;
}
defaultConnect(false);
}
if (getOpts().isHelpAsked()) {
return 0;
}
if (getOpts().getScriptFile() != null) {
return executeFile(getOpts().getScriptFile());
}
try {
info(getApplicationTitle());
} catch (Exception e) {
// ignore
}
return execute(reader, false);
}
This begin method is where execution really starts. We can see calls for loading options, reading the input stream, initializing arguments, connecting, executing, and running script files.
Jumping into BeeLineOpts.java we can see:
public void load() throws IOException {
try (InputStream in = new FileInputStream(rcFile)) {
load(in);
}
}
Then:
public void load(InputStream fin) throws IOException {
Properties p = new Properties();
p.load(fin);
loadProperties(p);
}
And further:
public static final String PROPERTY_NAME_EXIT = PROPERTY_PREFIX + "system.exit";
public static final String PROPERTY_PREFIX = "beeline.";
public void loadProperties(Properties props) {
for (Object element : props.keySet()) {
String key = element.toString();
if (key.equals(PROPERTY_NAME_EXIT)) {
// fix for sf.net bug 879422
continue;
}
if (key.startsWith(PROPERTY_PREFIX)) {
set(key.substring(PROPERTY_PREFIX.length()),
props.getProperty(key));
}
}
}
This method skips the loop iteration when the key equals "beeline.system.exit"; otherwise it strips the "beeline." prefix to form a new key, takes the property's value as the new value, and passes both into set:
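To make the key rewriting concrete, here is a minimal runnable sketch (not Hive source; the property names are illustrative) of what loadProperties does before handing off to set:
import java.util.Properties;

public class PrefixStripDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("beeline.maxwidth", "120");
    props.setProperty("beeline.system.exit", "true"); // deliberately skipped
    for (Object element : props.keySet()) {
      String key = element.toString();
      if (key.equals("beeline.system.exit")) {
        continue; // same early-continue as loadProperties
      }
      if (key.startsWith("beeline.")) {
        // "beeline.maxwidth" becomes "maxwidth", later routed to a setter
        System.out.println(key.substring("beeline.".length())
            + " -> " + props.getProperty(key));
      }
    }
  }
}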
public void set(String key, String value) {
set(key, value, false);
}
And again:
public boolean set(String key, String value, boolean quiet) {
try {
beeLine.getReflector().invoke(this, "set" + key, new Object[] {value});
return true;
} catch (Exception e) {
if (!quiet) {
beeLine.error(beeLine.loc("error-setting", new Object[] {key, e}));
}
return false;
}
}
This delegates to the Reflector:
package org.apache.hive.beeline;
class Reflector {
public Object invoke(Object on, String method, Object[] args)
throws InvocationTargetException, IllegalAccessException,
ClassNotFoundException {
return invoke(on, method, Arrays.asList(args));
}
public Object invoke(Object on, String method, List args)
throws InvocationTargetException, IllegalAccessException,
ClassNotFoundException {
return invoke(on, on == null ? null : on.getClass(), method, args);
}
public Object invoke(Object on, Class defClass,
String method, List args)
throws InvocationTargetException, IllegalAccessException,
ClassNotFoundException {
Class c = defClass != null ? defClass : on.getClass();
List<Method> candidateMethods = new LinkedList<Method>();
Method[] m = c.getMethods();
for (int i = 0; i < m.length; i++) {
if (m[i].getName().equalsIgnoreCase(method)) {
candidateMethods.add(m[i]);
}
}
if (candidateMethods.size() == 0) {
throw new IllegalArgumentException(beeLine.loc("no-method",
new Object[] {method, c.getName()}));
}
for (Iterator<Method> i = candidateMethods.iterator(); i.hasNext();) {
Method meth = i.next();
Class[] ptypes = meth.getParameterTypes();
if (!(ptypes.length == args.size())) {
continue;
}
Object[] converted = convert(args, ptypes);
if (converted == null) {
continue;
}
if (!Modifier.isPublic(meth.getModifiers())) {
continue;
}
return meth.invoke(on, converted);
}
return null;
}
}
Reflection gathers every method whose name matches "set<key>" (case-insensitively) into a candidate List. Invocation then lands in the JDK's Method.java:
@CallerSensitive
public Object invoke(Object obj, Object... args)
throws IllegalAccessException, IllegalArgumentException,
InvocationTargetException
{
if (!override) {
if (!Reflection.quickCheckMemberAccess(clazz, modifiers)) {
Class<?> caller = Reflection.getCallerClass();
checkAccess(caller, clazz, obj, modifiers);
}
}
MethodAccessor ma = methodAccessor; // read volatile
if (ma == null) {
ma = acquireMethodAccessor();
}
return ma.invoke(obj, args);
}
The candidates are then iterated and the matching public setter is invoked reflectively.
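A minimal self-contained sketch (not Hive source; the option name is illustrative) of this case-insensitive setter dispatch:
import java.lang.reflect.Method;

public class SetterDispatchDemo {
  private String outputFormat = "table";

  public void setOutputFormat(String v) { this.outputFormat = v; }

  public static void main(String[] args) throws Exception {
    SetterDispatchDemo opts = new SetterDispatchDemo();
    String key = "outputformat"; // the key after the "beeline." prefix is stripped
    for (Method m : opts.getClass().getMethods()) {
      // same match rule as Reflector: name equals "set" + key, ignoring case
      if (m.getName().equalsIgnoreCase("set" + key)
          && m.getParameterTypes().length == 1) {
        m.invoke(opts, "csv2");
      }
    }
    System.out.println(opts.outputFormat); // prints csv2
  }
}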
For example, the BeeLine class itself calls a number of its own setters while processing command-line options:
Properties confProps = commandLine.getOptionProperties("hiveconf");
for (String propKey : confProps.stringPropertyNames()) {
setHiveConfVar(propKey, confProps.getProperty(propKey));
}
getOpts().setScriptFile(commandLine.getOptionValue("f"));
if (commandLine.getOptionValues("i") != null) {
getOpts().setInitFiles(commandLine.getOptionValues("i"));
}
dbName = commandLine.getOptionValue("database");
getOpts().setVerbose(Boolean.parseBoolean(commandLine.getOptionValue("verbose")));
getOpts().setSilent(Boolean.parseBoolean(commandLine.getOptionValue("silent")));
int code = 0;
if (cl.getOptionValues('e') != null) {
commands = Arrays.asList(cl.getOptionValues('e'));
opts.setAllowMultiLineCommand(false); //When using -e, command is always a single line
}
if (cl.hasOption("help")) {
usage();
getOpts().setHelpAsked(true);
return true;
}
Properties hiveConfs = cl.getOptionProperties("hiveconf");
for (String key : hiveConfs.stringPropertyNames()) {
setHiveConfVar(key, hiveConfs.getProperty(key));
}
driver = cl.getOptionValue("d");
auth = cl.getOptionValue("a");
user = cl.getOptionValue("n");
getOpts().setAuthType(auth);
if (cl.hasOption("w")) {
pass = obtainPasswordFromFile(cl.getOptionValue("w"));
} else {
if (beelineParser.isPasswordOptionSet) {
pass = cl.getOptionValue("p");
}
}
url = cl.getOptionValue("u");
if ((url == null) && cl.hasOption("reconnect")){
// If url was not specified with -u, but -r was present, use that.
url = getOpts().getLastConnectedUrl();
}
getOpts().setInitFiles(cl.getOptionValues("i"));
getOpts().setScriptFile(cl.getOptionValue("f"));
public void updateOptsForCli() {
getOpts().updateBeeLineOptsFromConf();
getOpts().setShowHeader(false);
getOpts().setEscapeCRLF(false);
getOpts().setOutputFormat("dsv");
getOpts().setDelimiterForDSV(' ');
getOpts().setNullEmptyString(true);
}
setupHistory();
Reading the getters along the way also shows exactly why -u carries the connection URL, -n the username, and -p the password... all of it is hard-coded in the source. No mystery there.
private void setupHistory() throws IOException {
if (this.history != null) {
return;
}
This setupHistory method is not the focus.
//add shutdown hook to cleanup the beeline for smooth exit
addBeelineShutdownHook();
private void addBeelineShutdownHook() throws IOException {
// add shutdown hook to flush the history to history file and it also close all open connections
ShutdownHookManager.addShutdownHook(getShutdownHook());
}
Per the comment, this hook flushes the history to the history file when Beeline shuts down and closes all open connections, so Beeline exits smoothly. Also not the focus.
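The underlying pattern is just a JVM shutdown hook; a minimal sketch using java.lang.Runtime directly rather than Hive's ShutdownHookManager:
public class ShutdownHookDemo {
  public static void main(String[] args) {
    Runtime.getRuntime().addShutdownHook(
        new Thread(() -> System.out.println("flushing history before exit")));
    System.out.println("normal work, then the JVM exits");
  }
}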
ConsoleReader reader = initializeConsoleReader(inputStream);
The method called here can be found in BeeLine.java:
public ConsoleReader initializeConsoleReader(InputStream inputStream) throws IOException {
if (inputStream != null) {
// ### NOTE: fix for sf.net bug 879425.
// Working around an issue in jline-2.1.2, see https://github.com/jline/jline/issues/10
// by appending a newline to the end of inputstream
InputStream inputStreamAppendedNewline = new SequenceInputStream(inputStream,
new ByteArrayInputStream((new String("\n")).getBytes()));
consoleReader = new ConsoleReader(inputStreamAppendedNewline, getErrorStream());
consoleReader.setCopyPasteDetection(true); // jline will detect if is regular character
} else {
consoleReader = new ConsoleReader(getInputStream(), getErrorStream());
}
//disable the expandEvents for the purpose of backward compatibility
consoleReader.setExpandEvents(false);
try {
// now set the output for the history
consoleReader.setHistory(this.history);
} catch (Exception e) {
handleException(e);
}
if (inputStream instanceof FileInputStream || inputStream instanceof FSDataInputStream) {
// from script.. no need to load history and no need of completer, either
return consoleReader;
}
consoleReader.addCompleter(new BeeLineCompleter(this));
return consoleReader;
}
When main calls mainWithInputRedirection with a null argument, it simply means no input redirection was requested; either way, the consoleReader gets initialized. After the history settings are applied, if the input stream is a FileInputStream or an HDFS FSDataInputStream (i.e., input comes from a script), neither history nor a completer is loaded; otherwise a completer is added before returning. Also not the focus.
// Indicates if this instance of beeline is running in compatibility mode, or beeline mode
private boolean isBeeLine = true;
According to the field's comment, this flag indicates whether the instance runs in compatibility mode or beeline mode. Since the constructor was called with true at startup, it must be true here, so initArgs(args) executes:
int initArgs(String[] args) {
List<String> commands = Collections.emptyList();
CommandLine cl;
BeelineParser beelineParser;
try {
beelineParser = new BeelineParser();
cl = beelineParser.parse(options, args);
} catch (ParseException e1) {
output(e1.getMessage());
usage();
return -1;
}
boolean connSuccessful = connectUsingArgs(beelineParser, cl);
// checks if default hs2 connection configuration file is present
// and uses it to connect if found
// no-op if the file is not present
if(!connSuccessful && !exit) {
connSuccessful = defaultBeelineConnect(cl);
}
int code = 0;
if (cl.getOptionValues('e') != null) {
commands = Arrays.asList(cl.getOptionValues('e'));
opts.setAllowMultiLineCommand(false); //When using -e, command is always a single line
}
if (!commands.isEmpty() && getOpts().getScriptFile() != null) {
error("The '-e' and '-f' options cannot be specified simultaneously");
return 1;
} else if(!commands.isEmpty() && !connSuccessful) {
error("Cannot run commands specified using -e. No current connection");
return 1;
}
if (!commands.isEmpty()) {
for (Iterator<String> i = commands.iterator(); i.hasNext();) {
String command = i.next().toString();
debug(loc("executing-command", command));
if (!dispatch(command)) {
code++;
}
}
exit = true; // execute and exit
}
return code;
}
This builds the Beeline parser, which parses the options and arguments into the cl command line. connectUsingArgs then connects to the Hive server using those arguments. If that connection fails, it may fall back to the default HiveServer2 connection, a very common situation: once the ops folks have sorted out the environment variables and such, simply typing beeline connects to HiveServer2 from the configuration files.
user = cl.getOptionValue("n");
Following calls like this one in the connectUsingArgs method, jump into:
package org.apache.commons.cli;
public class CommandLine implements Serializable {
public String getOptionValue(String opt) {
String[] values = this.getOptionValues(opt);
return values == null ? null : values[0];
}
public String[] getOptionValues(String opt) {
List values = new ArrayList();
Iterator it = this.options.iterator();
while(true) {
Option option;
do {
if (!it.hasNext()) {
return values.isEmpty() ? null : (String[])((String[])values.toArray(new String[values.size()]));
}
option = (Option)it.next();
} while(!opt.equals(option.getOpt()) && !opt.equals(option.getLongOpt()));
values.addAll(option.getValuesList());
}
}
}
Clearly this loop returns the first of the collected values when the option was passed, and null otherwise. Options that were never passed default to null.
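A minimal sketch of these commons-cli semantics (assuming commons-cli 1.3+ on the classpath; the options are simplified stand-ins for beeline's):
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class OptionValueDemo {
  public static void main(String[] args) throws ParseException {
    Options options = new Options();
    options.addOption("u", true, "JDBC url");
    options.addOption("e", true, "query to execute");
    CommandLine cl = new DefaultParser().parse(options,
        new String[] {"-u", "jdbc:hive2://host:10000/default"});
    System.out.println(cl.getOptionValue("u")); // first value of -u
    System.out.println(cl.getOptionValue("e")); // null: -e was not passed
  }
}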
Per the comment, a command passed with -e is always a single line, and the opts.setAllowMultiLineCommand(false) call clearly just sets a flag:
class BeeLineOpts implements Completer {
private boolean allowMultiLineCommand = true;
public boolean isAllowMultiLineCommand() {
return allowMultiLineCommand;
}
public void setAllowMultiLineCommand(boolean allowMultiLineCommand) {
this.allowMultiLineCommand = allowMultiLineCommand;
}
}
The values fetched via -e are stored in the commands object, whose entire scope is the initArgs method:
package org.apache.hive.beeline;
import org.apache.commons.cli.CommandLine;
public class BeeLine implements Closeable {
int initArgs(String[] args) {
List<String> commands = Collections.emptyList();
CommandLine cl;
BeelineParser beelineParser;
try {
beelineParser = new BeelineParser();
cl = beelineParser.parse(options, args);
} catch (ParseException e1) {
output(e1.getMessage());
usage();
return -1;
}
boolean connSuccessful = connectUsingArgs(beelineParser, cl);
// checks if default hs2 connection configuration file is present
// and uses it to connect if found
// no-op if the file is not present
if(!connSuccessful && !exit) {
connSuccessful = defaultBeelineConnect(cl);
}
int code = 0;
if (cl.getOptionValues('e') != null) {
commands = Arrays.asList(cl.getOptionValues('e'));
opts.setAllowMultiLineCommand(false); //When using -e, command is always a single line
}
if (!commands.isEmpty() && getOpts().getScriptFile() != null) {
error("The '-e' and '-f' options cannot be specified simultaneously");
return 1;
} else if(!commands.isEmpty() && !connSuccessful) {
error("Cannot run commands specified using -e. No current connection");
return 1;
}
if (!commands.isEmpty()) {
for (Iterator<String> i = commands.iterator(); i.hasNext();) {
String command = i.next().toString();
debug(loc("executing-command", command));
if (!dispatch(command)) {
code++;
}
}
exit = true; // execute and exit
}
return code;
}
}
So -e is consumed during argument initialization. Its values form a List that dispatch(command) iterates and executes, which is how -e can run multiple SQL segments. Clearly -e is closely tied to argument initialization.
Based on these lines from BeeLine.java's connectUsingArgs method:
getOpts().setScriptFile(commandLine.getOptionValue("f"));
if (commandLine.getOptionValues("i") != null) {
getOpts().setInitFiles(commandLine.getOptionValues("i"));
}
jump into:
package org.apache.hive.beeline;
class BeeLineOpts implements Completer {
  private String[] initFiles = null;
  private String scriptFile = null;
  public String[] getInitFiles() {
    return initFiles;
  }
  public void setInitFiles(String[] initFiles) {
    this.initFiles = initFiles;
  }
  public void setScriptFile(String scriptFile) {
    this.scriptFile = scriptFile;
  }
  public String getScriptFile() {
    return scriptFile;
  }
}
So -i sets the files needed for initialization and -f sets the script file to run. Both are used later.
From this line in BeeLine.java:
url = cl.getOptionValue("u");
trace url to its full scope in BeeLine.java:
/*
* Connects using the command line arguments. There are two
* possible ways to connect here 1. using the cmd line arguments like -u
* or using !properties <property-file>
*/
private boolean connectUsingArgs(BeelineParser beelineParser, CommandLine cl) {
String driver = null, user = null, pass = "", url = null;
String auth = null;
if (cl.hasOption("help")) {
usage();
getOpts().setHelpAsked(true);
return true;
}
Properties hiveVars = cl.getOptionProperties("hivevar");
for (String key : hiveVars.stringPropertyNames()) {
getOpts().getHiveVariables().put(key, hiveVars.getProperty(key));
}
Properties hiveConfs = cl.getOptionProperties("hiveconf");
for (String key : hiveConfs.stringPropertyNames()) {
setHiveConfVar(key, hiveConfs.getProperty(key));
}
driver = cl.getOptionValue("d");
auth = cl.getOptionValue("a");
user = cl.getOptionValue("n");
getOpts().setAuthType(auth);
if (cl.hasOption("w")) {
pass = obtainPasswordFromFile(cl.getOptionValue("w"));
} else {
if (beelineParser.isPasswordOptionSet) {
pass = cl.getOptionValue("p");
}
}
url = cl.getOptionValue("u");
if ((url == null) && cl.hasOption("reconnect")){
// If url was not specified with -u, but -r was present, use that.
url = getOpts().getLastConnectedUrl();
}
getOpts().setInitFiles(cl.getOptionValues("i"));
getOpts().setScriptFile(cl.getOptionValue("f"));
if (url != null) {
// Specifying username/password/driver explicitly will override the values from the url;
// make sure we don't override the values present in the url with empty values.
if (user == null) {
user = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER);
}
if (pass == null) {
pass = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD);
}
if (driver == null) {
driver = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.PROPERTY_DRIVER);
}
String com;
String comForDebug;
if(pass != null) {
com = constructCmd(url, user, pass, driver, false);
comForDebug = constructCmd(url, user, pass, driver, true);
} else {
com = constructCmdUrl(url, user, driver, false);
comForDebug = constructCmdUrl(url, user, driver, true);
}
debug(comForDebug);
if (!dispatch(com)) {
exit = true;
return false;
}
return true;
}
// load property file
String propertyFile = cl.getOptionValue("property-file");
if (propertyFile != null) {
try {
this.consoleReader = new ConsoleReader();
} catch (IOException e) {
handleException(e);
}
if (!dispatch("!properties " + propertyFile)) {
exit = true;
return false;
}
}
return false;
}
When the url taken from -u is non-null, a sequence of tricks runs: the utility class extracts the username, password, and driver in turn. Then, if the password is non-null, constructCmd runs and yields the command string com:
private String constructCmd(String url, String user, String pass, String driver, boolean stripPasswd) {
return new StringBuilder()
.append("!connect ")
.append(url)
.append(" ")
.append(user == null || user.length() == 0 ? "''" : user)
.append(" ")
.append(stripPasswd ? PASSWD_MASK : (pass.length() == 0 ? "''" : pass))
.append(" ")
.append((driver == null ? "" : driver))
.toString();
}
Then dispatch(com) runs on the command string derived from -u (just once, since there is no loop). As the content shows, it merely prepends the !connect prefix; needless to say, it is tied to connection initialization.
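For intuition, here is roughly what com looks like for the connection string from the start of this article; a sketch with illustrative values, not Hive source:
public class ConstructCmdDemo {
  public static void main(String[] args) {
    String url = "jdbc:hive2://host:10000/default;principal=hive/_HOST@REALM";
    String user = "hive", pass = "", driver = null;
    String com = new StringBuilder()
        .append("!connect ").append(url).append(" ")
        .append(user == null || user.length() == 0 ? "''" : user).append(" ")
        .append(pass.length() == 0 ? "''" : pass).append(" ")
        .append(driver == null ? "" : driver)
        .toString();
    // prints: !connect jdbc:hive2://host:10000/default;principal=hive/_HOST@REALM hive ''
    System.out.println(com);
  }
}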
But whenever url is non-null while the username, password, or driver is null (they are null when -n, -p, -d were not used), parsePropertyFromUrl gets called:
package org.apache.hive.jdbc;
public class Utils {
public static String parsePropertyFromUrl(final String url, final String key) {
String[] tokens = url.split(";");
for (String token : tokens) {
if (token.trim().startsWith(key.trim() + "=")) {
return token.trim().substring((key.trim() + "=").length());
}
}
return null;
}
}
Whether key is user, password, or driver, the routine is the same: split the URL on ";", check whether each token starts with "<key>=", and if so strip that prefix and return the value; otherwise return null. Obviously this method can fetch more than just the user, password, and driver settings; any parameter following the ";k1=v1;k2=v2;k3=v3" format can be extracted.
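A runnable sketch of the same parsing logic, with an illustrative URL:
public class ParseUrlDemo {
  static String parseProperty(String url, String key) {
    for (String token : url.split(";")) {
      if (token.trim().startsWith(key.trim() + "=")) {
        return token.trim().substring((key.trim() + "=").length());
      }
    }
    return null;
  }

  public static void main(String[] args) {
    String url = "jdbc:hive2://host:10000/default;principal=hive/_HOST@REALM;user=alice";
    System.out.println(parseProperty(url, "principal")); // hive/_HOST@REALM
    System.out.println(parseProperty(url, "user"));      // alice
    System.out.println(parseProperty(url, "password"));  // null
  }
}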
If a password is obtained this way, constructCmd runs as before and everyone is happy.
When the url from -u is non-null but the password is null, which is by far the most common case (for example with Kerberos authentication enabled), constructCmdUrl runs instead:
/**
* This is an internal method used to create !connect command when -p option is used without
* providing the password on the command line. Connect command returned should be ; separated
* key-value pairs along with the url. We cannot use space separated !connect url user [password]
* [driver] here since both password and driver are optional and there would be no way to
* distinguish if the last string is password or driver
*
* @param url connection url passed using -u argument on the command line
* @param user username passed through command line
* @param driver driver passed through command line -d option
* @param stripPasswd when set to true generates a !connect command which strips the password for
* logging purposes
* @return !connect command
*/
private String constructCmdUrl(String url, String user, String driver,
boolean stripPasswd) {
StringBuilder command = new StringBuilder("!connect ");
command.append(url);
//if the url does not have a database name add the trailing '/'
if(isTrailingSlashNeeded(url)) {
command.append('/');
}
command.append(';');
// if the username is not already available in the URL add the one provided
if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER) == null) {
command.append(JdbcConnectionParams.AUTH_USER);
command.append('=');
command.append((user == null || user.length() == 0 ? "''" : user));
}
if (stripPasswd) {
// if password is available in url it needs to be striped
int startIndex = command.indexOf(JdbcConnectionParams.AUTH_PASSWD + "=")
+ JdbcConnectionParams.AUTH_PASSWD.length() + 2;
if(startIndex != -1) {
int endIndex = command.toString().indexOf(";", startIndex);
command.replace(startIndex, (endIndex == -1 ? command.length() : endIndex),
BeeLine.PASSWD_MASK);
}
}
// if the driver is not already available in the URL add the one provided
if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.PROPERTY_DRIVER) == null
&& driver != null) {
command.append(';');
command.append(JdbcConnectionParams.PROPERTY_DRIVER);
command.append("=");
command.append(driver);
}
return command.toString();
}
Clearly this method does nothing earth-shattering; it merely normalizes the format of the !connect command string.
In Hive's beeline, both -e and -u funnel into the dispatch method, so it is obviously important. Jump into dispatch in BeeLine.java:
public static final String COMMAND_PREFIX = "!";
/**
* Dispatch the specified line to the appropriate {@link CommandHandler}.
*
* @param line
* the command-line to dispatch
* @return true if the command was "successful"
*/
boolean dispatch(String line) {
if (line == null) {
// exit
exit = true;
return true;
}
if (line.trim().length() == 0) {
return true;
}
if (isComment(line)) {
return true;
}
line = line.trim();
// save it to the current script, if any
if (scriptOutputFile != null) {
scriptOutputFile.addLine(line);
}
if (isHelpRequest(line)) {
line = "!help";
}
if (isBeeLine) {
if (line.startsWith(COMMAND_PREFIX)) {
// handle SQLLine command in beeline which starts with ! and does not end with ;
return execCommandWithPrefix(line);
} else {
return commands.sql(line, getOpts().getEntireLineAsCommand());
}
} else {
return commands.sql(line, getOpts().getEntireLineAsCommand());
}
}
At this step isBeeLine again determines which branch runs. Per the earlier analysis, isBeeLine is normally true, so a line starting with "!" is handled by execCommandWithPrefix(line):
/**
* This method is used for executing commands beginning with !
* @param line
* @return
*/
public boolean execCommandWithPrefix(String line) {
Map<String, CommandHandler> cmdMap = new TreeMap<String, CommandHandler>();
line = line.substring(1);
for (int i = 0; i < commandHandlers.length; i++) {
String match = commandHandlers[i].matches(line);
if (match != null) {
cmdMap.put(match, commandHandlers[i]);
}
}
if (cmdMap.size() == 0) {
return error(loc("unknown-command", line));
}
if (cmdMap.size() > 1) {
// any exact match?
CommandHandler handler = cmdMap.get(line);
if (handler == null) {
return error(loc("multiple-matches", cmdMap.keySet().toString()));
}
return handler.execute(line);
}
return cmdMap.values().iterator().next().execute(line);
}
The commandHandlers array being iterated also lives in BeeLine.java:
final CommandHandler[] commandHandlers = new CommandHandler[] {
new ReflectiveCommandHandler(this, new String[] {"quit", "done", "exit"},
null),
new ReflectiveCommandHandler(this, new String[] {"connect", "open"},
new Completer[] {new StringsCompleter(getConnectionURLExamples())}),
new ReflectiveCommandHandler(this, new String[] {"describe"},
new Completer[] {new TableNameCompletor(this)}),
new ReflectiveCommandHandler(this, new String[] {"indexes"},
new Completer[] {new TableNameCompletor(this)}),
new ReflectiveCommandHandler(this, new String[] {"primarykeys"},
new Completer[] {new TableNameCompletor(this)}),
new ReflectiveCommandHandler(this, new String[] {"exportedkeys"},
new Completer[] {new TableNameCompletor(this)}),
new ReflectiveCommandHandler(this, new String[] {"manual"},
null),
new ReflectiveCommandHandler(this, new String[] {"importedkeys"},
new Completer[] {new TableNameCompletor(this)}),
new ReflectiveCommandHandler(this, new String[] {"procedures"},
null),
new ReflectiveCommandHandler(this, new String[] {"tables"},
null),
new ReflectiveCommandHandler(this, new String[] {"typeinfo"},
null),
new ReflectiveCommandHandler(this, new String[] {"columns"},
new Completer[] {new TableNameCompletor(this)}),
new ReflectiveCommandHandler(this, new String[] {"reconnect"},
null),
new ReflectiveCommandHandler(this, new String[] {"dropall"},
new Completer[] {new TableNameCompletor(this)}),
new ReflectiveCommandHandler(this, new String[] {"history"},
null),
new ReflectiveCommandHandler(this, new String[] {"metadata"},
new Completer[] {
new StringsCompleter(getMetadataMethodNames())}),
new ReflectiveCommandHandler(this, new String[] {"nativesql"},
null),
new ReflectiveCommandHandler(this, new String[] {"dbinfo"},
null),
new ReflectiveCommandHandler(this, new String[] {"rehash"},
null),
new ReflectiveCommandHandler(this, new String[] {"verbose"},
null),
new ReflectiveCommandHandler(this, new String[] {"run"},
new Completer[] {new FileNameCompleter()}),
new ReflectiveCommandHandler(this, new String[] {"batch"},
null),
new ReflectiveCommandHandler(this, new String[] {"list"},
null),
new ReflectiveCommandHandler(this, new String[] {"all"},
null),
new ReflectiveCommandHandler(this, new String[] {"go", "#"},
null),
new ReflectiveCommandHandler(this, new String[] {"script"},
new Completer[] {new FileNameCompleter()}),
new ReflectiveCommandHandler(this, new String[] {"record"},
new Completer[] {new FileNameCompleter()}),
new ReflectiveCommandHandler(this, new String[] {"brief"},
null),
new ReflectiveCommandHandler(this, new String[] {"close"},
null),
new ReflectiveCommandHandler(this, new String[] {"closeall"},
null),
new ReflectiveCommandHandler(this, new String[] {"isolation"},
new Completer[] {new StringsCompleter(getIsolationLevels())}),
new ReflectiveCommandHandler(this, new String[] {"outputformat"},
new Completer[] {new StringsCompleter(
formats.keySet().toArray(new String[0]))}),
new ReflectiveCommandHandler(this, new String[] {"autocommit"},
null),
new ReflectiveCommandHandler(this, new String[] {"commit"},
null),
new ReflectiveCommandHandler(this, new String[] {"properties"},
new Completer[] {new FileNameCompleter()}),
new ReflectiveCommandHandler(this, new String[] {"rollback"},
null),
new ReflectiveCommandHandler(this, new String[] {"help", "?"},
null),
new ReflectiveCommandHandler(this, new String[] {"set"},
getOpts().optionCompleters()),
new ReflectiveCommandHandler(this, new String[] {"save"},
null),
new ReflectiveCommandHandler(this, new String[] {"scan"},
null),
new ReflectiveCommandHandler(this, new String[] {"sql"},
null),
new ReflectiveCommandHandler(this, new String[] {"sh"},
null),
new ReflectiveCommandHandler(this, new String[] {"call"},
null),
new ReflectiveCommandHandler(this, new String[] {"nullemptystring"},
new Completer[] {new BooleanCompleter()}),
new ReflectiveCommandHandler(this, new String[]{"addlocaldriverjar"},
null),
new ReflectiveCommandHandler(this, new String[]{"addlocaldrivername"},
null),
new ReflectiveCommandHandler(this, new String[]{"delimiter"},
null)
};
Clearly !connect is caught by a handler. cmdMap (a TreeMap) deduplicates the matches; a single match executes directly, and with multiple matches only an exact name match executes (otherwise an error is reported), ensuring each command runs exactly once.
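A minimal sketch (not Hive source) of the prefix matching that routes a command to its handler; the names and input are illustrative:
public class HandlerMatchDemo {
  public static void main(String[] args) {
    String[] names = {"connect", "open"}; // aliases of one handler
    String line = "conn jdbc:hive2://host:10000/default"; // after '!' is stripped
    String firstToken = line.split("\\s+")[0];
    for (String name : names) {
      // same rule as AbstractCommandHandler.matches: name starts with the token
      if (name.startsWith(firstToken)) {
        System.out.println("matched handler name: " + name); // connect
      }
    }
  }
}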
The CommandHandler here is an interface:
package org.apache.hive.beeline;
import jline.console.completer.Completer;
/**
* A generic command to be executed. Execution of the command
* should be dispatched to the {@link #execute(java.lang.String)} method after determining that
* the command is appropriate with
* the {@link #matches(java.lang.String)} method.
*
*/
interface CommandHandler {
/**
* @return the name of the command
*/
public String getName();
/**
* @return all the possible names of this command.
*/
public String[] getNames();
/**
* @return the short help description for this command.
*/
public String getHelpText();
/**
* Check to see if the specified string can be dispatched to this
* command.
*
* @param line
* the command line to check.
* @return the command string that matches, or null if it no match
*/
public String matches(String line);
/**
* Execute the specified command.
*
* @param line
* the full command line to execute.
*/
public boolean execute(String line);
/**
* Returns the completors that can handle parameters.
*/
public Completer[] getParameterCompleters();
/**
* Returns exception thrown for last command
* @return
*/
public Throwable getLastException();
}
Looking at the inheritance tree, the interface has a single abstract implementation, AbstractCommandHandler:
package org.apache.hive.beeline;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import jline.console.completer.Completer;
import jline.console.completer.NullCompleter;
/**
* An abstract implementation of CommandHandler.
*
*/
public abstract class AbstractCommandHandler implements CommandHandler {
private final BeeLine beeLine;
private final String name;
private final String[] names;
private final String helpText;
private Completer[] parameterCompleters = new Completer[0];
protected transient Throwable lastException;
public AbstractCommandHandler(BeeLine beeLine, String[] names, String helpText,
Completer[] completors) {
this.beeLine = beeLine;
name = names[0];
this.names = names;
this.helpText = helpText;
if (completors == null || completors.length == 0) {
parameterCompleters = new Completer[] { new NullCompleter() };
} else {
List<Completer> c = new LinkedList<Completer>(Arrays.asList(completors));
c.add(new NullCompleter());
parameterCompleters = c.toArray(new Completer[0]);
}
}
@Override
public String getHelpText() {
return helpText;
}
@Override
public String getName() {
return name;
}
@Override
public String[] getNames() {
return names;
}
@Override
public String matches(String line) {
if (line == null || line.length() == 0) {
return null;
}
String[] parts = beeLine.split(line);
if (parts == null || parts.length == 0) {
return null;
}
for (String name2 : names) {
if (name2.startsWith(parts[0])) {
return name2;
}
}
return null;
}
public void setParameterCompleters(Completer[] parameterCompleters) {
this.parameterCompleters = parameterCompleters;
}
@Override
public Completer[] getParameterCompleters() {
return parameterCompleters;
}
@Override
public Throwable getLastException() {
return lastException;
}
}
and a single reflective concrete subclass, ReflectiveCommandHandler:
package org.apache.hive.beeline;
import jline.console.completer.Completer;
import org.apache.hadoop.fs.shell.Command;
/**
* A {@link Command} implementation that uses reflection to
* determine the method to dispatch the command.
*
*/
public class ReflectiveCommandHandler extends AbstractCommandHandler {
private final BeeLine beeLine;
/**
* @param beeLine
* @param cmds 'cmds' is an array of alternative names for the same command. And that the
* first one is always chosen for display purposes and to lookup help
* documentation from BeeLine.properties file.
* @param completer
*/
public ReflectiveCommandHandler(BeeLine beeLine, String[] cmds, Completer[] completer) {
super(beeLine, cmds, beeLine.loc("help-" + cmds[0]), completer);
this.beeLine = beeLine;
}
public boolean execute(String line) {
lastException = null;
ClientHook hook = ClientCommandHookFactory.get().getHook(beeLine, line);
try {
Object ob = beeLine.getCommands().getClass().getMethod(getName(),
new Class[] {String.class})
.invoke(beeLine.getCommands(), new Object[] {line});
boolean result = (ob != null && ob instanceof Boolean && ((Boolean) ob).booleanValue());
if (hook != null && result) {
hook.postHook(beeLine);
}
return result;
} catch (Throwable e) {
lastException = e;
return beeLine.error(e);
}
}
}
How the prefix matching works is not the point. The point is that this class reflectively grabs the commands object, takes the first element of the name array passed to the ReflectiveCommandHandler constructor (connect, for example), and invokes the Commands method of that name to do the actual work. I will stick with connect as the example:
package org.apache.hive.beeline;
public class Commands {
public boolean connect(String line) throws Exception {
String example = "Usage: connect [driver]"
+ BeeLine.getSeparator();
String[] parts = beeLine.split(line);
if (parts == null) {
return false;
}
if (parts.length < 2) {
return beeLine.error(example);
}
String url = parts.length < 2 ? null : parts[1];
String user = parts.length < 3 ? null : parts[2];
String pass = parts.length < 4 ? null : parts[3];
String driver = parts.length < 5 ? null : parts[4];
Properties props = new Properties();
if (url != null) {
String saveUrl = getUrlToUse(url);
props.setProperty(JdbcConnectionParams.PROPERTY_URL, saveUrl);
}
String value = null;
if (driver != null) {
props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, driver);
} else {
value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.PROPERTY_DRIVER);
if (value != null) {
props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, value);
}
}
if (user != null) {
props.setProperty(JdbcConnectionParams.AUTH_USER, user);
} else {
value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER);
if (value != null) {
props.setProperty(JdbcConnectionParams.AUTH_USER, value);
}
}
if (pass != null) {
props.setProperty(JdbcConnectionParams.AUTH_PASSWD, pass);
} else {
value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD);
if (value != null) {
props.setProperty(JdbcConnectionParams.AUTH_PASSWD, value);
}
}
value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_TYPE);
if (value != null) {
props.setProperty(JdbcConnectionParams.AUTH_TYPE, value);
}
return connect(props);
}
}
So when !connect matches, this connect method runs, for example. Similar cases need no repetition; in everyday use !connect and !quit are the most common anyway.
A line that does not start with "!" executes commands.sql(line, getOpts().getEntireLineAsCommand()):
package org.apache.hive.beeline;
public class Commands {
public boolean sql(String line, boolean entireLineAsCommand) {
return execute(line, false, entireLineAsCommand);
}
private boolean execute(String line, boolean call, boolean entireLineAsCommand) {
if (line == null || line.length() == 0) {
return false; // ???
}
// ### FIXME: doing the multi-line handling down here means
// higher-level logic never sees the extra lines. So,
// for example, if a script is being saved, it won't include
// the continuation lines! This is logged as sf.net
// bug 879518.
// use multiple lines for statements not terminated by the delimiter
try {
line = handleMultiLineCmd(line);
} catch (Exception e) {
beeLine.handleException(e);
}
line = line.trim();
List<String> cmdList = getCmdList(line, entireLineAsCommand);
for (int i = 0; i < cmdList.size(); i++) {
String sql = cmdList.get(i).trim();
if (sql.length() != 0) {
if (!executeInternal(sql, call)) {
return false;
}
}
}
return true;
}
}
Clearly the normal case executes executeInternal(sql, call) (in Commands.java, with call=false):
// Return false only occurred error when execution the sql and the sql should follow the rules
// of beeline.
private boolean executeInternal(String sql, boolean call) {
if (!beeLine.isBeeLine()) {
sql = cliToBeelineCmd(sql);
}
if (sql == null || sql.length() == 0) {
return true;
}
if (beeLine.isComment(sql)) {
//skip this and rest cmds in the line
return true;
}
// is source CMD
if (isSourceCMD(sql)) {
return sourceFile(sql);
}
if (sql.startsWith(BeeLine.COMMAND_PREFIX)) {
return beeLine.execCommandWithPrefix(sql);
}
String prefix = call ? "call" : "sql";
if (sql.startsWith(prefix)) {
sql = sql.substring(prefix.length());
}
// batch statements?
if (beeLine.getBatch() != null) {
beeLine.getBatch().add(sql);
return true;
}
if (!(beeLine.assertConnection())) {
return false;
}
ClientHook hook = ClientCommandHookFactory.get().getHook(beeLine, sql);
try {
Statement stmnt = null;
boolean hasResults;
Thread logThread = null;
try {
long start = System.currentTimeMillis();
if (call) {
stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql);
hasResults = ((CallableStatement) stmnt).execute();
} else {
stmnt = beeLine.createStatement();
// In test mode we want the operation logs regardless of the settings
if (!beeLine.isTestMode() && beeLine.getOpts().isSilent()) {
hasResults = stmnt.execute(sql);
} else {
InPlaceUpdateStream.EventNotifier eventNotifier =
new InPlaceUpdateStream.EventNotifier();
logThread = new Thread(createLogRunnable(stmnt, eventNotifier));
logThread.setDaemon(true);
logThread.start();
if (stmnt instanceof HiveStatement) {
HiveStatement hiveStatement = (HiveStatement) stmnt;
hiveStatement.setInPlaceUpdateStream(
new BeelineInPlaceUpdateStream(
beeLine.getErrorStream(),
eventNotifier
));
}
hasResults = stmnt.execute(sql);
logThread.interrupt();
logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
}
}
beeLine.showWarnings();
if (hasResults) {
OutputFile outputFile = beeLine.getRecordOutputFile();
if (beeLine.isTestMode() && outputFile != null && outputFile.isActiveConverter()) {
outputFile.fetchStarted();
if (!sql.trim().toLowerCase().startsWith("explain")) {
outputFile.foundQuery(true);
} else {
outputFile.foundQuery(false);
}
}
do {
ResultSet rs = stmnt.getResultSet();
try {
int count = beeLine.print(rs);
long end = System.currentTimeMillis();
beeLine.info(
beeLine.loc("rows-selected", count) + " " + beeLine.locElapsedTime(end - start));
} finally {
if (logThread != null) {
logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
showRemainingLogsIfAny(stmnt);
logThread = null;
}
rs.close();
}
} while (BeeLine.getMoreResults(stmnt));
if (beeLine.isTestMode() && outputFile != null && outputFile.isActiveConverter()) {
outputFile.fetchFinished();
}
} else {
int count = stmnt.getUpdateCount();
long end = System.currentTimeMillis();
beeLine.info(
beeLine.loc("rows-affected", count) + " " + beeLine.locElapsedTime(end - start));
}
} finally {
if (logThread != null) {
if (!logThread.isInterrupted()) {
logThread.interrupt();
}
logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
showRemainingLogsIfAny(stmnt);
}
if (stmnt != null) {
stmnt.close();
}
}
} catch (Exception e) {
return beeLine.error(e);
}
beeLine.showWarnings();
if (hook != null) {
hook.postHook(beeLine);
}
return true;
}
And there is the familiar Statement, a flurry of operations much like plain JDBC... plus a log thread joined with a 10000 ms timeout whose purpose we need not probe for now.
With call=false, test mode and the silent setting split execution into two paths, but both end at stmnt.execute, the execute method of the familiar JDBC Statement interface. When the statement returns results, behavior again differs depending on whether the query is an EXPLAIN. Finally the statement is closed, warnings are displayed, and postHook releases the connection resources.
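Stripped of the logging thread and the test-mode branches, the skeleton is ordinary JDBC. A minimal sketch (illustrative host and credentials, assuming the Hive JDBC driver is on the classpath):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class JdbcSkeletonDemo {
  public static void main(String[] args) throws SQLException {
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://host:10000/default", "hive", "");
         Statement stmnt = conn.createStatement()) {
      // execute() returns true when there is a ResultSet, false for DML/DDL
      boolean hasResults = stmnt.execute("select col1 from db2.tb2");
      if (hasResults) {
        try (ResultSet rs = stmnt.getResultSet()) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      } else {
        System.out.println(stmnt.getUpdateCount() + " rows affected");
      }
    }
  }
}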
With call=false, this runs:
stmnt = beeLine.createStatement();
obtaining the JDBC Statement. Jumping into BeeLine.java we can see:
Statement createStatement() throws SQLException {
Statement stmnt = getDatabaseConnection().getConnection().createStatement();
if (getOpts().timeout > -1) {
stmnt.setQueryTimeout(getOpts().timeout);
}
if (signalHandler != null) {
signalHandler.setStatement(stmnt);
}
return stmnt;
}
The getDatabaseConnection method at the start:
private final DatabaseConnections connections = new DatabaseConnections();
DatabaseConnection getDatabaseConnection() {
return getDatabaseConnections().current();
}
DatabaseConnections getDatabaseConnections() {
return connections;
}
is clearly fetching a database connection from a connection-pool-like class:
package org.apache.hive.beeline;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
class DatabaseConnections {
private final List<DatabaseConnection> connections = new ArrayList<DatabaseConnection>();
private int index = -1;
public DatabaseConnection current() {
if (index != -1) {
return connections.get(index);
}
return null;
}
public int size() {
return connections.size();
}
public Iterator<DatabaseConnection> iterator() {
return connections.iterator();
}
public void remove() {
if (index != -1) {
connections.remove(index);
}
while (index >= connections.size()) {
index--;
}
}
public void setConnection(DatabaseConnection connection) {
if (connections.indexOf(connection) == -1) {
connections.add(connection);
}
index = connections.indexOf(connection);
}
public int getIndex() {
return index;
}
public boolean setIndex(int index) {
if (index < 0 || index >= connections.size()) {
return false;
}
this.index = index;
return true;
}
}
And the connection object DatabaseConnection:
package org.apache.hive.beeline;
import org.apache.hive.jdbc.HiveConnection;
import jline.console.completer.ArgumentCompleter;
import jline.console.completer.Completer;
class DatabaseConnection {
private final String driver;
private final String url;
private final Properties info;
public DatabaseConnection(BeeLine beeLine, String driver, String url,
Properties info) throws SQLException {
this.beeLine = beeLine;
this.driver = driver;
this.url = url;
this.info = info;
}
public Connection getConnection() throws SQLException {
if (connection != null) {
return connection;
}
connect();
return connection;
}
/**
* Connection to the specified data source.
*/
boolean connect() throws SQLException {
try {
if (driver != null && driver.length() != 0) {
Class.forName(driver);
}
} catch (ClassNotFoundException cnfe) {
return beeLine.error(cnfe);
}
boolean isDriverRegistered = false;
try {
isDriverRegistered = DriverManager.getDriver(getUrl()) != null;
} catch (Exception e) {
}
try {
close();
} catch (Exception e) {
return beeLine.error(e);
}
Map<String, String> hiveVars = beeLine.getOpts().getHiveVariables();
if (hiveVars != null){
for (Map.Entry<String, String> var : hiveVars.entrySet()) {
info.put(HIVE_VAR_PREFIX + var.getKey(), var.getValue());
}
}
Map<String, String> hiveConfVars = beeLine.getOpts().getHiveConfVariables();
if (hiveConfVars != null){
for (Map.Entry<String, String> var : hiveConfVars.entrySet()) {
info.put(HIVE_CONF_PREFIX + var.getKey(), var.getValue());
}
}
if (isDriverRegistered) {
// if the driver registered in the driver manager, get the connection via the driver manager
setConnection(DriverManager.getConnection(getUrl(), info));
} else {
beeLine.debug("Use the driver from local added jar file.");
setConnection(getConnectionFromLocalDriver(getUrl(), info));
}
setDatabaseMetaData(getConnection().getMetaData());
try {
beeLine.info(beeLine.loc("connected", new Object[] {
getDatabaseMetaData().getDatabaseProductName(),
getDatabaseMetaData().getDatabaseProductVersion()}));
} catch (Exception e) {
beeLine.handleException(e);
}
try {
beeLine.info(beeLine.loc("driver", new Object[] {
getDatabaseMetaData().getDriverName(),
getDatabaseMetaData().getDriverVersion()}));
} catch (Exception e) {
beeLine.handleException(e);
}
try {
getConnection().setAutoCommit(beeLine.getOpts().getAutoCommit());
// TODO: Setting autocommit should not generate an exception as long as it is set to false
// beeLine.autocommitStatus(getConnection());
} catch (Exception e) {
beeLine.handleException(e);
}
try {
beeLine.getCommands().isolation("isolation: " + beeLine.getOpts().getIsolation());
} catch (Exception e) {
beeLine.handleException(e);
}
return true;
}
}
As getConnection shows, a null connection triggers connect() automatically, and connect() is the usual JDBC routine of loading the driver class and creating the connection object, so a DatabaseConnection instance always ends up holding a live connection. The DatabaseConnection constructor also receives the driver, url, and other parameters. As you can see, it is good old JDBC.
Back in begin, based on the loaded options: if there is a script file, executeFile(getOpts().getScriptFile()) runs:
private int executeFile(String fileName) {
InputStream fileStream = null;
try {
if (!isBeeLine) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(fileName);
FileSystem fs;
HiveConf conf = getCommands().getHiveConf(true);
if (!path.toUri().isAbsolute()) {
fs = FileSystem.getLocal(conf);
path = fs.makeQualified(path);
} else {
fs = FileSystem.get(path.toUri(), conf);
}
fileStream = fs.open(path);
} else {
fileStream = new FileInputStream(fileName);
}
return execute(initializeConsoleReader(fileStream), !getOpts().getForce());
} catch (Throwable t) {
handleException(t);
return ERRNO_OTHER;
} finally {
IOUtils.closeStream(fileStream);
}
}
In other words, it opens a stream on the file and hands it to the execute method for the actual work.
Finally, execute(reader, false) runs: the loop below reads lines and dispatches them until exit becomes true, terminating normally when done. A script file passed via -f also finishes inside this method and exits normally.
private int execute(ConsoleReader reader, boolean exitOnError) {
int lastExecutionResult = ERRNO_OK;
Character mask = (System.getProperty("jline.terminal", "").equals("jline.UnsupportedTerminal")) ? null
: ConsoleReader.NULL_MASK;
while (!exit) {
try {
// Execute one instruction; terminate on executing a script if there is an error
// in silent mode, prevent the query and prompt being echoed back to terminal
String line = (getOpts().isSilent() && getOpts().getScriptFile() != null) ? reader
.readLine(null, mask) : reader.readLine(getPrompt());
// trim line
if (line != null) {
line = line.trim();
}
if (!dispatch(line)) {
lastExecutionResult = ERRNO_OTHER;
if (exitOnError) {
break;
}
} else if (line != null) {
lastExecutionResult = ERRNO_OK;
}
} catch (Throwable t) {
handleException(t);
return ERRNO_OTHER;
}
}
return lastExecutionResult;
}
A typical big-data platform generally passes -e as well; invocation via script files needs no further discussion.
From the source it is easy to see that beeline is simply JDBC to Hive: parameters come in through -u, -e, and friends, and the SQL is handed to a Statement. The connection is created from the url, and JDBC, as everyone knows, loads drivers via reflection:
/**
* Attempts to establish a connection to the given database URL.
* The DriverManager attempts to select an appropriate driver from
* the set of registered JDBC drivers.
*
* Note: If a property is specified as part of the {@code url} and
* is also specified in the {@code Properties} object, it is
* implementation-defined as to which value will take precedence.
* For maximum portability, an application should only specify a
* property once.
*
* @param url a database url of the form
* jdbc:subprotocol:subname
* @param info a list of arbitrary string tag/value pairs as
* connection arguments; normally at least a "user" and
* "password" property should be included
* @return a Connection to the URL
* @exception SQLException if a database access error occurs or the url is
* {@code null}
* @throws SQLTimeoutException when the driver has determined that the
* timeout value specified by the {@code setLoginTimeout} method
* has been exceeded and has at least tried to cancel the
* current database connection attempt
*/
@CallerSensitive
public static Connection getConnection(String url,
java.util.Properties info) throws SQLException {
return (getConnection(url, info, Reflection.getCallerClass()));
}
Next, let's look at the concrete implementation of the reflectively loaded driver and how the url carries configuration items.
Hive's JDBC driver class is the familiar org.apache.hive.jdbc.HiveDriver:
package org.apache.hive.jdbc;
import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
/**
* HiveDriver.
*
*/
public class HiveDriver implements Driver {
static {
try {
java.sql.DriverManager.registerDriver(new HiveDriver());
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* Is this driver JDBC compliant?
*/
private static final boolean JDBC_COMPLIANT = false;
/**
* Property key for the database name.
*/
private static final String DBNAME_PROPERTY_KEY = "DBNAME";
/**
* Property key for the Hive Server2 host.
*/
private static final String HOST_PROPERTY_KEY = "HOST";
/**
* Property key for the Hive Server2 port.
*/
private static final String PORT_PROPERTY_KEY = "PORT";
/**
*
*/
public HiveDriver() {
// TODO Auto-generated constructor stub
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkWrite("foobah");
}
}
/**
* Checks whether a given url is in a valid format.
*
* The current uri format is: jdbc:hive://[host[:port]]
*
* jdbc:hive:// - run in embedded mode jdbc:hive://localhost - connect to
* localhost default port (10000) jdbc:hive://localhost:5050 - connect to
* localhost port 5050
*
* TODO: - write a better regex. - decide on uri format
*/
@Override
public boolean acceptsURL(String url) throws SQLException {
return Pattern.matches(Utils.URL_PREFIX + ".*", url);
}
/*
* As per JDBC 3.0 Spec (section 9.2)
* "If the Driver implementation understands the URL, it will return a Connection object;
* otherwise it returns null"
*/
@Override
public Connection connect(String url, Properties info) throws SQLException {
return acceptsURL(url) ? new HiveConnection(url, info) : null;
}
/**
* Package scoped access to the Driver's Major Version
* @return The Major version number of the driver. -1 if it cannot be determined from the
* manifest.mf file.
*/
static int getMajorDriverVersion() {
int version = -1;
try {
String fullVersion = HiveDriver.fetchManifestAttribute(
Attributes.Name.IMPLEMENTATION_VERSION);
String[] tokens = fullVersion.split("\\."); //$NON-NLS-1$
if(tokens != null && tokens.length > 0 && tokens[0] != null) {
version = Integer.parseInt(tokens[0]);
}
} catch (Exception e) {
// Possible reasons to end up here:
// - Unable to read version from manifest.mf
// - Version string is not in the proper X.x.xxx format
version = -1;
}
return version;
}
/**
* Package scoped access to the Driver's Minor Version
* @return The Minor version number of the driver. -1 if it cannot be determined from the
* manifest.mf file.
*/
static int getMinorDriverVersion() {
int version = -1;
try {
String fullVersion = HiveDriver.fetchManifestAttribute(
Attributes.Name.IMPLEMENTATION_VERSION);
String[] tokens = fullVersion.split("\\."); //$NON-NLS-1$
if(tokens != null && tokens.length > 1 && tokens[1] != null) {
version = Integer.parseInt(tokens[1]);
}
} catch (Exception e) {
// Possible reasons to end up here:
// - Unable to read version from manifest.mf
// - Version string is not in the proper X.x.xxx format
version = -1;
}
return version;
}
/**
* Returns the major version of this driver.
*/
@Override
public int getMajorVersion() {
return HiveDriver.getMajorDriverVersion();
}
/**
* Returns the minor version of this driver.
*/
@Override
public int getMinorVersion() {
return HiveDriver.getMinorDriverVersion();
}
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
// JDK 1.7
throw new SQLFeatureNotSupportedException("Method not supported");
}
@Override
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
if (info == null) {
info = new Properties();
}
if ((url != null) && url.startsWith(Utils.URL_PREFIX)) {
info = parseURLforPropertyInfo(url, info);
}
DriverPropertyInfo hostProp = new DriverPropertyInfo(HOST_PROPERTY_KEY,
info.getProperty(HOST_PROPERTY_KEY, ""));
hostProp.required = false;
hostProp.description = "Hostname of Hive Server2";
DriverPropertyInfo portProp = new DriverPropertyInfo(PORT_PROPERTY_KEY,
info.getProperty(PORT_PROPERTY_KEY, ""));
portProp.required = false;
portProp.description = "Port number of Hive Server2";
DriverPropertyInfo dbProp = new DriverPropertyInfo(DBNAME_PROPERTY_KEY,
info.getProperty(DBNAME_PROPERTY_KEY, "default"));
dbProp.required = false;
dbProp.description = "Database name";
DriverPropertyInfo[] dpi = new DriverPropertyInfo[3];
dpi[0] = hostProp;
dpi[1] = portProp;
dpi[2] = dbProp;
return dpi;
}
/**
* Returns whether the driver is JDBC compliant.
*/
@Override
public boolean jdbcCompliant() {
return JDBC_COMPLIANT;
}
/**
* Takes a url in the form of jdbc:hive://[hostname]:[port]/[db_name] and
* parses it. Everything after jdbc:hive// is optional.
*
* The output from Utils.parseUrl() is massaged for the needs of getPropertyInfo
* @param url
* @param defaults
* @return
* @throws java.sql.SQLException
*/
private Properties parseURLforPropertyInfo(String url, Properties defaults) throws SQLException {
Properties urlProps = (defaults != null) ? new Properties(defaults)
: new Properties();
if (url == null || !url.startsWith(Utils.URL_PREFIX)) {
throw new SQLException("Invalid connection url: " + url);
}
JdbcConnectionParams params = null;
try {
params = Utils.parseURL(url, defaults);
} catch (ZooKeeperHiveClientException e) {
throw new SQLException(e);
}
String host = params.getHost();
if (host == null){
host = "";
}
String port = Integer.toString(params.getPort());
if(host.equals("")){
port = "";
}
else if(port.equals("0") || port.equals("-1")){
port = Utils.DEFAULT_PORT;
}
String db = params.getDbName();
urlProps.put(HOST_PROPERTY_KEY, host);
urlProps.put(PORT_PROPERTY_KEY, port);
urlProps.put(DBNAME_PROPERTY_KEY, db);
return urlProps;
}
/**
* Lazy-load manifest attributes as needed.
*/
private static Attributes manifestAttributes = null;
/**
* Loads the manifest attributes from the jar.
*
* @throws java.net.MalformedURLException
* @throws IOException
*/
private static synchronized void loadManifestAttributes() throws IOException {
if (manifestAttributes != null) {
return;
}
Class<?> clazz = HiveDriver.class;
String classContainer = clazz.getProtectionDomain().getCodeSource()
.getLocation().toString();
URL manifestUrl = new URL("jar:" + classContainer
+ "!/META-INF/MANIFEST.MF");
Manifest manifest = new Manifest(manifestUrl.openStream());
manifestAttributes = manifest.getMainAttributes();
}
/**
* Package scoped to allow manifest fetching from other HiveDriver classes
* Helper to initialize attributes and return one.
*
* @param attributeName
* @return
* @throws SQLException
*/
static String fetchManifestAttribute(Attributes.Name attributeName)
throws SQLException {
try {
loadManifestAttributes();
} catch (IOException e) {
throw new SQLException("Couldn't load manifest attributes.", e);
}
return manifestAttributes.getValue(attributeName);
}
}
What acceptsURL matches is:
package org.apache.hive.jdbc;
public class Utils {
static final Logger LOG = LoggerFactory.getLogger(Utils.class.getName());
/**
* The required prefix for the connection URL.
*/
public static final String URL_PREFIX = "jdbc:hive2://";
}
类似"jdbc:hive2://"这种开头的url。
当然也能看到根据配置解析参数的parseURLforPropertyInfo方法。
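A quick runnable check of that prefix match (illustrative urls):
import java.util.regex.Pattern;

public class AcceptsUrlDemo {
  public static void main(String[] args) {
    String URL_PREFIX = "jdbc:hive2://";
    System.out.println(Pattern.matches(URL_PREFIX + ".*",
        "jdbc:hive2://host:10000/default")); // true
    System.out.println(Pattern.matches(URL_PREFIX + ".*",
        "jdbc:mysql://host:3306/db"));       // false
  }
}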
The constructor called from the static initializer:
public HiveDriver() {
// TODO Auto-generated constructor stub
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkWrite("foobah");
}
}
appears to perform a security check.
HiveDriver's connect method shows:
/*
* As per JDBC 3.0 Spec (section 9.2)
* "If the Driver implementation understands the URL, it will return a Connection object;
* otherwise it returns null"
*/
@Override
public Connection connect(String url, Properties info) throws SQLException {
return acceptsURL(url) ? new HiveConnection(url, info) : null;
}
Jump into:
package org.apache.hive.jdbc;
public class HiveConnection implements java.sql.Connection {
public HiveConnection(String uri, Properties info) throws SQLException {
setupLoginTimeout();
try {
connParams = Utils.parseURL(uri, info);
} catch (ZooKeeperHiveClientException e) {
throw new SQLException(e);
}
jdbcUriString = connParams.getJdbcUriString();
// JDBC URL: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
// each list: <key1>=<val1>;<key2>=<val2> and so on
// sess_var_list -> sessConfMap
// hive_conf_list -> hiveConfMap
// hive_var_list -> hiveVarMap
host = Utils.getCanonicalHostName(connParams.getHost());
port = connParams.getPort();
sessConfMap = connParams.getSessionVars();
isEmbeddedMode = connParams.isEmbeddedMode();
if (sessConfMap.containsKey(JdbcConnectionParams.FETCH_SIZE)) {
fetchSize = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.FETCH_SIZE));
}
if (sessConfMap.containsKey(JdbcConnectionParams.INIT_FILE)) {
initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE);
}
wmPool = sessConfMap.get(JdbcConnectionParams.WM_POOL);
for (String application : JdbcConnectionParams.APPLICATION) {
wmApp = sessConfMap.get(application);
if (wmApp != null) break;
}
// add supported protocols
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V3);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V4);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10);
if (isEmbeddedMode) {
EmbeddedThriftBinaryCLIService embeddedClient = new EmbeddedThriftBinaryCLIService();
embeddedClient.init(null, connParams.getHiveConfs());
client = embeddedClient;
connParams.getHiveConfs().clear();
// open client session
openSession();
executeInitSql();
} else {
int maxRetries = 1;
try {
String strRetries = sessConfMap.get(JdbcConnectionParams.RETRIES);
if (StringUtils.isNotBlank(strRetries)) {
maxRetries = Integer.parseInt(strRetries);
}
} catch(NumberFormatException e) { // Ignore the exception
}
for (int numRetries = 0;;) {
try {
// open the client transport
openTransport();
// set up the client
client = new TCLIService.Client(new TBinaryProtocol(transport));
// open client session
openSession();
executeInitSql();
break;
} catch (Exception e) {
LOG.warn("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort());
String errMsg = null;
String warnMsg = "Could not open client transport with JDBC Uri: " + jdbcUriString + ": ";
if (ZooKeeperHiveClientHelper.isZkDynamicDiscoveryMode(sessConfMap)) {
errMsg = "Could not open client transport for any of the Server URI's in ZooKeeper: ";
// Try next available server in zookeeper, or retry all the servers again if retry is enabled
while(!Utils.updateConnParamsFromZooKeeper(connParams) && ++numRetries < maxRetries) {
connParams.getRejectedHostZnodePaths().clear();
}
// Update with new values
jdbcUriString = connParams.getJdbcUriString();
host = Utils.getCanonicalHostName(connParams.getHost());
port = connParams.getPort();
} else {
errMsg = warnMsg;
++numRetries;
}
if (numRetries >= maxRetries) {
throw new SQLException(errMsg + e.getMessage(), " 08S01", e);
} else {
LOG.warn(warnMsg + e.getMessage() + " Retrying " + numRetries + " of " + maxRetries);
}
}
}
}
// Wrap the client with a thread-safe proxy to serialize the RPC calls
client = newSynchronizedClient(client);
}
}
This is clearly the constructor that actually runs when a connection is made.
From it we can read off the correct format of a Hive JDBC URL:
jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
The comment also spells out that every list must follow the k=v form, with entries separated by ";".
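A hypothetical URL exercising all three lists at once (host, port, principal, and all values are placeholders):
jdbc:hive2://hs2-host:10000/db1;principal=hive/hs2-host@EXAMPLE.COM;retries=3?hive.exec.parallel=true;hive.exec.dynamic.partition.mode=nonstrict#run_date=20220901
Here principal and retries land in sess_var_list, the two hive.* entries in hive_conf_list, and run_date in hive_var_list.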
We will obviously not be using the default embedded (Derby) mode, so let's follow the normal remote path. After the fetch size, init file, retry count, and similar settings have been read, the following runs in a loop (throwing an exception and giving up once the maximum retry count is exceeded):
// open the client transport
openTransport();
// set up the client
client = new TCLIService.Client(new TBinaryProtocol(transport));
// open client session
openSession();
executeInitSql();
A four-step plan: open the transport, set up the client, open the session, run the init SQL.
Stepping into the first method:
private void openTransport() throws Exception {
assumeSubject =
JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
.get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
if (!transport.isOpen()) {
transport.open();
}
logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort());
}
Does this ternary look familiar? Depending on whether HTTP transport mode is configured, it creates either an HTTP transport or a binary transport. And the binary transport offers far more than just Kerberos:
/**
* Create transport per the connection options
* Supported transport options are:
* - SASL based transports over
* + Kerberos
* + Delegation token
* + SSL
* + non-SSL
* - Raw (non-SASL) socket
*
* Kerberos and Delegation token supports SASL QOP configurations
* @throws SQLException, TTransportException
*/
private TTransport createBinaryTransport() throws SQLException, TTransportException {
try {
TTransport socketTransport = createUnderlyingTransport();
// handle secure connection if specified
if (!JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))) {
// If Kerberos
Map<String, String> saslProps = new HashMap<String, String>();
SaslQOP saslQOP = SaslQOP.AUTH;
if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_QOP)) {
try {
saslQOP = SaslQOP.fromString(sessConfMap.get(JdbcConnectionParams.AUTH_QOP));
} catch (IllegalArgumentException e) {
throw new SQLException("Invalid " + JdbcConnectionParams.AUTH_QOP +
" parameter. " + e.getMessage(), "42000", e);
}
saslProps.put(Sasl.QOP, saslQOP.toString());
} else {
// If the client did not specify qop then just negotiate the one supported by server
saslProps.put(Sasl.QOP, "auth-conf,auth-int,auth");
}
saslProps.put(Sasl.SERVER_AUTH, "true");
if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL)) {
transport = KerberosSaslHelper.getKerberosTransport(
sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host,
socketTransport, saslProps, assumeSubject);
} else {
// If there's a delegation token available then use token based connection
String tokenStr = getClientDelegationToken(sessConfMap);
if (tokenStr != null) {
transport = KerberosSaslHelper.getTokenTransport(tokenStr,
host, socketTransport, saslProps);
} else {
// we are using PLAIN Sasl connection with user/password
String userName = getUserName();
String passwd = getPassword();
// Overlay the SASL transport on top of the base socket transport (SSL or non-SSL)
transport = PlainSaslHelper.getPlainTransport(userName, passwd, socketTransport);
}
}
} else {
// Raw socket connection (non-sasl)
transport = socketTransport;
}
} catch (SaslException e) {
throw new SQLException("Could not create secure connection to "
+ jdbcUriString + ": " + e.getMessage(), " 08S01", e);
}
return transport;
}
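Incidentally, whether isHttpTransportMode() returns true is itself driven by the transportMode session variable in the URL (a deprecated hive.server2.transport.mode spelling is also recognized). A hypothetical HTTP-mode URL, with host, port, and path as placeholders:
jdbc:hive2://hs2-host:10001/default;transportMode=http;httpPath=cliservice
Both param names appear in JdbcConnectionParams, quoted further below.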
But when the JDBC URL carries a principal:
if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL)) {
transport = KerberosSaslHelper.getKerberosTransport(
sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host,
socketTransport, saslProps, assumeSubject);
}
then, without question, the Kerberos-related helper is called:
package org.apache.hive.service.auth;
import java.io.IOException;
import java.util.Map;
import javax.security.sasl.SaslException;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TCLIService.Iface;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TTransport;
public final class KerberosSaslHelper {
public static TTransport getKerberosTransport(String principal, String host,
TTransport underlyingTransport, Map<String, String> saslProps, boolean assumeSubject)
throws SaslException {
try {
String[] names = principal.split("[/@]");
if (names.length != 3) {
throw new IllegalArgumentException("Kerberos principal should have 3 parts: " + principal);
}
if (assumeSubject) {
return createSubjectAssumedTransport(principal, underlyingTransport, saslProps);
} else {
HadoopThriftAuthBridge.Client authBridge =
HadoopThriftAuthBridge.getBridge().createClientWithConf("kerberos");
return authBridge.createClientTransport(principal, host, "KERBEROS", null,
underlyingTransport, saslProps);
}
} catch (IOException e) {
throw new SaslException("Failed to open client transport", e);
}
}
}
The split on "/" and "@" is hard-coded here: if the principal does not break into exactly 3 parts, an error is raised.
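A minimal sketch of that hard-coded check, with a placeholder principal:
String principal = "hive/hs2-host.example.com@EXAMPLE.COM"; // hypothetical service principal
String[] names = principal.split("[/@]");
// names = ["hive", "hs2-host.example.com", "EXAMPLE.COM"] -> 3 parts, accepted
// a user principal like "alice@EXAMPLE.COM" yields only 2 parts and is rejected
So the principal in the URL must be the full three-part service principal, i.e. service/host@REALM.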
For the rest of this path, see an earlier post: https://lizhiyong.blog.csdn.net/article/details/124184528
That post analyzed how Hive 2 and Hive 3 differ when constructing the Kerberos transport object; for brevity it is not repeated here.
Back in the connect loop, the transport from the previous step is handed to new TCLIService.Client(new TBinaryProtocol(transport)), whose constructor simply returns a new client instance; nothing remarkable. Next, openSession:
private void openSession() throws SQLException {
TOpenSessionReq openReq = new TOpenSessionReq();
Map<String, String> openConf = new HashMap<String, String>();
// for remote JDBC client, try to set the conf var using 'set foo=bar'
for (Entry<String, String> hiveConf : connParams.getHiveConfs().entrySet()) {
openConf.put("set:hiveconf:" + hiveConf.getKey(), hiveConf.getValue());
}
// For remote JDBC client, try to set the hive var using 'set hivevar:key=value'
for (Entry<String, String> hiveVar : connParams.getHiveVars().entrySet()) {
openConf.put("set:hivevar:" + hiveVar.getKey(), hiveVar.getValue());
}
// switch the database
openConf.put("use:database", connParams.getDbName());
// set the fetchSize
openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size",
Integer.toString(fetchSize));
if (wmPool != null) {
openConf.put("set:hivevar:wmpool", wmPool);
}
if (wmApp != null) {
openConf.put("set:hivevar:wmapp", wmApp);
}
// set the session configuration
Map<String, String> sessVars = connParams.getSessionVars();
if (sessVars.containsKey(HiveAuthConstants.HS2_PROXY_USER)) {
openConf.put(HiveAuthConstants.HS2_PROXY_USER,
sessVars.get(HiveAuthConstants.HS2_PROXY_USER));
}
openReq.setConfiguration(openConf);
// Store the user name in the open request in case no non-sasl authentication
if (JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))) {
openReq.setUsername(sessConfMap.get(JdbcConnectionParams.AUTH_USER));
openReq.setPassword(sessConfMap.get(JdbcConnectionParams.AUTH_PASSWD));
}
try {
TOpenSessionResp openResp = client.OpenSession(openReq);
// validate connection
Utils.verifySuccess(openResp.getStatus());
if (!supportedProtocols.contains(openResp.getServerProtocolVersion())) {
throw new TException("Unsupported Hive2 protocol");
}
protocol = openResp.getServerProtocolVersion();
sessHandle = openResp.getSessionHandle();
// Update fetchSize if modified by server
String serverFetchSize =
openResp.getConfiguration().get("hive.server2.thrift.resultset.default.fetch.size");
if (serverFetchSize != null) {
fetchSize = Integer.parseInt(serverFetchSize);
}
} catch (TException e) {
LOG.error("Error opening session", e);
throw new SQLException("Could not establish connection to "
+ jdbcUriString + ": " + e.getMessage(), " 08S01", e);
}
isClosed = false;
}
This method does a round of initialization of configuration entries and objects, driven by the proxy user, authentication, and other settings.
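To make the prefixes concrete: assuming a hypothetical URL ending in ?hive.exec.parallel=true#run_date=20220901 against the default database, the openConf map built above would contain roughly:
// "set:hiveconf:hive.exec.parallel" -> "true"
// "set:hivevar:run_date" -> "20220901"
// "use:database" -> "default"
// "set:hiveconf:hive.server2.thrift.resultset.default.fetch.size" -> <whatever fetchSize resolved to>
Per the code comments, HiveServer2 applies these on session open just as the corresponding set/use statements would.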
private void executeInitSql() throws SQLException {
if (initFile != null) {
try {
List<String> sqlList = parseInitFile(initFile);
Statement st = createStatement();
for(String sql : sqlList) {
boolean hasResult = st.execute(sql);
if (hasResult) {
ResultSet rs = st.getResultSet();
while (rs.next()) {
System.out.println(rs.getString(1));
}
}
}
} catch(Exception e) {
LOG.error("Failed to execute initial SQL");
throw new SQLException(e.getMessage());
}
}
}
It merely pre-executes the SQL parsed out of the init file, if one was supplied. Not the focus here.
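A hedged usage example: initFile is just another session variable, so an init script can be attached straight in the URL (host and path are placeholders):
jdbc:hive2://hs2-host:10000/default;initFile=/tmp/init.sql
where /tmp/init.sql might contain, say:
set hive.exec.dynamic.partition.mode=nonstrict;
use db1;
The jdbc package also ships a DataSource wrapper: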
package org.apache.hive.jdbc;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.logging.Logger;
import javax.sql.DataSource;
/**
* HiveDataSource.
*
*/
public class HiveDataSource implements DataSource {
/**
*
*/
public HiveDataSource() {
// TODO Auto-generated constructor stub
}
/*
* (non-Javadoc)
*
* @see javax.sql.DataSource#getConnection()
*/
@Override
public Connection getConnection() throws SQLException {
return getConnection("", "");
}
/*
* (non-Javadoc)
*
* @see javax.sql.DataSource#getConnection(java.lang.String, java.lang.String)
*/
@Override
public Connection getConnection(String username, String password)
throws SQLException {
try {
return new HiveConnection("", null);
} catch (Exception ex) {
throw new SQLException("Error in getting HiveConnection",ex);
}
}
/*
* (non-Javadoc)
*
* @see javax.sql.CommonDataSource#getLogWriter()
*/
@Override
public PrintWriter getLogWriter() throws SQLException {
// TODO Auto-generated method stub
throw new SQLFeatureNotSupportedException("Method not supported");
}
/*
* (non-Javadoc)
*
* @see javax.sql.CommonDataSource#getLoginTimeout()
*/
@Override
public int getLoginTimeout() throws SQLException {
// TODO Auto-generated method stub
throw new SQLFeatureNotSupportedException("Method not supported");
}
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
// JDK 1.7
throw new SQLFeatureNotSupportedException("Method not supported");
}
/*
* (non-Javadoc)
*
* @see javax.sql.CommonDataSource#setLogWriter(java.io.PrintWriter)
*/
@Override
public void setLogWriter(PrintWriter arg0) throws SQLException {
// TODO Auto-generated method stub
throw new SQLFeatureNotSupportedException("Method not supported");
}
/*
* (non-Javadoc)
*
* @see javax.sql.CommonDataSource#setLoginTimeout(int)
*/
@Override
public void setLoginTimeout(int arg0) throws SQLException {
// TODO Auto-generated method stub
throw new SQLFeatureNotSupportedException("Method not supported");
}
/*
* (non-Javadoc)
*
* @see java.sql.Wrapper#isWrapperFor(java.lang.Class)
*/
@Override
public boolean isWrapperFor(Class<?> arg0) throws SQLException {
// TODO Auto-generated method stub
throw new SQLFeatureNotSupportedException("Method not supported");
}
/*
* (non-Javadoc)
*
* @see java.sql.Wrapper#unwrap(java.lang.Class)
*/
@Override
public <T> T unwrap(Class<T> arg0) throws SQLException {
// TODO Auto-generated method stub
throw new SQLFeatureNotSupportedException("Method not supported");
}
}
Once the JDBC connect method is understood, it is also clear how components such as Spark, Flink, Hudi, and Spring obtain Hive connections. A connection built through this HiveDataSource is still JDBC underneath, no different from hand-written code; note from the quoted source that its getConnection simply hands HiveConnection an empty URL, so the class is little more than a thin shell over the driver.
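A minimal hand-written-JDBC sketch of what those frameworks effectively do (host and realm are placeholders):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcDemo {
  public static void main(String[] args) throws Exception {
    // Loading the class runs its static block, which registers the driver
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    String url = "jdbc:hive2://hs2-host:10000/default;principal=hive/hs2-host@EXAMPLE.COM";
    try (Connection conn = DriverManager.getConnection(url);
         Statement st = conn.createStatement();
         ResultSet rs = st.executeQuery("select 1")) {
      while (rs.next()) {
        System.out.println(rs.getString(1));
      }
    }
  }
}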
From the source:
package org.apache.hive.jdbc;
public class Utils {
public static class JdbcConnectionParams {
// Note on client side parameter naming convention:
// Prefer using a shorter camelCase param name instead of using the same name as the
// corresponding HiveServer2 config.
// For a jdbc url: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list,
// client side params are specified in sess_var_list
// Client param names:
// Retry setting
static final String RETRIES = "retries";
public static final String AUTH_TYPE = "auth";
// We're deprecating this variable's name.
public static final String AUTH_QOP_DEPRECATED = "sasl.qop";
public static final String AUTH_QOP = "saslQop";
public static final String AUTH_SIMPLE = "noSasl";
public static final String AUTH_TOKEN = "delegationToken";
public static final String AUTH_USER = "user";
public static final String AUTH_PRINCIPAL = "principal";
public static final String AUTH_PASSWD = "password";
public static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
public static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
public static final String ANONYMOUS_USER = "anonymous";
public static final String ANONYMOUS_PASSWD = "anonymous";
public static final String USE_SSL = "ssl";
public static final String SSL_TRUST_STORE = "sslTrustStore";
public static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
// We're deprecating the name and placement of this in the parsed map (from hive conf vars to
// hive session vars).
static final String TRANSPORT_MODE_DEPRECATED = "hive.server2.transport.mode";
public static final String TRANSPORT_MODE = "transportMode";
// We're deprecating the name and placement of this in the parsed map (from hive conf vars to
// hive session vars).
static final String HTTP_PATH_DEPRECATED = "hive.server2.thrift.http.path";
public static final String HTTP_PATH = "httpPath";
public static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
public static final String PROPERTY_DRIVER = "driver";
public static final String PROPERTY_URL = "url";
// Don't use dynamic service discovery
static final String SERVICE_DISCOVERY_MODE_NONE = "none";
// Use ZooKeeper for indirection while using dynamic service discovery
public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA = "zooKeeperHA";
static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
// Default namespace value on ZooKeeper.
// This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
static final String ZOOKEEPER_ACTIVE_PASSIVE_HA_DEFAULT_NAMESPACE = "hs2ActivePassiveHA";
static final String COOKIE_AUTH = "cookieAuth";
static final String COOKIE_AUTH_FALSE = "false";
static final String COOKIE_NAME = "cookieName";
// The default value of the cookie name when CookieAuth=true
static final String DEFAULT_COOKIE_NAMES_HS2 = "hive.server2.auth";
// The http header prefix for additional headers which have to be appended to the request
static final String HTTP_HEADER_PREFIX = "http.header.";
// Set the fetchSize
static final String FETCH_SIZE = "fetchSize";
static final String INIT_FILE = "initFile";
static final String WM_POOL = "wmPool";
// Cookie prefix
static final String HTTP_COOKIE_PREFIX = "http.cookie.";
// We support ways to specify application name modeled after some existing DBs, since
// there's no standard approach.
// MSSQL: applicationName https://docs.microsoft.com/en-us/sql/connect/jdbc/building-the-connection-url
// Postgres 9~: ApplicationName https://jdbc.postgresql.org/documentation/91/connect.html
// Note: various ODBC names used include "Application Name", "APP", etc. Add those?
static final String[] APPLICATION = new String[] { "applicationName", "ApplicationName" };
// --------------- Begin 2 way ssl options -------------------------
// Use two way ssl. This param will take effect only when ssl=true
static final String USE_TWO_WAY_SSL = "twoWay";
static final String TRUE = "true";
static final String SSL_KEY_STORE = "sslKeyStore";
static final String SSL_KEY_STORE_PASSWORD = "keyStorePassword";
static final String SSL_KEY_STORE_TYPE = "JKS";
static final String SUNX509_ALGORITHM_STRING = "SunX509";
static final String SUNJSSE_ALGORITHM_STRING = "SunJSSE";
// --------------- End 2 way ssl options ----------------------------
// Non-configurable params:
// Currently supports JKS keystore format
static final String SSL_TRUST_STORE_TYPE = "JKS";
private static final String HIVE_VAR_PREFIX = "hivevar:";
private static final String HIVE_CONF_PREFIX = "hiveconf:";
private String host = null;
private int port = 0;
private String jdbcUriString;
private String dbName = DEFAULT_DATABASE;
private Map<String,String> hiveConfs = new LinkedHashMap<String,String>();
private Map<String,String> hiveVars = new LinkedHashMap<String,String>();
private Map<String,String> sessionVars = new LinkedHashMap<String,String>();
private boolean isEmbeddedMode = false;
private String suppliedURLAuthority;
private String zooKeeperEnsemble = null;
private String currentHostZnodePath;
private final List<String> rejectedHostZnodePaths = new ArrayList<String>();
public JdbcConnectionParams() {
}
public JdbcConnectionParams(JdbcConnectionParams params) {
this.host = params.host;
this.port = params.port;
this.jdbcUriString = params.jdbcUriString;
this.dbName = params.dbName;
this.hiveConfs.putAll(params.hiveConfs);
this.hiveVars.putAll(params.hiveVars);
this.sessionVars.putAll(params.sessionVars);
this.isEmbeddedMode = params.isEmbeddedMode;
this.suppliedURLAuthority = params.suppliedURLAuthority;
this.zooKeeperEnsemble = params.zooKeeperEnsemble;
this.currentHostZnodePath = params.currentHostZnodePath;
this.rejectedHostZnodePaths.addAll(params.rejectedHostZnodePaths); // the 3.1.2 source adds the field to itself here; copying from params is presumably the intent
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public String getJdbcUriString() {
return jdbcUriString;
}
public String getDbName() {
return dbName;
}
public Map<String, String> getHiveConfs() {
return hiveConfs;
}
public Map<String, String> getHiveVars() {
return hiveVars;
}
public boolean isEmbeddedMode() {
return isEmbeddedMode;
}
public Map<String, String> getSessionVars() {
return sessionVars;
}
public String getSuppliedURLAuthority() {
return suppliedURLAuthority;
}
public String getZooKeeperEnsemble() {
return zooKeeperEnsemble;
}
public List<String> getRejectedHostZnodePaths() {
return rejectedHostZnodePaths;
}
public String getCurrentHostZnodePath() {
return currentHostZnodePath;
}
public void setHost(String host) {
this.host = host;
}
public void setPort(int port) {
this.port = port;
}
public void setJdbcUriString(String jdbcUriString) {
this.jdbcUriString = jdbcUriString;
}
public void setDbName(String dbName) {
this.dbName = dbName;
}
public void setHiveConfs(Map<String, String> hiveConfs) {
this.hiveConfs = hiveConfs;
}
public void setHiveVars(Map<String, String> hiveVars) {
this.hiveVars = hiveVars;
}
public void setEmbeddedMode(boolean embeddedMode) {
this.isEmbeddedMode = embeddedMode;
}
public void setSessionVars(Map<String, String> sessionVars) {
this.sessionVars = sessionVars;
}
public void setSuppliedURLAuthority(String suppliedURLAuthority) {
this.suppliedURLAuthority = suppliedURLAuthority;
}
public void setZooKeeperEnsemble(String zooKeeperEnsemble) {
this.zooKeeperEnsemble = zooKeeperEnsemble;
}
public void setCurrentHostZnodePath(String currentHostZnodePath) {
this.currentHostZnodePath = currentHostZnodePath;
}
}
}
A great many configuration options are defined here.
Per the comments, sess_var_list carries the client-side settings; hive_conf_list and hive_var_list sit alongside it. As the fields show, all three are just ordinary LinkedHashMaps.
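To illustrate, several of these client-side params combined into one sess_var_list (every value below is a placeholder):
jdbc:hive2://hs2-host:10000/default;principal=hive/hs2-host@EXAMPLE.COM;retries=5;fetchSize=2000;ssl=true;sslTrustStore=/path/truststore.jks;trustStorePassword=changeit
And, for comparison, a ZooKeeper-discovery URL (ensemble hosts are placeholders):
jdbc:hive2://zk1:2181,zk2:2181,zk3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2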
In fact, per the official manual, running this in beeline:
set -v;
retrieves every Hadoop- and Hive-related configuration entry. Anyone on CDP7's Hive on Tez must mind the pitfall that the Tez options are not necessarily all there: keys shaped like hive.xx.tez.xx will always appear, because they are Hive's own Tez settings, while keys shaped like tez.xx never will, because those belong to the Tez engine itself. Both can still be queried and set from within beeline:
0: jdbc:hive2://192.168.88.101:10000/default> set tez.am.resource.memory.mb;
+-----------------------------------------+
| set |
+-----------------------------------------+
| tez.am.resource.memory.mb is undefined |
+-----------------------------------------+
1 row selected (0.098 seconds)
0: jdbc:hive2://192.168.88.101:10000/default>
But entries like that never appear in the output of set -v; readers can try it for themselves.
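That said, a tez.* key can usually still be set for the current session (assuming HiveServer2's hive.conf.restricted.list does not block it); it simply never shows up in set -v:
0: jdbc:hive2://192.168.88.101:10000/default> set tez.am.resource.memory.mb=4096;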
Er, the full output of set -v is too long to paste here; see the appendix: https://lizhiyong.blog.csdn.net/article/details/126634922
Between the official docs and the source, Beeline's execution flow and its JDBC connection flow are easy to understand. The "fixed" usage patterns we rely on every day are all hard-coded in the source; make a habit of reading source code and you will rarely need to ask anyone for help.
To recap Beeline's execution flow:
main() initializes the BeeLine object → invokes its single entry method → load() reads the options →
setupHistory() starts the command history → addBeelineShutdownHook() registers the hook for a smooth exit →
initializeConsoleReader() initializes the console reader → initArgs() parses the arguments →
dispatch() routes the input → executeFile() runs a script file (not always executed) →
finally execute() runs (when a script file is passed via -f, it internally calls dispatch() again)
At runtime the workhorse is dispatch(). To recap:
Check whether the input starts with !
--> starts with !: call the prefixed path, execCommandWithPrefix → match the keyword after ! → invoke the same-named method via reflection
--> no leading !: call the unprefixed path, commands.sql → run the SQL through the standard JDBC routine: get a connection, get a statement, execute the SQL for a ResultSet, render the results, and release the resources
To recap the JDBC connection to Hive:
HiveDriver claims the URL and the driver class is loaded via reflection →
HiveConnection parses the URL parameters → loops to obtain a connection within the maximum retry count (non-embedded mode)
And the connection steps inside each attempt:
openTransport() obtains the transport object (HTTP or binary; the binary path further offers Kerberos and several other mechanisms) →
set up the client object → openSession() opens the session (at this point many parameters, such as the fetch size, are set based on proxy user, authentication, and configuration) →
executeInitSql() runs the init SQL (if a file was passed via -i)
Likewise, the HiveDataSource commonly seen in Spark, Flink, Hudi, Spring, and friends ultimately reaches Hive over JDBC as well. And set -v can be run in beeline to list every settable Hadoop and Hive parameter (including the Tez parameters Hive itself needs, but not the Tez engine's own tez.* parameters).
Old as Hive is, it still has plenty worth learning from.