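The previous article covered how flink-conf.yaml is parsed and how the three Flink command-line clients are registered. This article moves on to how the client submits an application, starting with two topics: how the custom arguments of the flink command are parsed, and how the command-line client is selected.
In flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.main, CliFrontend is instantiated as follows: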
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    ......part omitted......
    int retCode = 31;
    try {
        // Instantiate the CliFrontend client
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
        // Install the security modules
        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // Parse the custom flink arguments and submit the application
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    }
    ......part omitted......
}
CliFrontend is instantiated first. Its constructors do the following work:
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
    this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
}
public CliFrontend(
        Configuration configuration,
        ClusterClientServiceLoader clusterClientServiceLoader,
        List<CustomCommandLine> customCommandLines) {
    this.configuration = checkNotNull(configuration);
    this.customCommandLines = checkNotNull(customCommandLines);
    this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
    // Initialize the file systems on the client side. Flink loads file systems as plugins
    // so that different file systems can be supported
    FileSystem.initialize(
            configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
    this.customCommandLineOptions = new Options();
    // Add zookeeperNamespace, jobmanager, and the key/value pairs passed with -D to
    // customCommandLineOptions, together with the options of GenericCLI and FlinkYarnSessionCli
    for (CustomCommandLine customCommandLine : customCommandLines) {
        customCommandLine.addGeneralOptions(customCommandLineOptions);
        customCommandLine.addRunOptions(customCommandLineOptions);
    }
    // Read the client timeout and the default parallelism of the Flink application
    this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
    this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
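Next the security modules are installed. After that, the custom arguments of the flink command are parsed and the Flink application is submitted.
The function flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.parseAndRun is implemented as follows: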
/**
 * Parses the command line arguments and starts the requested action.
 *
 * @param args command line arguments of the client.
 * @return The return code of the program
 */
public int parseAndRun(String[] args) {
    // check for action
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }
    // Get the action to execute; in our case it is run
    // get action
    String action = args[0];
    // The remaining arguments, without the action
    // remove action from parameters
    final String[] params = Arrays.copyOfRange(args, 1, args.length);
    try {
        // do action
        switch (action) {
            // The method that executes the run action
            case ACTION_RUN:
                run(params);
                return 0;
            case ACTION_RUN_APPLICATION:
                runApplication(params);
                return 0;
            ......part omitted......
    }
}
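Here the action is read from the first argument and the run method is executed. The params passed to it are the remaining custom arguments of the flink command. Next we look at how the run method parses them.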
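The split between the action and its remaining arguments can be tried in isolation. The following is a minimal sketch, not Flink code: the class ActionSplitDemo and its two helper methods are hypothetical names introduced only for illustration, and the sample arguments are made up.

```java
import java.util.Arrays;

// Hypothetical demo class; it only mirrors the args[0] / copyOfRange split in parseAndRun.
class ActionSplitDemo {
    /** Returns the action (the first argument), or null when no argument was given. */
    static String actionOf(String[] args) {
        return args.length < 1 ? null : args[0];
    }

    /** Returns everything after the action, like Arrays.copyOfRange(args, 1, args.length). */
    static String[] paramsOf(String[] args) {
        return args.length < 1 ? new String[0] : Arrays.copyOfRange(args, 1, args.length);
    }

    public static void main(String[] args) {
        // Sample arguments, as they might follow the flink command (assumed for illustration)
        String[] sample = {"run", "-p", "2", "job.jar"};
        System.out.println("action = " + actionOf(sample));
        System.out.println("params = " + Arrays.toString(paramsOf(sample)));
    }
}
```

The point of the split is that run never sees the action name itself, so the option parser only has to deal with options and program arguments.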
The function flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run is implemented as follows:
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");
    // Add Flink's default options: help, verbose, fromSavepoint, allowNonRestoredState, restoreMode, jarfile,
    // class, classpath, parallelism, arguments, detached, shutdownOnAttachedExit, yarndetached,
    // python, pyFiles, pyModule, pyRequirements, pyArchives, pyExecutable, pyClientExecutable
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    // Using the default options, parse a commandLine that contains only the arguments
    // actually passed on the flink command line
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }
    ......part omitted......
}
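First the default Flink run options are obtained, then the arguments are parsed into a commandLine that contains only the arguments actually passed on the flink command line. Next we look at the implementation of getCommandLine: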
public CommandLine getCommandLine(
        final Options commandOptions, final String[] args, final boolean stopAtNonOptions)
        throws CliArgsException {
    // Merge Flink's default options with the customCommandLineOptions created in the CliFrontend constructor.
    // customCommandLineOptions contains zookeeperNamespace, jobmanager, the key/value pairs passed with -D,
    // and the options of GenericCLI and FlinkYarnSessionCli
    final Options commandLineOptions =
            CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
    // Using the merged options, parse a commandLine that contains only the arguments
    // actually passed on the flink command line
    return CliFrontendParser.parse(commandLineOptions, args, stopAtNonOptions);
}
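Once the options of the three Flink command-line clients have been appended, the arguments are parsed into a commandLine that contains only the arguments actually passed on the flink command line.
CliFrontendParser.parse delegates through several calls and eventually invokes the parse method of commons-cli-1.15.0.jar/org.apache.commons.cli.DefaultParser, which processes the arguments one by one in a loop: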
public CommandLine parse(Options options, String[] arguments, Properties properties, boolean stopAtNonOption) throws ParseException {
    this.options = options;
    this.stopAtNonOption = stopAtNonOption;
    this.skipParsing = false;
    this.currentOption = null;
    this.expectedOpts = new ArrayList(options.getRequiredOptions());
    Iterator var5 = options.getOptionGroups().iterator();
    while(var5.hasNext()) {
        OptionGroup group = (OptionGroup)var5.next();
        group.setSelected((Option)null);
    }
    this.cmd = new CommandLine();
    if (arguments != null) {
        String[] var9 = arguments;
        int var10 = arguments.length;
        // Loop over every argument
        for(int var7 = 0; var7 < var10; ++var7) {
            String argument = var9[var7];
            // Process the argument
            this.handleToken(argument);
        }
    }
    this.checkRequiredArgs();
    this.handleProperties(properties);
    this.checkRequiredOptions();
    return this.cmd;
}
The handleToken method of commons-cli-1.15.0.jar/org.apache.commons.cli.DefaultParser is as follows:
private void handleToken(String token) throws ParseException {
    this.currentToken = token;
    if (this.skipParsing) {
        this.cmd.addArg(token);
    } else if ("--".equals(token)) {
        this.skipParsing = true;
    // Handles the value (not the key) of an argument passed on the flink command line
    } else if (this.currentOption != null && this.currentOption.acceptsArg() && this.isArgument(token)) {
        // Attach the value to this.currentOption, which was set by the previous token
        this.currentOption.addValueForProcessing(this.stripLeadingAndTrailingQuotesDefaultOn(token));
    // Handles a long option --arg and sets it as this.currentOption.
    // Modifying currentOption modifies the same Option in commandLineOptions
    } else if (token.startsWith("--")) {
        this.handleLongOption(token);
    // Handles a short option -arg and sets it as this.currentOption.
    // Modifying currentOption modifies the same Option in commandLineOptions
    } else if (token.startsWith("-") && !"-".equals(token)) {
        this.handleShortAndLongOption(token);
    } else {
        this.handleUnknownToken(token);
    }
    if (this.currentOption != null && !this.currentOption.acceptsArg()) {
        this.currentOption = null;
    }
}
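The main logic of handleToken is: once skipParsing is set, every token is added to the argument list unchanged; a bare -- turns skipParsing on; a token that is a value rather than a key is attached to the currentOption set by the previous token; a token starting with -- is handled as a long option and a token starting with - as a short option, and each becomes the new currentOption; anything else goes to handleUnknownToken. Finally, currentOption is cleared if it does not accept a value.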
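The branching in handleToken can be modeled with a few lines of plain Java. This is a heavily simplified sketch and not the commons-cli implementation: MiniTokenParser and all of its members are hypothetical, an option takes at most one value, any token that does not start with - is assumed to be a value, and unknown tokens are always treated as if stopAtNonOption were true (the value run passes to getCommandLine).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the handleToken branching; not the commons-cli code.
class MiniTokenParser {
    private final Set<String> optionsWithValue; // names of options that take one value
    private final Set<String> flags;            // names of options that take no value
    final Map<String, String> parsed = new LinkedHashMap<>();
    final List<String> leftover = new ArrayList<>();
    private String currentOption;               // option still waiting for its value
    private boolean skipParsing;

    MiniTokenParser(Set<String> optionsWithValue, Set<String> flags) {
        this.optionsWithValue = optionsWithValue;
        this.flags = flags;
    }

    void handleToken(String token) {
        if (skipParsing) {
            leftover.add(token);                // parsing has stopped: keep the token as-is
        } else if ("--".equals(token)) {
            skipParsing = true;
        } else if (currentOption != null && !token.startsWith("-")) {
            // Assumption: anything not starting with "-" counts as a value
            parsed.put(currentOption, token);
            currentOption = null;
        } else if (token.startsWith("-") && !"-".equals(token)) {
            handleOption(token);
        } else {
            stopAt(token);
        }
    }

    private void handleOption(String token) {
        String name = token.replaceFirst("^--?", ""); // strip one or two leading dashes
        if (optionsWithValue.contains(name)) {
            currentOption = name;               // the next token may be its value
        } else if (flags.contains(name)) {
            parsed.put(name, "true");
            currentOption = null;               // a flag never waits for a value
        } else {
            stopAt(token);
        }
    }

    // Models stopAtNonOption = true: keep the token and stop interpreting options.
    private void stopAt(String token) {
        leftover.add(token);
        skipParsing = true;
    }

    static MiniTokenParser parse(Set<String> optionsWithValue, Set<String> flags, String... args) {
        MiniTokenParser parser = new MiniTokenParser(optionsWithValue, flags);
        for (String arg : args) {
            parser.handleToken(arg);
        }
        return parser;
    }

    public static void main(String[] args) {
        MiniTokenParser p = parse(
                new HashSet<>(Arrays.asList("p")),
                new HashSet<>(Arrays.asList("detached")),
                "-p", "2", "--detached", "job.jar", "--input", "x");
        System.out.println("parsed = " + p.parsed + ", leftover = " + p.leftover);
    }
}
```

In this model the first token that is not a known option (job.jar here) ends option parsing, so everything after it is kept untouched. That is how the arguments meant for the user program can follow the jar file without being interpreted as client options.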
How is the help information printed when we execute flink run --help?
The function flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run is implemented as follows:
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    ......part omitted......
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    // Print the help information for flink run --help
    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }
    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));
    ......part omitted......
}
If run is followed by --help, the help information is printed and the method returns.
The function flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontendParser.printHelpForRun is implemented as follows:
public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
    HelpFormatter formatter = new HelpFormatter();
    // Set how many spaces to leave on the left
    formatter.setLeftPadding(5);
    // Set the width of the printed output
    formatter.setWidth(80);
    System.out.println("\nAction \"run\" compiles and runs a program.");
    System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
    // Set the title of the action section; it is printed together with the Options below
    formatter.setSyntaxPrefix("  \"run\" action options:");
    // Get the Options of run and print them
    formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
    // For each command-line client, set its SyntaxPrefix, get its Options, and print both together
    printCustomCliOptions(customCommandLines, formatter, true);
    System.out.println();
}
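The printing is done by the HelpFormatter class. Its implementation is not reproduced here; the main logic is to append each part of the output, formatted, to a StringBuffer and then write the result out with a PrintWriter.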
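The "build the text in a buffer, then write it out" pattern can be sketched as follows. This is only an illustration of the pattern and does not reproduce HelpFormatter's real layout rules such as line wrapping: MiniHelpFormatter, render, and print are hypothetical names, the option descriptions are made up, and StringBuilder is used here in place of StringBuffer.

```java
import java.io.PrintWriter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical miniature formatter; it only demonstrates the buffer-then-print pattern.
class MiniHelpFormatter {
    /** Builds the whole help text in memory: a prefix line, then one padded line per option. */
    static String render(String syntaxPrefix, Map<String, String> options, int leftPadding) {
        StringBuilder pad = new StringBuilder();
        for (int i = 0; i < leftPadding; i++) {
            pad.append(' ');
        }
        StringBuilder text = new StringBuilder();
        text.append(syntaxPrefix).append('\n');
        for (Map.Entry<String, String> option : options.entrySet()) {
            text.append(pad)
                    .append(option.getKey())
                    .append("   ")
                    .append(option.getValue())
                    .append('\n');
        }
        return text.toString();
    }

    /** Writes the finished text in one go and flushes the writer. */
    static void print(PrintWriter out, String text) {
        out.print(text);
        out.flush();
    }

    public static void main(String[] args) {
        // Made-up option descriptions, for illustration only
        Map<String, String> options = new LinkedHashMap<>();
        options.put("-p", "Parallelism of the program");
        options.put("-d", "Run in detached mode");
        print(new PrintWriter(System.out), render("  \"run\" action options:", options, 5));
    }
}
```

Separating render from print keeps the formatting testable without touching System.out, which is one reason to assemble the text before writing it.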
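The previous article showed how the three Flink command-line clients, GenericCLI, FlinkYarnSessionCli, and DefaultCLI, are added to the customCommandLines ArrayList. Now we look at which of them Flink finally selects.
The function flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run is implemented as follows: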
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    ......part omitted......
    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }
    // Get the active Flink command-line client
    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));
    ......part omitted......
}
This is where the active Flink command-line client is obtained.
The function flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.validateAndGetActiveCommandLine is implemented as follows:
/**
 * Gets the custom command-line for the arguments.
 *
 * @param commandLine The input to the command-line.
 * @return custom command-line which is active (may only be one at a time)
 */
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
    LOG.debug("Custom commandlines: {}", customCommandLines);
    for (CustomCommandLine cli : customCommandLines) {
        LOG.debug(
                "Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
        if (cli.isActive(commandLine)) {
            return cli;
        }
    }
    throw new IllegalStateException("No valid command-line found.");
}
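The list is traversed in order, and the first client that satisfies its active condition is returned immediately. In our case the result is the last command-line client, DefaultCLI.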
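The "first active entry wins" rule means the order of the list decides the outcome. The sketch below illustrates this with stand-in objects. It is not Flink code: ActiveCliSelection, Cli, and sampleClis are hypothetical, and the activation rules (a -t option for the generic entry, a -yid option for the YARN entry, and a fallback that is always active) are assumptions chosen for illustration, not the real isActive implementations.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-ins for the command-line clients; the activation rules are illustrative only.
class ActiveCliSelection {
    static final class Cli {
        final String name;
        private final Predicate<Set<String>> activeRule;

        Cli(String name, Predicate<Set<String>> activeRule) {
            this.name = name;
            this.activeRule = activeRule;
        }

        boolean isActive(Set<String> passedOptions) {
            return activeRule.test(passedOptions);
        }
    }

    /** Returns the first client whose rule matches, in list order. */
    static Cli select(List<Cli> clis, Set<String> passedOptions) {
        for (Cli cli : clis) {
            if (cli.isActive(passedOptions)) {
                return cli;
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }

    /** Same order as in the article: generic first, YARN second, default last. */
    static List<Cli> sampleClis() {
        return Arrays.asList(
                new Cli("generic", options -> options.contains("-t")),   // assumed rule
                new Cli("yarn", options -> options.contains("-yid")),    // assumed rule
                new Cli("default", options -> true));                    // assumed fallback
    }

    public static void main(String[] args) {
        Set<String> none = Collections.emptySet();
        Set<String> withTarget = new HashSet<>(Arrays.asList("-t"));
        System.out.println(select(sampleClis(), none).name);
        System.out.println(select(sampleClis(), withTarget).name);
    }
}
```

Under these assumptions, placing the always-active entry last is what makes it a fallback: if it came first, the more specific clients would never be reached.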