• 【Flink源码篇】Flink提交流程之flink命令自定义参数的解析和命令行客户端的选择


    1. 上文回顾

    上篇我们讲解了flink-conf.yaml的解析和3种flink命令行客户端的添加,现在到了客户端提交application部分了,这里我们先看如何进行flink命令自定义参数的解析的,和flink命令行客户端的选择

    2. flink命令自定义参数的解析

    2.1 CliFrontend的实例化

    flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.main中,CliFrontend的实例化实现如下:

        /** Submits the job based on the arguments. */
        public static void main(final String[] args) {
            EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    ......省略部分......
            int retCode = 31;
            try {
                // 进行CliFrontend客户端的实例化
                final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
    
                // 安全模块的安装
                SecurityUtils.install(new SecurityConfiguration(cli.configuration));
                // 进行flink自定义参数的解析,并进行application的提交
                retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
            } 
            ......省略部分......
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    这里先对CliFrontend进行了实例化,我们来看CliFrontend的构造函数,完成的工作如下:

        public CliFrontend(Configuration configuration, List customCommandLines) {
            this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
        }
    
        public CliFrontend(
                Configuration configuration,
                ClusterClientServiceLoader clusterClientServiceLoader,
                List customCommandLines) {
            this.configuration = checkNotNull(configuration);
            this.customCommandLines = checkNotNull(customCommandLines);
            this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
    
            // 进行客户端的文件系统初始化。Flink的文件系统采用了插件的方式,以支持不同的文件系统
            FileSystem.initialize(
                    configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
    
            this.customCommandLineOptions = new Options();
    
            // 添加zookeeperNamespace、jobmanager、-D传递的参数的key和value到customCommandLineOptions
            // 和GenericCli、FlinkYarnSessionCli的参数
            for (CustomCommandLine customCommandLine : customCommandLines) {
                customCommandLine.addGeneralOptions(customCommandLineOptions);
                customCommandLine.addRunOptions(customCommandLineOptions);
            }
    
            // 获取客户端的超时时间和flink application的默认并行度
            this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
            this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    后面是安全模块的安装,再后面就是进行flink命令自定义参数的解析,然后提交flink的application了

    2.2 flink命令自定义参数的解析

    flink-clients/src/main/java/org.apache.flink.cli.CliFrontend.parseAndRun函数的实现如下:

        /**
         * Parses the command line arguments and starts the requested action.
         *
         * @param args command line arguments of the client.
         * @return The return code of the program
         */
        public int parseAndRun(String[] args) {
            // check for action
            if (args.length < 1) {
                CliFrontendParser.printHelp(customCommandLines);
                System.out.println("Please specify an action.");
                return 1;
            }
            // 获取执行的动作,我们这里是run
            // get action
            String action = args[0];
    
            // 除了run,剩余的参数
            // remove action from parameters
            final String[] params = Arrays.copyOfRange(args, 1, args.length);
    
            try {
                // do action
                switch (action) {
                    // run动作的执行方法
                    case ACTION_RUN:
                        run(params);
                        return 0;
                    case ACTION_RUN_APPLICATION:
                        runApplication(params);
                        return 0;
                        ......省略部分......
            }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    这里进行action动作的获取,然后执行run方法。传入的params是剩余的flink命令自定义参数。我们接下来看run方法是如何进行参数解析的

    flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run函数实现如下:

        /**
         * Executions the run action.
         *
         * @param args Command line arguments for the run action.
         */
        protected void run(String[] args) throws Exception {
            LOG.info("Running 'run' command.");
    
            // 添加flink的默认参数help、verbose、fromSavepoint、allowNonRestoredState、restoreMode、jarfile、
            // class、classpath、parallelism、arguments、detached、shutdownOnAttachedExit、yarndetached、
            // python、pyFiles、pyModule、pyRequirements、pyArchives、pyExecutable、pyClientExecutable
            final Options commandOptions = CliFrontendParser.getRunCommandOptions();
            // 通过默认的参数,解析出只包含flink命令自定义的参数的commandLine
            final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    
            // evaluate help flag
            if (commandLine.hasOption(HELP_OPTION.getOpt())) {
                CliFrontendParser.printHelpForRun(customCommandLines);
                return;
            }
    ......省略部分......
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    先获取默认的flink运行参数,再解析出只包含flink命令自定义的参数的commandLine。接下来看getCommand函数的实现

        public CommandLine getCommandLine(
                final Options commandOptions, final String[] args, final boolean stopAtNonOptions)
                throws CliArgsException {
            // 将flink默认的参数,和我们在CliFrontend构造函数创建的customCommandLineOptions进行追加
            // customCommandLineOptions包含zookeeperNamespace、jobmanager、-D传递的参数的key和value
            // 和GenericCli、FlinkYarnSessionCli的参数
            final Options commandLineOptions =
                    CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
            // 通过flink的参数,解析出只包含flink命令自定义的参数的commandLine
            return CliFrontendParser.parse(commandLineOptions, args, stopAtNonOptions);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    追加完flink 3种命令行客户端的参数,再解析出只包含flink命令自定义的参数的commandLine

    对CliFrontendParser.parse进行不停的跳转。最终调用了commons-cli-1.15.0.jar/org.apache.commons.cli.DefaultParser的parse方法。循环进行进行参数的处理

        public CommandLine parse(Options options, String[] arguments, Properties properties, boolean stopAtNonOption) throws ParseException {
            this.options = options;
            this.stopAtNonOption = stopAtNonOption;
            this.skipParsing = false;
            this.currentOption = null;
            this.expectedOpts = new ArrayList(options.getRequiredOptions());
            Iterator var5 = options.getOptionGroups().iterator();
    
            while(var5.hasNext()) {
                OptionGroup group = (OptionGroup)var5.next();
                group.setSelected((Option)null);
            }
    
            this.cmd = new CommandLine();
            if (arguments != null) {
                String[] var9 = arguments;
                int var10 = arguments.length;
                
                // 循环处理每个参数
                for(int var7 = 0; var7 < var10; ++var7) {
                    String argument = var9[var7];
                    // 对参数进行处理
                    this.handleToken(argument);
                }
            }
    
            this.checkRequiredArgs();
            this.handleProperties(properties);
            this.checkRequiredOptions();
            return this.cmd;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    commons-cli-1.15.0.jar/org.apache.commons.cli.DefaultParser的handleToken方法如下:

        private void handleToken(String token) throws ParseException {
            this.currentToken = token;
            if (this.skipParsing) {
                this.cmd.addArg(token);
            } else if ("--".equals(token)) {
                this.skipParsing = true;
            // flink命令传递的参数的值(非key)的处理
            } else if (this.currentOption != null && this.currentOption.acceptsArg() && this.isArgument(token)) {
               // 将flink命令传递的参数的值,添加到上一步设置的this.currentOption
               this.currentOption.addValueForProcessing(this.stripLeadingAndTrailingQuotesDefaultOn(token));
            // 长参数--arg的处理,并设置为this.currentOption。对currentOption的修改就是对commandLineOptions的修改
            } else if (token.startsWith("--")) {
                this.handleLongOption(token);
            // 短参数-arg的处理,并设置为this.currentOption。对currentOption的修改就是对commandLineOptions的修改
            } else if (token.startsWith("-") && !"-".equals(token)) {
                this.handleShortAndLongOption(token);
            } else {
                this.handleUnknownToken(token);
            }
    
            if (this.currentOption != null && !this.currentOption.acceptsArg()) {
                this.currentOption = null;
            }
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    handleToken的主要处理逻辑是:

    • 分别对短参数-arg和长参数–arg进行处理
    • 如果flink命令传递的参数是key,且在之前的commandLineOptions中,则设置this.currentOption为该key。对currentOption的修改就是对commandLineOptions的修改
    • 如果flink命令传递的参数是value,则将该value添加到上一步设置的this.currentOption

    3. flink run --help大致流程

    当我们执行flink run --help,是如何打印出帮助信息的

    flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run函数实现如下:

        /**
         * Executions the run action.
         *
         * @param args Command line arguments for the run action.
         */
        protected void run(String[] args) throws Exception {
    ......省略部分......
            final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    
    	   // 打印flink run --help帮助信息
            // evaluate help flag
            if (commandLine.hasOption(HELP_OPTION.getOpt())) {
                CliFrontendParser.printHelpForRun(customCommandLines);
                return;
            }
    
            final CustomCommandLine activeCommandLine =
                    validateAndGetActiveCommandLine(checkNotNull(commandLine));
    ......省略部分......
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    如何run后面跟了–help,就会打印帮助信息

    flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontendParser.printHelpForRun函数实现如下:

        public static void printHelpForRun(Collection customCommandLines) {
            HelpFormatter formatter = new HelpFormatter();
            // 设置左边留多少空格
            formatter.setLeftPadding(5);
            // 设置打印显示的宽度
            formatter.setWidth(80);
    
            System.out.println("\nAction \"run\" compiles and runs a program.");
            System.out.println("\n  Syntax: run [OPTIONS]  ");
            // 设置action区域的的标题信息,后面和Options一起打印
            formatter.setSyntaxPrefix("  \"run\" action options:");
            // 获取run的Options,然后进行打印
            formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
    
            // 循环设置每个命令行客户端的SyntaxPrefix,后面和Options一起打印,再获取命令行客户端的Options,然后进行打印
            printCustomCliOptions(customCommandLines, formatter, true);
    
            System.out.println();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这里打印信息是通过HelpFormatter这个类来实现的,具体的实现这里就不贴出来了,注意逻辑是将每一部分要打印的信息按格式追加到StringBuffer中,最后用PrintWriter进行输出

    4. flink命令行客户端的选择

    上一篇,我们讲了3种flink命令行客户端的添加,将GenericCli、flinkYarnSessionCLI、DefaultCLI这3种命令行客户端添加到了customCommandLines这个ArrayList里面了。现在看flink最终选择了哪种命令行客户端

    flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run函数实现如下:

        /**
         * Executions the run action.
         *
         * @param args Command line arguments for the run action.
         */
        protected void run(String[] args) throws Exception {
    ......省略部分......
            // evaluate help flag
            if (commandLine.hasOption(HELP_OPTION.getOpt())) {
                CliFrontendParser.printHelpForRun(customCommandLines);
                return;
            }
    
            // 获取active的flink命令行客户端
            final CustomCommandLine activeCommandLine =
                    validateAndGetActiveCommandLine(checkNotNull(commandLine));
    ......省略部分......
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    获取active的flink命令行客户端

    flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.validateAndGetActiveCommandLine函数实现如下:

        /**
         * Gets the custom command-line for the arguments.
         *
         * @param commandLine The input to the command-line.
         * @return custom command-line which is active (may only be one at a time)
         */
        public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
            LOG.debug("Custom commandlines: {}", customCommandLines);
            for (CustomCommandLine cli : customCommandLines) {
                LOG.debug(
                        "Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
                if (cli.isActive(commandLine)) {
                    return cli;
                }
            }
            throw new IllegalStateException("No valid command-line found.");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    循环列表,谁先符合active条件,则直接返回退出。我们这里返回的是最后一个命令行客户端DefaultCli

  • 相关阅读:
    Power BI 矩阵总计放表第一列
    港科大提出适用于夜间场景语义分割的无监督域自适应新方法
    [12] 使用 CUDA 加速排序算法
    基于 Linux 的 Docker Swarm 集群部署及应用
    Vue使用 dhtmlx-gantt 甘特图
    深度理解实分析:超越公式与算法的学习方法
    Git新技能-stash操作
    深度学习| 注意力机制
    计算机视觉+深度学习+机器学习+opencv+目标检测跟踪+一站式学习(代码+视频+PPT)
    MySql 数据库【数据库设计的三范式】
  • 原文地址:https://blog.csdn.net/yy8623977/article/details/125803988