Flink1.15源码
flink入口类
"org.apache.flink.client.cli.CliFrontend"
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
final String configurationDirectory = getConfigurationDirectoryFromEnv();
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
int retCode = 31;
try {
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
} finally {
System.exit(retCode);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
parseAndRun(args)
if (args.length < 1) {
CliFrontendParser.printHelp(customCommandLines);
System.out.println("Please specify an action.");
return 1;
}
String action = args[0];
switch (action) {
case ACTION_RUN:
run(params);
return 0;
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
...
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
A、run(per-job&yarn-session)
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
final List<URL> jobJars = getJobJarAndDependencies(programOptions);
final Configuration effectiveConfiguration =
getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
executeProgram(effectiveConfiguration, program);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
1、ProgramOptions.create(封装参数)
public static ProgramOptions create(CommandLine line) throws CliArgsException {
if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
return createPythonProgramOptions(line);
} else {
return new ProgramOptions(line);
}
}
1.1、ProgramOptions
protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);
...
...
this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
}
2、getJobJarAndDependencies(返回可用jar包)
private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
throws CliArgsException {
String entryPointClass = programOptions.getEntryPointClassName();
String jarFilePath = programOptions.getJarFilePath();
try {
File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
} ...
}
2.1、getJobJarAndDependencies
public static List<URL> getJobJarAndDependencies(
File jarFile, @Nullable String entryPointClassName) throws ProgramInvocationException {
URL jarFileUrl = loadJarFile(jarFile);
List<File> extractedTempLibraries =
jarFileUrl == null
? Collections.emptyList()
: extractContainedLibraries(jarFileUrl);
List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);
if (jarFileUrl != null) {
libs.add(jarFileUrl);
}
for (File tmpLib : extractedTempLibraries) {
try {
libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
} catch (MalformedURLException e) {
throw new RuntimeException("URL is invalid. This should not happen.", e);
}
}
if (isPython(entryPointClassName)) {
libs.add(PackagedProgramUtils.getPythonJar());
}
return libs;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
3、executeProgram
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
throws ProgramInvocationException {
ClientUtils.executeProgram(
new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
3.1、ClientUtils.executeProgram
public static void executeProgram(...)
throws ProgramInvocationException {
...
...
try {
program.invokeInteractiveModeForExecution();
} ...
}
3.2、invokeInteractiveModeForExecution
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
...
try {
callMainMethod(mainClass, args);
} ...
}
3.3、callMainMethod
private static void callMainMethod(Class<?> entryClass, String[] args)
throws ProgramInvocationException {
Method mainMethod;
...
try {
mainMethod = entryClass.getMethod("main", String[].class);
}
...
try {
mainMethod.invoke(null, (Object) args);
}
...
}