• flink1.15源码笔记


    Flink1.15源码

    Flink 入口类

    "org.apache.flink.client.cli.CliFrontend"

    入口方法(main)主体
        // Excerpt from the command-line entry point of CliFrontend; the enclosing method
        // signature is not part of this excerpt.
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
        // 1. Locate the configuration directory (per the original note, this is resolved from
        //    System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR)).
        final String configurationDirectory = getConfigurationDirectoryFromEnv();
        // 2. Load the global configuration from that directory.
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);
        // 3. Load the custom command lines (command-line argument handling).
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);
        // Exit code that is used if the try block fails before retCode is reassigned.
        int retCode = 31;
        try {
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            // Parse the arguments and run the requested action inside the installed security context.
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args)); 
        } catch (Throwable t) {
            // Strip UndeclaredThrowableException wrappers before logging and printing the failure.
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
        } finally {
            // Always terminate the JVM with the resulting return code.
            System.exit(retCode);
        }
    
    parseAndRun(args)

        // Require at least one argument (the action); otherwise print the help text and return 1.
        if (args.length < 1) {
            CliFrontendParser.printHelp(customCommandLines);
            System.out.println("Please specify an action.");
            return 1;
        }
        // The first argument is the action, which determines the submission mode.
        String action = args[0];
        // NOTE(review): `params` is not defined anywhere in this excerpt; presumably it is `args`
        // without the leading action -- confirm against the upstream source.
        switch (action) {
            // Action 'run': per the original note this covers two submission modes,
            // '-t yarn-per-job' and '-t yarn-session' -- TODO confirm the exact target names.
            case ACTION_RUN:
                run(params);
                return 0;
            // Action 'run-application': per the original note the submission mode is
            // '-t yarn-application'.
            case ACTION_RUN_APPLICATION:
                runApplication(params);
                return 0;
            ...
                    }
    
    A、run(yarn-per-job & yarn-session)

        /**
         * Handles the 'run' action: parses the command line, resolves the job jar(s) and the
         * effective configuration, then builds a PackagedProgram and executes it.
         *
         * @param args the command-line arguments passed to the 'run' action
         * @throws Exception declared by the signature; the excerpt does not show which calls throw
         */
        protected void run(String[] args) throws Exception {
            LOG.info("Running 'run' command.");
            // Build the options accepted by 'run' (per the original note these include
            // SAVEPOINT_PATH_OPTION, SAVEPOINT_ALLOW_NON_RESTORED_OPTION and SAVEPOINT_RESTORE_MODE).
            final Options commandOptions = CliFrontendParser.getRunCommandOptions();
            // Combine the option definitions in 'commandOptions' with the 'args' given on the
            // command line into a parsed CommandLine.
            final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    
            // evaluate help flag ,"Show the help message for the CLI Frontend or the action."
            if (commandLine.hasOption(HELP_OPTION.getOpt())) {
                CliFrontendParser.printHelpForRun(customCommandLines);
                return;
            }
            // Validate the parsed command line and obtain the active custom command line.
            final CustomCommandLine activeCommandLine =
                    validateAndGetActiveCommandLine(checkNotNull(commandLine));
            // Wrap the program-related arguments.
            final ProgramOptions programOptions = ProgramOptions.create(commandLine);
            // Resolve the usable jar files (job jar and its dependencies).
            final List<URL> jobJars = getJobJarAndDependencies(programOptions);
            // Gather every effective setting into a single Configuration.
            final Configuration effectiveConfiguration =
                    getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
    
            LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
    
            // try-with-resources: the PackagedProgram is closed once execution returns or throws.
            try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
                executeProgram(effectiveConfiguration, program);
            }
        }
    
    1、ProgramOptions.create(封装参数)

        /**
         * Factory for ProgramOptions: delegates to createPythonProgramOptions when the command
         * line has a Python entry point or Python dependency options, and otherwise constructs a
         * plain ProgramOptions.
         *
         * @param line the parsed command line
         * @return the program options built from {@code line}
         * @throws CliArgsException declared by the signature; the excerpt does not show where it
         *     is raised
         */
        public static ProgramOptions create(CommandLine line) throws CliArgsException {
            if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
                return createPythonProgramOptions(line);
            } else {
                // Return the program arguments, e.g. class, jar, parallelism.
                return new ProgramOptions(line);
            }
        }
    
    1.1、ProgramOptions
        // Constructor excerpt: passes the parsed command line to the superclass and, after the
        // elided statements, stores the savepoint restore settings derived from it.
        protected ProgramOptions(CommandLine line) throws CliArgsException {
            super(line);
            ...
                ...
            this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
        }
    
    2、getJobJarAndDependencies(返回可用 jar 包)

        // Reads the entry-point class name and jar path from the program options and delegates
        // to PackagedProgram.getJobJarAndDependencies. The handling after the try block is elided
        // in this excerpt.
        private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
                throws CliArgsException {
            // Entry-point class.
            String entryPointClass = programOptions.getEntryPointClassName();
            // Path of the jar file.
            String jarFilePath = programOptions.getJarFilePath();
    
            try {
                // A missing jar path is passed on as a null File instead of being resolved.
                File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
                return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
            } ...
        }
    
    2.1、PackagedProgram.getJobJarAndDependencies
        /**
         * Builds the list of URLs for a job: the job jar itself (when present), every library
         * returned by extractContainedLibraries for that jar, and additionally the Python jar
         * when the entry point is detected as Python.
         *
         * @param jarFile the job jar; the result of loadJarFile for it may be null, in which case
         *     no jar URL and no contained libraries are added
         * @param entryPointClassName the entry-point class name, may be null
         * @return the collected URLs, job jar first
         * @throws ProgramInvocationException declared by the signature; presumably raised by the
         *     helper calls -- the excerpt does not show where
         */
        public static List<URL> getJobJarAndDependencies(
                File jarFile, @Nullable String entryPointClassName) throws ProgramInvocationException {
            URL jarFileUrl = loadJarFile(jarFile);
    
            // Without a jar URL there is nothing to extract libraries from.
            List<File> extractedTempLibraries =
                    jarFileUrl == null
                            ? Collections.emptyList()
                            : extractContainedLibraries(jarFileUrl);
    
            // Initial capacity: one slot per extracted library plus one for the job jar.
            List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);
    
            if (jarFileUrl != null) {
                libs.add(jarFileUrl);
            }
            for (File tmpLib : extractedTempLibraries) {
                try {
                    libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
                } catch (MalformedURLException e) {
                    // Rethrown unchecked, keeping the original exception as the cause.
                    throw new RuntimeException("URL is invalid. This should not happen.", e);
                }
            }
    
            if (isPython(entryPointClassName)) {
                libs.add(PackagedProgramUtils.getPythonJar());
            }
    
            return libs;
        }
    
    3、executeProgram

        /**
         * Thin wrapper that hands the program to ClientUtils.executeProgram together with a new
         * DefaultExecutorServiceLoader.
         *
         * @param configuration the configuration passed through to ClientUtils.executeProgram
         * @param program the packaged program to execute
         * @throws ProgramInvocationException declared by the signature
         */
        protected void executeProgram(final Configuration configuration, final PackagedProgram program)
                throws ProgramInvocationException {
            // NOTE(review): the meaning of the two trailing `false` flags is not visible in this
            // excerpt -- check the ClientUtils.executeProgram signature.
            ClientUtils.executeProgram(
                    new DefaultExecutorServiceLoader(), configuration, program, false, false);
        }
    
    3.1、ClientUtils.executeProgram
        // Abridged excerpt (parameters and most of the body are elided): the step of interest is
        // the call to program.invokeInteractiveModeForExecution().
        public static void executeProgram(...)
                throws ProgramInvocationException {
                ...
                ...
                try {
                    program.invokeInteractiveModeForExecution();
                } ...
        }
    
    3.2、PackagedProgram.invokeInteractiveModeForExecution
        // Abridged excerpt: after the elided setup, calls callMainMethod with the program's main
        // class and its arguments.
        public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
            ...
            try {
                callMainMethod(mainClass, args);
            } ...
        }
    
    3.3、callMainMethod
        // Abridged excerpt: looks up `main(String[])` on the entry class via reflection and
        // invokes it. The validation and catch clauses are elided.
        private static void callMainMethod(Class<?> entryClass, String[] args)
                throws ProgramInvocationException {
            Method mainMethod;
            ...
            try {
                mainMethod = entryClass.getMethod("main", String[].class);
            } 
            	...
            try {
                // Invoke the main method of the entry class (per the original note, the class
                // comes from the submitted job's jar). The null receiver means a static call, and
                // the (Object) cast passes `args` as a single String[] argument.
                mainMethod.invoke(null, (Object) args);
            } 
            ...
        }
    
  • 相关阅读:
    CVE-2023-3836:大华智慧园区综合管理平台任意文件上传漏洞复现
    数据结构(C语言) 实验-栈与字符串
    探索Elasticsearch的核心个问题
    java基础 io流 字节流 字符流 节点流 包装流 转换流 缓冲流 对象流 打印流 Properties类
    【C++】obj模型文件解析(tiny_obj_loader)
    (c语言)经典bug
    修改RuoYi部署路径 适配nginx子路径访问
    URL.createObjectURL()和URL.revokeObjectURL()
    【组件自定义事件+全局事件总线+消息订阅与发布+TodoList案例——编辑+过度与动画】
    蓝桥杯第十一届c++大学B组详解
  • 原文地址:https://blog.csdn.net/weixin_44429965/article/details/125554191