任务提交方式:运行命令行flink脚本
使用flink脚本提交任务示例:
flink run ...
从 flink 脚本可以看到 org.apache.flink.client.cli.CliFrontend 是入口类
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
CliFrontend 的主要功能是接收并解析命令行传入的命令,然后调用相应的工具类执行命令。
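提供以下 actions(对应下文 parseAndRun 中的 switch 分支):run、run-application、list、info、savepoint、stop、cancel;另外支持 -h/--help 打印帮助信息、-v/--version 打印版本信息。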
接下来我们看 main() 如何执行 run 流程。
/**
 * Entry point of the Flink command line client: submits the job (or runs another action) based
 * on the arguments.
 *
 * <p>Once the configuration and the custom command lines have been loaded, the process always
 * terminates via {@link System#exit(int)}: with the return code of the requested action, or
 * with 31 if a {@link Throwable} escaped while creating the frontend or running the action.
 *
 * @param args the raw command line arguments, starting with the action (e.g. {@code run})
 */
public static void main(final String[] args) {
    // Print basic environment information (version, user, JVM, classpath, ...).
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration flink-conf.yaml
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines, in order: Generic, YARN, Default
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    // Exit code used when anything below fails unexpectedly.
    int retCode = 31;
    try {
        // 4. create the CliFrontend
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

        // 5. install the security modules described by the configuration
        SecurityUtils.install(new SecurityConfiguration(cli.configuration));

        // 6. dispatch on the action (run, list, cancel, ...) inside the installed security
        //    context and keep its status code; this is where the main work happens.
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    } catch (Throwable t) {
        // Do not swallow the failure: the process exits with code 31 right below, and this
        // log entry is the only trace of why.
        LOG.error("Fatal error while running command line interface.", t);
    } finally {
        // 7. terminate the client with the collected return code
        System.exit(retCode);
    }
}
详细流程分析如下
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
logEnvironmentInfo 具体实现:
/**
 * Writes a banner describing the runtime environment to {@code log}: component name, Flink
 * version and revision, OS and Hadoop/Kerberos user, JVM version, maximum heap size, JAVA_HOME,
 * Hadoop version, JVM startup options, program arguments and classpath.
 *
 * <p>Nothing is computed or logged unless INFO is enabled on {@code log}.
 *
 * @param log The logger to log the information to.
 * @param componentName The component name to mention in the log.
 * @param commandLineArgs The arguments accompanying the starting the component; {@code null}
 *     is logged the same way as an empty array.
 */
public static void logEnvironmentInfo(Logger log, String componentName, String[] commandLineArgs) {
    if (!log.isInfoEnabled()) {
        return;
    }

    final String banner =
            "--------------------------------------------------------------------------------";

    final RevisionInformation revision = getRevisionInformation();
    final String flinkVersion = getVersion();
    final String jvm = getJvmVersion();
    final String[] jvmOptions = getJvmStartupOptionsArray();
    final String javaHomeDir = System.getenv("JAVA_HOME");
    // Shift by 20 bits: bytes -> MiB.
    final long heapMiB = getMaxJvmHeapMemory() >>> 20;

    log.info(banner);
    log.info(
            " Starting "
                    + componentName
                    + " (Version: "
                    + flinkVersion
                    + ", Rev:"
                    + revision.commitId
                    + ", Date:"
                    + revision.commitDate
                    + ")");
    log.info(" OS current user: " + System.getProperty("user.name"));
    log.info(" Current Hadoop/Kerberos user: " + getHadoopUser());
    log.info(" JVM: " + jvm);
    log.info(" Maximum heap size: " + heapMiB + " MiBytes");
    log.info(" JAVA_HOME: " + (javaHomeDir == null ? "(not set)" : javaHomeDir));

    final String hadoopVersion = getHadoopVersionString();
    log.info(
            hadoopVersion != null
                    ? " Hadoop version: " + hadoopVersion
                    : " No Hadoop Dependency available");

    if (jvmOptions.length > 0) {
        log.info(" JVM Options:");
        for (final String option : jvmOptions) {
            log.info(" " + option);
        }
    } else {
        log.info(" JVM Options: (none)");
    }

    if (commandLineArgs != null && commandLineArgs.length > 0) {
        log.info(" Program Arguments:");
        for (final String argument : commandLineArgs) {
            log.info(" " + argument);
        }
    } else {
        log.info(" Program Arguments: (none)");
    }

    log.info(" Classpath: " + System.getProperty("java.class.path"));
    log.info(banner);
}
// 1. find the configuration directory
final String configurationDirectory = getConfigurationDirectoryFromEnv();
根据环境变量 FLINK_CONF_DIR 获取 flink 配置文件目录
调用 GlobalConfiguration 的 loadConfiguration 方法加载 flink 配置文件 flink-conf.yaml 中的配置,解析后转成 Configuration 对象
// 2. load the global configuration flink-conf.yaml
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
loadConfiguration 具体实现:
/**
 * Loads the configuration files from the specified directory. If the dynamic properties
 * configuration is not null, then it is added to the loaded configuration.
 *
 * <p>The file {@code FLINK_CONF_FILENAME} inside {@code configDir} is parsed with
 * {@code loadYAMLResource}. The dynamic properties, if any, are then added via
 * {@code Configuration#addAll}, and the result is passed through
 * {@code enrichWithEnvironmentVariables}.
 *
 * @param configDir directory to load the configuration from
 * @param dynamicProperties configuration containing the dynamic properties. Null if none.
 * @return The configuration loaded from the given configuration directory
 * @throws IllegalArgumentException if {@code configDir} is null
 * @throws IllegalConfigurationException if {@code configDir} is not an existing directory, or
 *     if it does not contain the Flink configuration file
 */
public static Configuration loadConfiguration(
        final String configDir, @Nullable final Configuration dynamicProperties) {
    if (configDir == null) {
        throw new IllegalArgumentException(
                "Given configuration directory is null, cannot load configuration");
    }

    final File confDirFile = new File(configDir);
    // isDirectory() rather than exists(), so the check matches the error message: a regular
    // file passed as configDir is rejected here with an accurate explanation.
    if (!confDirFile.isDirectory()) {
        throw new IllegalConfigurationException(
                "The given configuration directory name '"
                        + configDir
                        + "' ("
                        + confDirFile.getAbsolutePath()
                        + ") does not describe an existing directory.");
    }

    // get Flink yaml configuration file
    final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
    if (!yamlConfigFile.exists()) {
        // Report the absolute path of the missing file itself, not of its directory.
        throw new IllegalConfigurationException(
                "The Flink config file '"
                        + yamlConfigFile
                        + "' ("
                        + yamlConfigFile.getAbsolutePath()
                        + ") does not exist.");
    }

    // Parse the YAML file into a Configuration, then add the dynamic properties on top.
    final Configuration configuration = loadYAMLResource(yamlConfigFile);
    if (dynamicProperties != null) {
        configuration.addAll(dynamicProperties);
    }
    return enrichWithEnvironmentVariables(configuration);
}
调用loadCustomCommandLines方法,加载 自定义命令行(CustomCommandLine)
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
调用 loadCustomCommandLines 方法,加载自定义命令行
/**
 * Builds the custom command lines known to the client, in the order in which they are later
 * checked for being active: {@code GenericCLI}, then the YARN session CLI (or
 * {@code FallbackYarnSessionCli} if it cannot be loaded), then {@code DefaultCLI}.
 *
 * @param configuration the global Flink configuration
 * @param configurationDirectory the directory the configuration was loaded from
 * @return a mutable list of the command lines, {@code DefaultCLI} always last
 */
public static List<CustomCommandLine> loadCustomCommandLines(
        Configuration configuration, String configurationDirectory) {
    List<CustomCommandLine> customCommandLines = new ArrayList<>();

    // 1. Generic CLI
    customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

    // Command line interface of the YARN session, with a special initialization here
    // to prefix all options with y/yarn.
    final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    try {
        // 2. YARN session CLI, loaded reflectively by class name; load failures (including
        //    NoClassDefFoundError) are handled below.
        customCommandLines.add(
                loadCustomCommandLine(
                        flinkYarnSessionCLI,
                        configuration,
                        configurationDirectory,
                        "y",
                        "yarn"));
    } catch (NoClassDefFoundError | Exception e) {
        // 3. The YARN session CLI could not be loaded: register the fallback CLI instead.
        final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
        try {
            LOG.info("Loading FallbackYarnSessionCli");
            customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
        } catch (Exception exception) {
            // Keep both failures in the log: the original cause, plus why the fallback failed
            // as well (previously the fallback's exception was silently dropped).
            e.addSuppressed(exception);
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }
    }

    // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
    // the active CustomCommandLine in order and DefaultCLI isActive always return true.
    // 4. Default CLI
    customCommandLines.add(new DefaultCLI());
    return customCommandLines;
}
类图关系如下:

后面章节会讲解如何获取处于活跃状态的命令行客户端,也就是这里封装的 GenericCLI、FlinkYarnSessionCli、DefaultCLI 三个客户端:按顺序判断哪个处于活跃状态,第一个活跃的客户端即被使用,随后立即返回,不再继续判断。下面介绍其判断逻辑。
// 1. Create a GenericCLI and add it as the first custom command line
customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

通过反射构建 yarn 命令行
/**
 * Instantiates a {@link CustomCommandLine} reflectively.
 *
 * <p>The constructor is looked up with the exact runtime classes of {@code params}. The
 * target class therefore needs a public constructor whose declared parameter types are
 * exactly those classes (a supertype or a primitive parameter will not match).
 *
 * @param className fully qualified name of the class to load; it must implement {@link
 *     CustomCommandLine}
 * @param params constructor arguments, in declaration order; none may be null
 * @return the newly created command line
 */
private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
    // Load the class from the classpath and make sure it is a CustomCommandLine.
    final Class<? extends CustomCommandLine> cliClass =
            Class.forName(className).asSubclass(CustomCommandLine.class);

    // Derive the constructor signature from the arguments' runtime classes.
    final Class<?>[] signature = new Class<?>[params.length];
    int position = 0;
    for (Object argument : params) {
        Preconditions.checkNotNull(argument, "Parameters for custom command-lines may not be null.");
        signature[position++] = argument.getClass();
    }

    // e.g. FlinkYarnSessionCli(Configuration, String, String, String)
    return cliClass.getConstructor(signature).newInstance(params);
}
/**
 * Creates the YARN session command line and defines its options.
 *
 * <p>The options created in this constructor are named with {@code shortPrefix} /
 * {@code longPrefix} (for example {@code shortPrefix + "jm"} / {@code longPrefix +
 * "jobManagerMemory"}); {@code loadCustomCommandLines} passes "y" and "yarn" as prefixes when
 * it loads this class. {@code DETACHED_OPTION}, {@code YARN_DETACHED_OPTION} and
 * {@code applicationId} are defined elsewhere and only registered here.
 *
 * <p>If a YARN properties file exists at the location derived from
 * {@code YarnConfigOptions.PROPERTIES_FILE_LOCATION}, it is loaded and the application id it
 * contains is stored in {@code yarnApplicationIdFromYarnProperties}; otherwise that field is
 * set to null.
 *
 * @param configuration the global configuration
 * @param clusterClientServiceLoader stored after a null check; presumably used later to look
 *     up cluster client factories (not otherwise used in this constructor)
 * @param configurationDirectory the global configuration directory; must not be null
 * @param shortPrefix prefix for the short option names
 * @param longPrefix prefix for the long option names
 * @param acceptInteractiveInput whether interactive input is accepted
 * @throws FlinkException if the YARN properties file cannot be read, contains no application
 *     id, or contains an application id that cannot be converted to an {@code ApplicationId}
 */
public FlinkYarnSessionCli(
Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader,
String configurationDirectory,
String shortPrefix,
String longPrefix,
boolean acceptInteractiveInput)
throws FlinkException {
// 1. Initialize fields (the superclass receives the configuration and the option prefixes)
super(configuration, shortPrefix, longPrefix);
this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
this.configurationDirectory = checkNotNull(configurationDirectory);
this.acceptInteractiveInput = acceptInteractiveInput;
// 2. Create the command line options
query =
new Option(
shortPrefix + "q",
longPrefix + "query",
false,
"Display available YARN resources (memory, cores)");
queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
shipPath =
new Option(
shortPrefix + "t",
longPrefix + "ship",
true,
"Ship files in the specified directory (t for transfer)");
flinkJar =
new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
jmMemory =
new Option(
shortPrefix + "jm",
longPrefix + "jobManagerMemory",
true,
"Memory for JobManager Container with optional unit (default: MB)");
tmMemory =
new Option(
shortPrefix + "tm",
longPrefix + "taskManagerMemory",
true,
"Memory per TaskManager Container with optional unit (default: MB)");
slots =
new Option(
shortPrefix + "s",
longPrefix + "slots",
true,
"Number of slots per TaskManager");
dynamicproperties =
Option.builder(shortPrefix + "D")
.argName("property=value")
.numberOfArgs(2)
.valueSeparator()
.desc("use value for given property")
.build();
name =
new Option(
shortPrefix + "nm",
longPrefix + "name",
true,
"Set a custom name for the application on YARN");
applicationType =
new Option(
shortPrefix + "at",
longPrefix + "applicationType",
true,
"Set a custom application type for the application on YARN");
zookeeperNamespace =
new Option(
shortPrefix + "z",
longPrefix + "zookeeperNamespace",
true,
"Namespace to create the Zookeeper sub-paths for high availability mode");
nodeLabel =
new Option(
shortPrefix + "nl",
longPrefix + "nodeLabel",
true,
"Specify YARN node label for the YARN application");
help =
new Option(
shortPrefix + "h",
longPrefix + "help",
false,
"Help for the Yarn session CLI.");
allOptions = new Options();
allOptions.addOption(flinkJar);
allOptions.addOption(jmMemory);
allOptions.addOption(tmMemory);
allOptions.addOption(queue);
allOptions.addOption(query);
allOptions.addOption(shipPath);
allOptions.addOption(slots);
allOptions.addOption(dynamicproperties);
allOptions.addOption(DETACHED_OPTION);
allOptions.addOption(YARN_DETACHED_OPTION);
allOptions.addOption(name);
allOptions.addOption(applicationId);
allOptions.addOption(applicationType);
allOptions.addOption(zookeeperNamespace);
allOptions.addOption(nodeLabel);
allOptions.addOption(help);
// 3. Try loading a potential YARN properties file: resolve its location from the configuration
this.yarnPropertiesFileLocation =
configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
final File yarnPropertiesLocation = getYarnPropertiesLocation(yarnPropertiesFileLocation);
// 4. If the properties file exists, load it and extract the YARN application id from it
yarnPropertiesFile = new Properties();
if (yarnPropertiesLocation.exists()) {
LOG.info(
"Found Yarn properties file under {}.",
yarnPropertiesLocation.getAbsolutePath());
try (InputStream is = new FileInputStream(yarnPropertiesLocation)) {
yarnPropertiesFile.load(is);
} catch (IOException ioe) {
throw new FlinkException(
"Could not read the Yarn properties file "
+ yarnPropertiesLocation
+ ". Please delete the file at "
+ yarnPropertiesLocation.getAbsolutePath()
+ '.',
ioe);
}
final String yarnApplicationIdString =
yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
if (yarnApplicationIdString == null) {
throw new FlinkException(
"Yarn properties file found but doesn't contain a "
+ "Yarn application id. Please delete the file at "
+ yarnPropertiesLocation.getAbsolutePath());
}
try {
// Try converting the id string to an ApplicationId; any failure is reported as invalid
yarnApplicationIdFromYarnProperties =
ConverterUtils.toApplicationId(yarnApplicationIdString);
} catch (Exception e) {
throw new FlinkException(
"YARN properties contain an invalid entry for "
+ "application id: "
+ yarnApplicationIdString
+ ". Please delete the file at "
+ yarnPropertiesLocation.getAbsolutePath(),
e);
}
} else {
yarnApplicationIdFromYarnProperties = null;
}
}
// Excerpt from loadCustomCommandLines: the catch branch taken when FlinkYarnSessionCli cannot
// be loaded. The enclosing try and the caught exception `e` belong to that method.
final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
try {
LOG.info("Loading FallbackYarnSessionCli");
// 4. On failure, register FallbackYarnSessionCli instead
customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
} catch (Exception exception) {
// NOTE(review): only the original YARN-CLI failure `e` is logged; the fallback's own
// exception `exception` is dropped.
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
DefaultCLI 是默认命令行,standalone 模式下使用。

接下来是 main() 第 4 步:创建 CliFrontend 对象,其构造方法如下:
public CliFrontend(
Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader,
List<CustomCommandLine> customCommandLines) {
// 1. 初始化对象属性判断是否为空
this.configuration = checkNotNull(configuration);
this.customCommandLines = checkNotNull(customCommandLines);
this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
// 2. 初始化文件系统
FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
this.customCommandLineOptions = new Options();
// 3. 给命令行对象添加选项
// 获取用户命令行配置customCommandLines,遍历list将其添加到运行配置和一般配置中
for (CustomCommandLine customCommandLine : customCommandLines) {
customCommandLine.addGeneralOptions(customCommandLineOptions);
customCommandLine.addRunOptions(customCommandLineOptions);
}
// 4. 从全局配置里得到akka 客户端等待超时时间(akka.client.timeout)
this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
// 5. 从全局配置里得到默认的系统并行度
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
更详细的内容见 Flink1.15源码解析–安全模块及安全上下文
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
在安全上下文中根据命令行参数匹配并执行对应的 action,这块是主要逻辑:
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
SecurityContext:运行 Callable 时可能需要的安全上下文。
/**
 * A security context that may be required to run a {@link Callable}.
 *
 * <p>Callers hand the work to {@link #runSecured(Callable)} instead of calling it directly.
 */
public interface SecurityContext {

    /**
     * Runs {@code securedCallable} within this security context.
     *
     * @param securedCallable the work to run
     * @param <T> the callable's result type
     * @return a value of the callable's result type (presumably the callable's own result —
     *     confirm per implementation)
     * @throws Exception if running the callable in this context fails
     */
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}
/**
 * Parses the command line arguments and starts the requested action.
 *
 * <p>The first argument selects the action ({@code run}, {@code run-application}, {@code
 * list}, {@code info}, {@code cancel}, {@code stop}, {@code savepoint}); the remaining
 * arguments are passed to that action. {@code -h}/{@code --help} print the help and {@code
 * -v}/{@code --version} print the Flink version.
 *
 * @param args command line arguments of the client.
 * @return The return code of the program: 0 on success, 1 for a missing or unknown action,
 *     otherwise the code returned by the matching exception handler
 */
public int parseAndRun(String[] args) {
    // Without an action there is nothing to dispatch.
    if (args.length == 0) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }

    // The action is the first argument (run, list, cancel, ...); the rest belongs to it.
    final String action = args[0];
    final String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);

    try {
        switch (action) {
            case ACTION_RUN:
                run(actionArgs);
                break;
            case ACTION_RUN_APPLICATION:
                runApplication(actionArgs);
                break;
            case ACTION_LIST:
                list(actionArgs);
                break;
            case ACTION_INFO:
                info(actionArgs);
                break;
            case ACTION_CANCEL:
                cancel(actionArgs);
                break;
            case ACTION_STOP:
                stop(actionArgs);
                break;
            case ACTION_SAVEPOINT:
                savepoint(actionArgs);
                break;
            case "-h":
            case "--help":
                CliFrontendParser.printHelp(customCommandLines);
                break;
            case "-v":
            case "--version":
                {
                    final String flinkVersion = EnvironmentInformation.getVersion();
                    final String commitId =
                            EnvironmentInformation.getRevisionInformation().commitId;
                    System.out.print("Version: " + flinkVersion);
                    if (commitId.equals(EnvironmentInformation.UNKNOWN)) {
                        System.out.println("");
                    } else {
                        System.out.println(", Commit ID: " + commitId);
                    }
                    break;
                }
            default:
                System.out.printf("\"%s\" is not a valid action.\n", action);
                System.out.println();
                System.out.println(
                        "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                System.out.println();
                System.out.println(
                        "Specify the version option (-v or --version) to print Flink version.");
                System.out.println();
                System.out.println(
                        "Specify the help option (-h or --help) to get help on the command.");
                return 1;
        }
        // Every recognized action ends up here.
        return 0;
    } catch (CliArgsException argsError) {
        return handleArgException(argsError);
    } catch (ProgramParametrizationException parametrizationError) {
        return handleParametrizationException(parametrizationError);
    } catch (ProgramMissingJobException missingJob) {
        return handleMissingJobException();
    } catch (Exception error) {
        return handleError(error);
    }
}
/**
 * Executes the run action.
 *
 * <p>The steps are:
 *
 * <ol>
 *   <li>parse the run options and print the run help if {@code -h} is given;
 *   <li>pick the active custom command line;
 *   <li>build the program options and collect the job jars;
 *   <li>derive the effective configuration;
 *   <li>package and execute the program.
 * </ol>
 *
 * @param args Command line arguments for the run action.
 * @throws Exception if parsing, packaging or executing the program fails
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    // Null-check right where the command line is obtained, so the check precedes its first
    // dereference (hasOption below) instead of running after it.
    final CommandLine commandLine = checkNotNull(getCommandLine(commandOptions, args, true));

    // evaluate help flag (-h)
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    // 1. pick the first active custom command line
    final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);

    // 2. program options: Python program or Java (jar) program
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    // 3. the job jar and its dependencies
    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    // 4. effective configuration: HA id, target (session, per-job), JobManager memory,
    //    TaskManager memory, slots per TaskManager, ...
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    // 5. package and execute the program; try-with-resources closes the PackagedProgram
    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        executeProgram(effectiveConfiguration, program);
    }
}
validateAndGetActiveCommandLine:从 main() 第 3 步初始化的 CustomCommandLine 列表中,按顺序取出第一个处于活跃状态的命令行:
/**
 * Gets the custom command-line for the arguments.
 *
 * <p>The custom command lines are checked in list order, and the first one whose {@code
 * isActive} returns true is chosen. {@code isActive} is evaluated exactly once per candidate,
 * so the debug log always shows the value the decision was based on.
 *
 * @param commandLine The input to the command-line.
 * @return custom command-line which is active (may only be one at a time)
 * @throws IllegalStateException if none of the custom command lines is active
 */
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
    LOG.debug("Custom commandlines: {}", customCommandLines);
    for (CustomCommandLine cli : customCommandLines) {
        // Evaluate once: previously isActive ran twice (for the log and for the decision).
        final boolean active = cli.isActive(commandLine);
        LOG.debug("Checking custom commandline {}, isActive: {}", cli, active);
        if (active) {
            return cli;
        }
    }
    throw new IllegalStateException("No valid command-line found.");
}
ProgramOptions.create:根据命令行判断是 Python 程序还是 Java 程序,并创建对应的 ProgramOptions:
/**
 * Creates the {@link ProgramOptions} matching the kind of program on the command line.
 *
 * @param line the parsed command line
 * @return Python program options if a Python entry point or Python dependency options are
 *     present, otherwise plain (Java/jar) program options
 * @throws CliArgsException propagated from creating the program options
 */
public static ProgramOptions create(CommandLine line) throws CliArgsException {
    final boolean pythonProgram =
            isPythonEntryPoint(line) || containsPythonDependencyOptions(line);
    return pythonProgram ? createPythonProgramOptions(line) : new ProgramOptions(line);
}
/**
 * Get all provided libraries needed to run the program from the ProgramOptions.
 *
 * @param programOptions the parsed program options
 * @return the URLs computed by {@code PackagedProgram.getJobJarAndDependencies} for the job
 *     jar (if given) and the entry point class
 * @throws CliArgsException if the jar file cannot be found, or the jar and its dependencies
 *     cannot be resolved; the original exception is kept as the cause
 */
private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
        throws CliArgsException {
    // Entry point class (-c)
    final String entryPointClass = programOptions.getEntryPointClassName();
    // Path of the job's JAR file, if one was given
    final String jarFilePath = programOptions.getJarFilePath();

    try {
        File jarFile = null;
        if (jarFilePath != null) {
            jarFile = getJarFile(jarFilePath);
        }
        return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
    } catch (FileNotFoundException | ProgramInvocationException resolveError) {
        throw new CliArgsException(
                "Could not get job jar and dependencies from JAR file: "
                        + resolveError.getMessage(),
                resolveError);
    }
}
getEffectiveConfiguration 获取有效配置,包括 HA 的 id、Target(session、per-job)、JobManager 内存、TaskManager 内存、每个 TM 的 slot 数等。
// 5. Package the program and execute it; try-with-resources closes the PackagedProgram afterwards
try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
executeProgram(effectiveConfiguration, program);
}
调用的是 ClientUtils.executeProgram
/**
 * Executes the packaged program with the given configuration.
 *
 * <p>This delegates to {@code ClientUtils.executeProgram} with a fresh {@code
 * DefaultExecutorServiceLoader} and both boolean flags set to false.
 *
 * @param configuration the effective configuration for this execution
 * @param program the packaged user program
 * @throws ProgramInvocationException if executing the program fails
 */
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    // NOTE(review): flag names follow the parameter names of ClientUtils.executeProgram
    // (enforceSingleJobExecution, suppressSysout) — confirm against the Flink version in use.
    final boolean enforceSingleJobExecution = false;
    final boolean suppressSysout = false;
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(),
            configuration,
            program,
            enforceSingleJobExecution,
            suppressSysout);
}