• Flink 1.15 Source Code Analysis: Job Submission Flow (flink run)


    0. Preface

    Submission method: run the flink command-line script.
    Example of submitting a job with the flink script:

    flink run ...
    

    The flink script shows that the entry class is org.apache.flink.client.cli.CliFrontend:

    # Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
    exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
    

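    CliFrontend receives and parses the commands passed on the command line and calls the corresponding method to execute each one.
    It provides the following actions:

    • run: compiles and runs a program
    • run-application: runs an application in application mode
    • cancel: cancels a running program (not the officially recommended approach)
    • stop: stops a running program with a savepoint (streaming jobs only)
    • savepoint: triggers a savepoint for a running job or disposes an existing one
    • info: shows the program's execution plan (JSON)
    • list: lists running and scheduled programs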

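    1. CliFrontend

    Next, let's look at how main() drives the run flow. Its first three steps are:

    • 1. Get the path of Flink's conf directory
    • 2. Load the configuration from that path
    • 3. Wrap the command-line interfaces, in the order Generic, Yarn, Default
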
        /** Submits the job based on the arguments. */
        public static void main(final String[] args) {
            // log basic environment information
            EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

            // 1. find the configuration directory
            final String configurationDirectory = getConfigurationDirectoryFromEnv();

            // 2. load the global configuration flink-conf.yaml
            final Configuration configuration =
                    GlobalConfiguration.loadConfiguration(configurationDirectory);

            // 3. load the custom command lines, in the order Generic, Yarn, Default
            final List<CustomCommandLine> customCommandLines =
                    loadCustomCommandLines(configuration, configurationDirectory);

            int retCode = 31;
            try {
                // 4. create the CliFrontend object
                final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
                // 5. install the security modules
                SecurityUtils.install(new SecurityConfiguration(cli.configuration));
                // 6. match the action in the command-line arguments (switch/case), run it inside
                //    the security context, and return its status code; this is the main logic
                retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
            } catch (Throwable t) { //....
            } finally {
                // 7. exit the client with the returned status code
                System.exit(retCode);
            }
        }
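
The exit-code handling in main() can be read as a small pattern: start from a failure code, overwrite it only when the action returns, and report whatever value is left. Below is a minimal sketch of that pattern. The class ExitCodeSketch and the helper runWithExitCode are hypothetical names for illustration, and the helper returns the code instead of calling System.exit so that it is easy to experiment with.

```java
import java.util.concurrent.Callable;

final class ExitCodeSketch {
    // Same pessimistic default that main() starts with.
    static final int DEFAULT_FAILURE_CODE = 31;

    // Hypothetical helper: returns the exit code instead of calling System.exit.
    static int runWithExitCode(Callable<Integer> action) {
        int retCode = DEFAULT_FAILURE_CODE;
        try {
            // Only a normal return overwrites the failure default.
            retCode = action.call();
        } catch (Throwable t) {
            System.err.println("Fatal error while running the action: " + t);
        }
        return retCode;
    }

    public static void main(String[] args) {
        System.out.println(runWithExitCode(() -> 0));
        System.out.println(
                runWithExitCode(
                        () -> {
                            throw new IllegalStateException("boom");
                        }));
    }
}
```

If the action throws, the default code survives, which is why a crash during submission still yields a non-zero exit status.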
    

    The detailed flow is analyzed below.

    1.1 Log basic environment information

    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    

    The implementation of logEnvironmentInfo:

    /**
    	 * Logs information about the environment, like code revision, current user, Java version,
    	 * and JVM parameters.
    	 *
    	 * @param log The logger to log the information to.
    	 * @param componentName The component name to mention in the log.
    	 * @param commandLineArgs The arguments accompanying the starting the component.
    	 */
    	public static void logEnvironmentInfo(Logger log, String componentName, String[] commandLineArgs) {
    		if (log.isInfoEnabled()) {
    			// commit id and date of the last git commit of the code
    			RevisionInformation rev = getRevisionInformation();
    			// code version
    			String version = getVersion();
    			// JVM version, obtained through the JDK's ManagementFactory class
    			String jvmVersion = getJvmVersion();
    			// JVM startup options, also obtained through ManagementFactory
    			String[] options = getJvmStartupOptionsArray();
    			// JAVA_HOME directory
    			String javaHome = System.getenv("JAVA_HOME");
    			// maximum JVM heap size in MiB (byte count shifted right by 20 bits)
    			long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20;
    	
    			// log the basic information
    			log.info("--------------------------------------------------------------------------------");
    			log.info(" Starting " + componentName + " (Version: " + version + ", "
    					+ "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
    			log.info(" OS current user: " + System.getProperty("user.name"));
    			log.info(" Current Hadoop/Kerberos user: " + getHadoopUser());
    			log.info(" JVM: " + jvmVersion);
    			log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
    			log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));
    			// Hadoop version information
    			String hadoopVersionString = getHadoopVersionString();
    			if (hadoopVersionString != null) {
    				log.info(" Hadoop version: " + hadoopVersionString);
    			} else {
    				log.info(" No Hadoop Dependency available");
    			}
    			// log the JVM options
    			if (options.length == 0) {
    				log.info(" JVM Options: (none)");
    			}
    			else {
    				log.info(" JVM Options:");
    				for (String s: options) {
    					log.info("    " + s);
    				}
    			}
    			// program arguments of the job
    			if (commandLineArgs == null || commandLineArgs.length == 0) {
    				log.info(" Program Arguments: (none)");
    			}
    			else {
    				log.info(" Program Arguments:");
    				for (String s: commandLineArgs) {
    					log.info("    " + s);
    				}
    			}
    
    			log.info(" Classpath: " + System.getProperty("java.class.path"));
    
    			log.info("--------------------------------------------------------------------------------");
    		}
    	}
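
To make the JVM-related lines concrete, here is a minimal standalone sketch that gathers similar information with the JDK's ManagementFactory. The class name JvmInfoSketch and the helper bytesToMebibytes are illustrative, and Runtime.maxMemory() is used as a simple stand-in for Flink's getMaxJvmHeapMemory(), whose exact logic is not shown in this article.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.List;

final class JvmInfoSketch {
    // Converts a byte count to whole MiB; an unsigned shift by 20 bits divides by 2^20.
    static long bytesToMebibytes(long bytes) {
        return bytes >>> 20;
    }

    public static void main(String[] args) {
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();

        // JVM name, vendor and specification version.
        String jvmVersion =
                runtime.getVmName() + " - " + runtime.getVmVendor() + " - " + runtime.getSpecVersion();
        // Options passed to the JVM at startup (for example -Xmx...).
        List<String> jvmOptions = runtime.getInputArguments();
        // Stand-in for getMaxJvmHeapMemory(): the maximum memory the JVM will try to use.
        long maxHeapMiB = bytesToMebibytes(Runtime.getRuntime().maxMemory());

        System.out.println(" JVM: " + jvmVersion);
        System.out.println(" Maximum heap size: " + maxHeapMiB + " MiBytes");
        System.out.println(" JVM Options: " + (jvmOptions.isEmpty() ? "(none)" : jvmOptions));
    }
}
```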
    
    

    1.2 Get the Flink configuration directory

            // 1. find the configuration directory
            final String configurationDirectory = getConfigurationDirectoryFromEnv();
    

    The Flink configuration directory is resolved from the FLINK_CONF_DIR environment variable.
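
The body of getConfigurationDirectoryFromEnv() is not reproduced in this article. The sketch below shows one plausible shape of the lookup, assuming (from our reading of the 1.15 source) that ../conf and conf are tried as fallbacks when the variable is unset. ConfDirSketch, resolveConfDir and the exception messages are hypothetical; the existence check is passed in as a predicate so the logic can be exercised without touching the file system.

```java
import java.io.File;
import java.util.function.Predicate;

final class ConfDirSketch {
    // Assumed fallback locations, checked in this order when FLINK_CONF_DIR is not set.
    static final String FALLBACK_1 = "../conf";
    static final String FALLBACK_2 = "conf";

    // envValue stands in for System.getenv("FLINK_CONF_DIR"); exists stands in for File.exists().
    static String resolveConfDir(String envValue, Predicate<String> exists) {
        if (envValue != null) {
            if (exists.test(envValue)) {
                return envValue;
            }
            throw new RuntimeException("FLINK_CONF_DIR points to a missing directory: " + envValue);
        }
        if (exists.test(FALLBACK_1)) {
            return FALLBACK_1;
        }
        if (exists.test(FALLBACK_2)) {
            return FALLBACK_2;
        }
        throw new RuntimeException("The configuration directory was not specified.");
    }

    public static void main(String[] args) {
        try {
            String dir =
                    resolveConfDir(System.getenv("FLINK_CONF_DIR"), path -> new File(path).exists());
            System.out.println("Using configuration directory: " + dir);
        } catch (RuntimeException e) {
            System.out.println("No configuration directory found: " + e.getMessage());
        }
    }
}
```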

    1.3 Load the Flink configuration file into a Configuration object

    GlobalConfiguration.loadConfiguration loads the settings in the Flink configuration file flink-conf.yaml and parses them into a Configuration object.

            // 2. load the global configuration flink-conf.yaml
            final Configuration configuration =
                    GlobalConfiguration.loadConfiguration(configurationDirectory);
    

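    The implementation of loadConfiguration (the one-argument call above delegates to this overload with dynamicProperties set to null):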

      /**
       * Loads the configuration files from the specified directory. If the dynamic properties
       * configuration is not null, then it is added to the loaded configuration.
       *
       * @param configDir directory to load the configuration from
       * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
       * @return The configuration loaded from the given configuration directory
       */
      public static Configuration loadConfiguration(
          final String configDir, @Nullable final Configuration dynamicProperties) {
    
        if (configDir == null) {
          throw new IllegalArgumentException(
              "Given configuration directory is null, cannot load configuration");
        }
    
        final File confDirFile = new File(configDir);
        if (!(confDirFile.exists())) {
          throw new IllegalConfigurationException(
              "The given configuration directory name '"
                  + configDir
                  + "' ("
                  + confDirFile.getAbsolutePath()
                  + ") does not describe an existing directory.");
        }
        /** 1. The configuration directory exists, so locate the Flink configuration file flink-conf.yaml inside it. */
        // get Flink yaml configuration file
        final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
    
        if (!yamlConfigFile.exists()) {
          throw new IllegalConfigurationException(
              "The Flink config file '"
                  + yamlConfigFile
                  + "' ("
                  + confDirFile.getAbsolutePath()
                  + ") does not exist.");
        }
        /** 2. [Core logic] With the file located, call loadYAMLResource to parse the YAML file and return a Configuration that holds the entries as key-value pairs (HashMap-backed). */
        Configuration configuration = loadYAMLResource(yamlConfigFile);
    
        if (dynamicProperties != null) {
          configuration.addAll(dynamicProperties);
        }
    
        return enrichWithEnvironmentVariables(configuration);
      }
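
loadYAMLResource itself is not shown here. As a rough illustration of what parsing a flat flink-conf.yaml into key-value pairs involves, the sketch below strips comments after '#', splits each line on the first ": ", and skips lines without a valid pair. FlatYamlSketch and parse are hypothetical names, and the exact rules of the real method may differ.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FlatYamlSketch {
    // Parses flat "key: value" lines; nested YAML structures are not supported.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> config = new LinkedHashMap<>();
        for (String line : lines) {
            // 1. drop everything after the first '#'
            String conf = line.split("#", 2)[0].trim();
            if (conf.isEmpty()) {
                continue;
            }
            // 2. split into key and value on the first ": "
            String[] kv = conf.split(": ", 2);
            if (kv.length == 1) {
                continue; // no valid key-value pair on this line
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            // 3. sanity check: both parts must be non-empty
            if (key.isEmpty() || value.isEmpty()) {
                continue;
            }
            config.put(key, value);
        }
        return config;
    }

    public static void main(String[] args) {
        List<String> lines =
                Arrays.asList(
                        "# common settings",
                        "jobmanager.rpc.address: localhost",
                        "parallelism.default: 1 # inline comment",
                        "line-without-separator");
        System.out.println(parse(lines));
    }
}
```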
    
    

    1.4 Load the custom command lines (CustomCommandLine)

    main() calls loadCustomCommandLines to load the custom command lines:

           // 3. load the custom command lines
            final List<CustomCommandLine> customCommandLines =
                    loadCustomCommandLines(configuration, configurationDirectory);
    

    loadCustomCommandLines performs the following steps:

    • 1. Create a GenericCLI
    • 2. Add the YARN command line via reflection
      • If that fails, add FallbackYarnSessionCli
    • 3. Add DefaultCLI

        public static List<CustomCommandLine> loadCustomCommandLines(
                Configuration configuration, String configurationDirectory) {
            List<CustomCommandLine> customCommandLines = new ArrayList<>();
            // 1. create a GenericCLI
            customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

            // 2. Command line interface of the YARN session, with a special initialization here
            //    to prefix all options with y/yarn.
            final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
            try {
                // 3. add the YARN command line
                customCommandLines.add(
                        loadCustomCommandLine(
                                flinkYarnSessionCLI,
                                configuration,
                                configurationDirectory,
                                "y",
                                "yarn"));
            } catch (NoClassDefFoundError | Exception e) {
                final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
                try {
                    LOG.info("Loading FallbackYarnSessionCli");
                    // 4. on failure, add FallbackYarnSessionCli
                    customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
                } catch (Exception exception) {
                    LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
                }
            }
    
            //	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
            // the active CustomCommandLine in order and DefaultCLI isActive always return true.
            // 5. add DefaultCLI
            customCommandLines.add(new DefaultCLI());
    
            return customCommandLines;
        }
    

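    The class diagram is as follows:
    [Figure: class diagram of the CustomCommandLine implementations]

    Later sections explain how the active command-line client is selected from the three clients wrapped here: GenericCLI, FlinkYarnSessionCli, and DefaultCLI. They are checked in order; the first one that reports itself active is used and the check stops there. The activation rules are:

    • GenericCLI: active when execution.target is configured, or when any of -e, --executor, -t, --target is passed, with a non-null value.
    • FlinkYarnSessionCli: active when the value of -m / --jobmanager equals yarn-cluster, or a YARN applicationId is passed in the arguments, or execution.target is yarn-session or yarn-per-job.
    • DefaultCLI: always returns true; used for standalone mode.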
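
The "first active wins" rule can be sketched in a few lines. The example below is a simplified model, not Flink's classes: Cli is a hypothetical stand-in for CustomCommandLine, the activation checks look only at the raw argument list (the execution.target configuration is ignored), and -yid is assumed to be the prefixed YARN application id option.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class ActiveCliSketch {
    // Hypothetical stand-in for CustomCommandLine: a name plus an activation check.
    static final class Cli {
        final String name;
        final Predicate<List<String>> isActive;

        Cli(String name, Predicate<List<String>> isActive) {
            this.name = name;
            this.isActive = isActive;
        }
    }

    // Order matters: DefaultCLI is always active, so it must come last.
    static final List<Cli> ORDERED =
            Arrays.asList(
                    new Cli("GenericCLI", a -> containsAny(a, "-e", "--executor", "-t", "--target")),
                    new Cli(
                            "FlinkYarnSessionCli",
                            a -> hasValue(a, "-m", "yarn-cluster") || a.contains("-yid")),
                    new Cli("DefaultCLI", a -> true));

    static boolean containsAny(List<String> args, String... options) {
        for (String option : options) {
            if (args.contains(option)) {
                return true;
            }
        }
        return false;
    }

    static boolean hasValue(List<String> args, String option, String value) {
        int i = args.indexOf(option);
        return i >= 0 && i + 1 < args.size() && value.equals(args.get(i + 1));
    }

    // Returns the name of the first command line that reports itself active.
    static String select(List<String> args) {
        for (Cli cli : ORDERED) {
            if (cli.isActive.test(args)) {
                return cli.name;
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }

    public static void main(String[] args) {
        System.out.println(select(Arrays.asList("-t", "yarn-per-job", "job.jar")));
        System.out.println(select(Arrays.asList("-m", "yarn-cluster", "job.jar")));
        System.out.println(select(Arrays.asList("job.jar")));
    }
}
```

Because the last entry always matches, moving it earlier in the list would shadow every command line placed after it.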

    1.4.1 Create a GenericCLI

            // 1. create a GenericCLI
            customCommandLines.add(new GenericCLI(configuration, configurationDirectory));
    

    1.4.2 Add the YARN command line via reflection

    The YARN command line is built via reflection:

    /**
      * Builds a command line via reflection.
      * @param className fully qualified name of the class to load
      * @param params constructor arguments
      */
    private static CustomCommandLine loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {

        // 1. load the class from the classpath; it must implement the CustomCommandLine interface
        Class<? extends CustomCommandLine> customCliClass =
            Class.forName(className).asSubclass(CustomCommandLine.class);

        // 2. derive the constructor parameter types from the runtime classes of the arguments
        Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; i++) {
            Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
            types[i] = params[i].getClass();
        }
        // 3. look up the matching constructor, e.g. that of org.apache.flink.yarn.cli.FlinkYarnSessionCli
        Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);

        // 4. instantiate the class by invoking that constructor
        return constructor.newInstance(params);
    }
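
The same reflection technique can be tried in isolation. The sketch below uses a hypothetical class PrefixedCli in place of a real CustomCommandLine implementation, and wraps the checked reflection exceptions in an unchecked one to keep the call site short; both choices are for illustration only.

```java
import java.lang.reflect.Constructor;

final class ReflectiveLoadSketch {
    // Hypothetical class standing in for a CustomCommandLine implementation.
    public static final class PrefixedCli {
        final String shortPrefix;
        final String longPrefix;

        public PrefixedCli(String shortPrefix, String longPrefix) {
            this.shortPrefix = shortPrefix;
            this.longPrefix = longPrefix;
        }

        public String describe() {
            return "-" + shortPrefix + " / --" + longPrefix;
        }
    }

    // Loads className, finds the public constructor matching the runtime types of params,
    // and invokes it.
    static <T> T load(String className, Class<T> expectedType, Object... params) {
        try {
            Class<? extends T> clazz = Class.forName(className).asSubclass(expectedType);
            Class<?>[] types = new Class<?>[params.length];
            for (int i = 0; i < params.length; i++) {
                types[i] = params[i].getClass();
            }
            Constructor<? extends T> constructor = clazz.getConstructor(types);
            return constructor.newInstance(params);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not load " + className, e);
        }
    }

    public static void main(String[] args) {
        PrefixedCli cli = load(PrefixedCli.class.getName(), PrefixedCli.class, "y", "yarn");
        System.out.println(cli.describe());
    }
}
```

One consequence of deriving the types with getClass(): the lookup only succeeds when a constructor declares exactly the runtime classes of the arguments, so a constructor whose parameter is declared as an interface or superclass of the argument would not be found this way.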
    
    
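    	/**
    	 * Initializes a FlinkYarnSessionCli. loadCustomCommandLine resolves the four-argument
    	 * constructor (Configuration, String, String, String), which eventually delegates to this one.
    	 * @param configuration the global configuration
    	 * @param clusterClientServiceLoader loader for the cluster client services
    	 * @param configurationDirectory the global configuration directory
    	 * @param shortPrefix prefix of the short command-line options
    	 * @param longPrefix prefix of the long command-line options
    	 * @param acceptInteractiveInput whether interactive input is accepted
    	 * @throws FlinkException
    	 */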
        public FlinkYarnSessionCli(
                Configuration configuration,
                ClusterClientServiceLoader clusterClientServiceLoader,
                String configurationDirectory,
                String shortPrefix,
                String longPrefix,
                boolean acceptInteractiveInput)
                throws FlinkException {
            // 1. initialize the fields
            super(configuration, shortPrefix, longPrefix);
            this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
            this.configurationDirectory = checkNotNull(configurationDirectory);
            this.acceptInteractiveInput = acceptInteractiveInput;
    
            // 2. Create the command line options
            query =
                    new Option(
                            shortPrefix + "q",
                            longPrefix + "query",
                            false,
                            "Display available YARN resources (memory, cores)");
            queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
            shipPath =
                    new Option(
                            shortPrefix + "t",
                            longPrefix + "ship",
                            true,
                            "Ship files in the specified directory (t for transfer)");
            flinkJar =
                    new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
            jmMemory =
                    new Option(
                            shortPrefix + "jm",
                            longPrefix + "jobManagerMemory",
                            true,
                            "Memory for JobManager Container with optional unit (default: MB)");
            tmMemory =
                    new Option(
                            shortPrefix + "tm",
                            longPrefix + "taskManagerMemory",
                            true,
                            "Memory per TaskManager Container with optional unit (default: MB)");
            slots =
                    new Option(
                            shortPrefix + "s",
                            longPrefix + "slots",
                            true,
                            "Number of slots per TaskManager");
            dynamicproperties =
                    Option.builder(shortPrefix + "D")
                            .argName("property=value")
                            .numberOfArgs(2)
                            .valueSeparator()
                            .desc("use value for given property")
                            .build();
            name =
                    new Option(
                            shortPrefix + "nm",
                            longPrefix + "name",
                            true,
                            "Set a custom name for the application on YARN");
            applicationType =
                    new Option(
                            shortPrefix + "at",
                            longPrefix + "applicationType",
                            true,
                            "Set a custom application type for the application on YARN");
            zookeeperNamespace =
                    new Option(
                            shortPrefix + "z",
                            longPrefix + "zookeeperNamespace",
                            true,
                            "Namespace to create the Zookeeper sub-paths for high availability mode");
            nodeLabel =
                    new Option(
                            shortPrefix + "nl",
                            longPrefix + "nodeLabel",
                            true,
                            "Specify YARN node label for the YARN application");
            help =
                    new Option(
                            shortPrefix + "h",
                            longPrefix + "help",
                            false,
                            "Help for the Yarn session CLI.");
    
            allOptions = new Options();
            allOptions.addOption(flinkJar);
            allOptions.addOption(jmMemory);
            allOptions.addOption(tmMemory);
            allOptions.addOption(queue);
            allOptions.addOption(query);
            allOptions.addOption(shipPath);
            allOptions.addOption(slots);
            allOptions.addOption(dynamicproperties);
            allOptions.addOption(DETACHED_OPTION);
            allOptions.addOption(YARN_DETACHED_OPTION);
            allOptions.addOption(name);
            allOptions.addOption(applicationId);
            allOptions.addOption(applicationType);
            allOptions.addOption(zookeeperNamespace);
            allOptions.addOption(nodeLabel);
            allOptions.addOption(help);
    
            // 3. try loading a potential yarn properties file
            this.yarnPropertiesFileLocation =
                    configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
            final File yarnPropertiesLocation = getYarnPropertiesLocation(yarnPropertiesFileLocation);
    
            // 4. parse the properties in the yarn properties file
            yarnPropertiesFile = new Properties();
    
            if (yarnPropertiesLocation.exists()) {
                LOG.info(
                        "Found Yarn properties file under {}.",
                        yarnPropertiesLocation.getAbsolutePath());
    
                try (InputStream is = new FileInputStream(yarnPropertiesLocation)) {
                    yarnPropertiesFile.load(is);
                } catch (IOException ioe) {
                    throw new FlinkException(
                            "Could not read the Yarn properties file "
                                    + yarnPropertiesLocation
                                    + ". Please delete the file at "
                                    + yarnPropertiesLocation.getAbsolutePath()
                                    + '.',
                            ioe);
                }
    
                final String yarnApplicationIdString =
                        yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
    
                if (yarnApplicationIdString == null) {
                    throw new FlinkException(
                            "Yarn properties file found but doesn't contain a "
                                    + "Yarn application id. Please delete the file at "
                                    + yarnPropertiesLocation.getAbsolutePath());
                }
    
                try {
                    // try converting id to ApplicationId
                    yarnApplicationIdFromYarnProperties =
                            ConverterUtils.toApplicationId(yarnApplicationIdString);
                } catch (Exception e) {
                    throw new FlinkException(
                            "YARN properties contain an invalid entry for "
                                    + "application id: "
                                    + yarnApplicationIdString
                                    + ". Please delete the file at "
                                    + yarnPropertiesLocation.getAbsolutePath(),
                            e);
                }
            } else {
                yarnApplicationIdFromYarnProperties = null;
            }
        }
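
The last part of the constructor reads a properties file and validates the application id stored in it. The sketch below reproduces that validation with the JDK only. The key name applicationID is an assumption about the value of YARN_APPLICATION_ID_KEY, the regular expression is a simplified stand-in for ConverterUtils.toApplicationId, and the file content is passed as a string so no real file is needed.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Pattern;

final class YarnPropertiesSketch {
    // Assumed key; the constructor above refers to it as YARN_APPLICATION_ID_KEY.
    static final String APPLICATION_ID_KEY = "applicationID";
    // YARN application ids have the form application_<clusterTimestamp>_<sequence>.
    static final Pattern APPLICATION_ID = Pattern.compile("application_\\d+_\\d+");

    // Parses the properties content and returns the validated application id.
    static String readApplicationId(String propertiesContent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesContent));
        } catch (IOException e) {
            throw new IllegalStateException("Could not read the Yarn properties", e);
        }
        String id = props.getProperty(APPLICATION_ID_KEY);
        if (id == null) {
            throw new IllegalStateException(
                    "Yarn properties found but they do not contain a Yarn application id");
        }
        if (!APPLICATION_ID.matcher(id).matches()) {
            throw new IllegalStateException("Invalid entry for application id: " + id);
        }
        return id;
    }

    public static void main(String[] args) {
        System.out.println(readApplicationId("applicationID=application_1650000000000_0001\n"));
    }
}
```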
    
    1.4.2.1 On failure, add FallbackYarnSessionCli

                final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
                try {
                    LOG.info("Loading FallbackYarnSessionCli");
                    // 4. on failure, add FallbackYarnSessionCli
                    customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
                } catch (Exception exception) {
                    LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
                }
    

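    1.4.3 Add DefaultCLI

    The default command line, used for standalone mode.

    1.5 Initialize the CliFrontend object

    main() calls the two-argument constructor, which delegates to the constructor below with a DefaultClusterClientServiceLoader: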

        public CliFrontend(
                Configuration configuration,
                ClusterClientServiceLoader clusterClientServiceLoader,
                List<CustomCommandLine> customCommandLines) {
            // 1. store the fields, checking each argument for null
            this.configuration = checkNotNull(configuration);
            this.customCommandLines = checkNotNull(customCommandLines);
            this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

            // 2. initialize the file systems
            FileSystem.initialize(
                    configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

            this.customCommandLineOptions = new Options();
            // 3. collect the options of every custom command line:
            // iterate over customCommandLines and add each one's general and run options
            for (CustomCommandLine customCommandLine : customCommandLines) {
                customCommandLine.addGeneralOptions(customCommandLineOptions);
                customCommandLine.addRunOptions(customCommandLineOptions);
            }
            // 4. read the client timeout (client.timeout, formerly akka.client.timeout) from the global configuration
            this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
            // 5. read the default parallelism from the global configuration
            this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
        }
    

    1.6 Load the security modules through SPI

    For more detail, see "Flink 1.15 Source Code Analysis: Security Modules and Security Context".

     SecurityUtils.install(new SecurityConfiguration(cli.configuration));
    

    1.7 Execute the action for the command-line arguments and return the status code

    This is the main logic:

    retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    

    SecurityContext is a security context that may be required to run a Callable:

    
    /** A security context with may be required to run a Callable. */
    public interface SecurityContext {
    
        <T> T runSecured(Callable<T> securedCallable) throws Exception;
    }
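
To see how such a context is used, consider the simplest possible implementation: one that adds no security and just invokes the callable. The sketch below redeclares the interface so it is self-contained; NoOpContext and runAndGetExitCode are hypothetical names for illustration. A Hadoop-backed context would presumably wrap the call in a privileged action for the logged-in user instead.

```java
import java.util.concurrent.Callable;

final class SecurityContextSketch {
    // Same shape as the interface above, redeclared to keep the example self-contained.
    interface SecurityContext {
        <T> T runSecured(Callable<T> securedCallable) throws Exception;
    }

    // A context that adds no security: it simply invokes the callable.
    static final class NoOpContext implements SecurityContext {
        @Override
        public <T> T runSecured(Callable<T> securedCallable) throws Exception {
            return securedCallable.call();
        }
    }

    // Hypothetical helper: maps any exception to the failure code 31 used by main().
    static int runAndGetExitCode(SecurityContext context, Callable<Integer> action) {
        try {
            return context.runSecured(action);
        } catch (Exception e) {
            return 31;
        }
    }

    public static void main(String[] args) {
        SecurityContext context = new NoOpContext();
        System.out.println(runAndGetExitCode(context, () -> 0));
    }
}
```

Because runSecured is a generic method, the interface cannot be implemented with a lambda, which is why the sketch uses a small class.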
    

    1.7.1 The actual logic lives in CliFrontend.parseAndRun(args)

    
        /**
         * Parses the command line arguments and starts the requested action.
         *
         * @param args command line arguments of the client.
         * @return The return code of the program
         */
        public int parseAndRun(String[] args) {
    
            // check for action
            if (args.length < 1) {
                CliFrontendParser.printHelp(customCommandLines);
                System.out.println("Please specify an action.");
                return 1;
            }
    
            // get action, e.g. run, list, cancel
            String action = args[0];
    
            // remove action from parameters
            final String[] params = Arrays.copyOfRange(args, 1, args.length);
    
            try {
                // do action
                switch (action) {
                    case ACTION_RUN:
                        run(params);
                        return 0;
                    case ACTION_RUN_APPLICATION:
                        runApplication(params);
                        return 0;
                    case ACTION_LIST:
                        list(params);
                        return 0;
                    case ACTION_INFO:
                        info(params);
                        return 0;
                    case ACTION_CANCEL:
                        cancel(params);
                        return 0;
                    case ACTION_STOP:
                        stop(params);
                        return 0;
                    case ACTION_SAVEPOINT:
                        savepoint(params);
                        return 0;
                    case "-h":
                    case "--help":
                        CliFrontendParser.printHelp(customCommandLines);
                        return 0;
                    case "-v":
                    case "--version":
                        String version = EnvironmentInformation.getVersion();
                        String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                        System.out.print("Version: " + version);
                        System.out.println(
                                commitID.equals(EnvironmentInformation.UNKNOWN)
                                        ? ""
                                        : ", Commit ID: " + commitID);
                        return 0;
                    default:
                        System.out.printf("\"%s\" is not a valid action.\n", action);
                        System.out.println();
                        System.out.println(
                                "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                        System.out.println();
                        System.out.println(
                                "Specify the version option (-v or --version) to print Flink version.");
                        System.out.println();
                        System.out.println(
                                "Specify the help option (-h or --help) to get help on the command.");
                        return 1;
                }
            } catch (CliArgsException ce) {
                return handleArgException(ce);
            } catch (ProgramParametrizationException ppe) {
                return handleParametrizationException(ppe);
            } catch (ProgramMissingJobException pmje) {
                return handleMissingJobException();
            } catch (Exception e) {
                return handleError(e);
            }
        }
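
Stripped of the Flink-specific handlers, the dispatch above boils down to: take the first argument as the action, pass the rest on as parameters, and map the outcome to a status code. A minimal standalone sketch, with placeholder handlers that only print what they would do (ActionDispatchSketch is a hypothetical name):

```java
import java.util.Arrays;

final class ActionDispatchSketch {
    // Returns 0 when the action is known and 1 otherwise, mirroring the status codes above.
    static int parseAndRun(String[] args) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }

        // The first argument is the action; everything after it belongs to the action.
        String action = args[0];
        String[] params = Arrays.copyOfRange(args, 1, args.length);

        switch (action) {
            case "run":
                System.out.println("would run with " + Arrays.toString(params));
                return 0;
            case "list":
                System.out.println("would list with " + Arrays.toString(params));
                return 0;
            default:
                System.out.printf("\"%s\" is not a valid action.%n", action);
                return 1;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseAndRun(new String[] {"run", "-d", "job.jar"}));
        System.out.println(parseAndRun(new String[] {"bogus"}));
    }
}
```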
    

    1.7.2 Execute the run action

        /**
         * Executions the run action.
         *
         * @param args Command line arguments for the run action.
         */
        protected void run(String[] args) throws Exception {
            LOG.info("Running 'run' command.");
    
            final Options commandOptions = CliFrontendParser.getRunCommandOptions();
            final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    
            // evaluate help flag (-h)
            if (commandLine.hasOption(HELP_OPTION.getOpt())) {
                CliFrontendParser.printHelpForRun(customCommandLines);
                return;
            }

            // 1. validate and get the active custom command line
            final CustomCommandLine activeCommandLine =
                    validateAndGetActiveCommandLine(checkNotNull(commandLine));

            // 2. create the ProgramOptions, distinguishing Python from jar programs
            final ProgramOptions programOptions = ProgramOptions.create(commandLine);

            // 3. get the job jar and its other dependencies
            final List<URL> jobJars = getJobJarAndDependencies(programOptions);

            // 4. build the effective configuration: HA id, target (session, per-job), JobManager memory,
            //    TaskManager memory, slots per TaskManager, ...
            final Configuration effectiveConfiguration =
                    getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

            LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

            // 5. execute the program
            try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
                executeProgram(effectiveConfiguration, program);
            }
        }
    
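    1.7.2.1. Validating and getting the active custom command line

    The active command line is picked from the CustomCommandLine list initialized in step 1.4. The list is scanned in order, and the first entry whose isActive(commandLine) returns true is used.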

        /**
         * Gets the custom command-line for the arguments.
         *
         * @param commandLine The input to the command-line.
         * @return custom command-line which is active (may only be one at a time)
         */
        public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
            LOG.debug("Custom commandlines: {}", customCommandLines);
            for (CustomCommandLine cli : customCommandLines) {
                LOG.debug(
                        "Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
                if (cli.isActive(commandLine)) {
                    return cli;
                }
            }
            throw new IllegalStateException("No valid command-line found.");
        }
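
The "first active entry wins" rule can be sketched in isolation. This is a minimal sketch, not Flink's implementation: `CommandLineCandidate` is a hypothetical stand-in for `CustomCommandLine`, and the trigger flags (`-t` for generic, `-yid` for Yarn, always-on for default) are simplified assumptions about when each command line reports itself active.

```java
import java.util.List;
import java.util.Set;

public class ActiveCommandLineSketch {

    /** Hypothetical stand-in for CustomCommandLine; only the activity check is modelled. */
    interface CommandLineCandidate {
        String name();

        boolean isActive(Set<String> args);
    }

    /** Builds a candidate that is active when its trigger flag is present. */
    static CommandLineCandidate candidate(String name, String triggerFlag) {
        return new CommandLineCandidate() {
            @Override
            public String name() {
                return name;
            }

            // A null trigger flag marks a fallback that is always active (assumption of this sketch).
            @Override
            public boolean isActive(Set<String> args) {
                return triggerFlag == null || args.contains(triggerFlag);
            }
        };
    }

    /** Same order as in the article: Generic, Yarn, Default. */
    static final List<CommandLineCandidate> DEFAULT_ORDER =
            List.of(candidate("generic", "-t"), candidate("yarn", "-yid"), candidate("default", null));

    /** Returns the first candidate that reports itself active, like the loop above. */
    static CommandLineCandidate selectActive(List<CommandLineCandidate> candidates, Set<String> args) {
        for (CommandLineCandidate candidate : candidates) {
            if (candidate.isActive(args)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }

    public static void main(String[] args) {
        System.out.println(selectActive(DEFAULT_ORDER, Set.of("-yid")).name()); // prints "yarn"
        System.out.println(selectActive(DEFAULT_ORDER, Set.of("-c")).name()); // prints "default"
    }
}
```

Because the default entry is always active in this sketch, the exception is only reachable when the list is configured without a fallback, which is why list order matters.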
    
    1.7.2.2. Creating ProgramOptions to distinguish Python from JAR programs

    ProgramOptions.create:

        public static ProgramOptions create(CommandLine line) throws CliArgsException {
            if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
                // Python program
                return createPythonProgramOptions(line);
            } else {
                // Java (JAR) program
                return new ProgramOptions(line);
            }
        }
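
The bodies of `isPythonEntryPoint` and `containsPythonDependencyOptions` are not shown here, so the following is only a sketch of the kind of check involved. `-py`, `-pym` and `-pyfs` are `flink run` options for Python jobs. Treating them as the complete set of triggers, and matching on raw argument strings, are assumptions made for illustration.

```java
import java.util.List;
import java.util.Set;

public class ProgramKindSketch {

    enum ProgramKind {
        PYTHON,
        JAVA
    }

    // Options that name a Python entry point (script or module).
    private static final Set<String> PYTHON_ENTRY_FLAGS =
            Set.of("-py", "--python", "-pym", "--pyModule");

    // Options that only add Python dependencies; this sketch lists just one of them.
    private static final Set<String> PYTHON_DEPENDENCY_FLAGS = Set.of("-pyfs", "--pyFiles");

    /** Classifies the program as Python if any Python-related flag is present, else Java. */
    static ProgramKind classify(List<String> args) {
        for (String arg : args) {
            if (PYTHON_ENTRY_FLAGS.contains(arg) || PYTHON_DEPENDENCY_FLAGS.contains(arg)) {
                return ProgramKind.PYTHON;
            }
        }
        return ProgramKind.JAVA;
    }

    public static void main(String[] args) {
        System.out.println(classify(List.of("-py", "word_count.py"))); // prints "PYTHON"
        System.out.println(classify(List.of("-c", "com.example.Job", "job.jar"))); // prints "JAVA"
    }
}
```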
    
    1.7.2.3. Getting the JAR and other dependencies
        /** Get all provided libraries needed to run the program from the ProgramOptions. */
        private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
                throws CliArgsException {
            // entry-point class (-c)
            String entryPointClass = programOptions.getEntryPointClassName();
            // path to the job JAR (-j)
            String jarFilePath = programOptions.getJarFilePath();
    
            try {
                File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
                return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
            } catch (FileNotFoundException | ProgramInvocationException e) {
                throw new CliArgsException(
                        "Could not get job jar and dependencies from JAR file: " + e.getMessage(), e);
            }
        }
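
The path handling above can be sketched with the JDK alone. This is a simplified sketch under stated assumptions: `resolveJarFile` and `jobJarUrls` are hypothetical helpers, not Flink methods, and whatever `PackagedProgram.getJobJarAndDependencies` does beyond returning the JAR itself (for example handling libraries nested inside it) is left out.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class JobJarSketch {

    /** Resolves the JAR path to a File, failing early if it is missing or not a regular file. */
    static File resolveJarFile(String jarFilePath) throws FileNotFoundException {
        File jarFile = new File(jarFilePath);
        if (!jarFile.exists()) {
            throw new FileNotFoundException("JAR file does not exist: " + jarFile);
        }
        if (!jarFile.isFile()) {
            throw new FileNotFoundException("JAR file is not a file: " + jarFile);
        }
        return jarFile;
    }

    /** Returns the job JAR as a URL list; a null path (no JAR given) yields an empty list. */
    static List<URL> jobJarUrls(String jarFilePath)
            throws FileNotFoundException, MalformedURLException {
        if (jarFilePath == null) {
            return Collections.emptyList();
        }
        File jarFile = resolveJarFile(jarFilePath);
        return Collections.singletonList(jarFile.getAbsoluteFile().toURI().toURL());
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("job", ".jar");
        tmp.deleteOnExit();
        System.out.println(jobJarUrls(tmp.getPath()));
        System.out.println(jobJarUrls(null));
    }
}
```

Failing on a missing file at this point keeps the error on the client side, before anything is submitted to a cluster.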
    
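    1.7.2.4. Getting the effective configuration

    The effective configuration covers the HA id, the target (session, per-job), JobManager memory, TaskManager memory, the number of slots per TaskManager…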
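
The body of `getEffectiveConfiguration` is not shown here, so the layering below is an assumption: values from the configuration file form the base, command-line options override them, and the job JARs are added last. Plain maps stand in for Flink's `Configuration`. The keys used (`execution.target`, `taskmanager.numberOfTaskSlots`, `pipeline.jars`) are real Flink configuration keys, while the values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EffectiveConfigSketch {

    /** Merges file config, command-line overrides and job JARs; later layers win. */
    static Map<String, String> effectiveConfiguration(
            Map<String, String> fileConfig, Map<String, String> cliOverrides, List<String> jobJars) {
        Map<String, String> effective = new LinkedHashMap<>(fileConfig);
        effective.putAll(cliOverrides);
        // pipeline.jars holds a semicolon-separated list of JAR URLs.
        effective.put("pipeline.jars", String.join(";", jobJars));
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> fileConfig = new LinkedHashMap<>();
        fileConfig.put("execution.target", "remote");
        fileConfig.put("taskmanager.numberOfTaskSlots", "1");

        Map<String, String> cliOverrides = new LinkedHashMap<>();
        cliOverrides.put("execution.target", "yarn-per-job");

        Map<String, String> effective =
                effectiveConfiguration(fileConfig, cliOverrides, List.of("file:/tmp/job.jar"));
        effective.forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```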

    1.7.2.5. Executing the program
            // 5. execute the program
            try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
                executeProgram(effectiveConfiguration, program);
            }
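
The try-with-resources form matters here: since `PackagedProgram` is used as a resource, it is closed whether `executeProgram` returns normally or throws. The sketch below shows that ordering with a hypothetical `FakeProgram` class. What the real `close()` cleans up is not covered by this article.

```java
import java.util.ArrayList;
import java.util.List;

public class ProgramLifecycleSketch {

    /** Hypothetical stand-in for PackagedProgram that records lifecycle events. */
    static final class FakeProgram implements AutoCloseable {
        private final List<String> events;

        FakeProgram(List<String> events) {
            this.events = events;
            events.add("open");
        }

        void execute(boolean fail) {
            events.add("execute");
            if (fail) {
                throw new IllegalStateException("job failed");
            }
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    /** Runs the program and returns the recorded event order. */
    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();
        try (FakeProgram program = new FakeProgram(events)) {
            program.execute(fail);
        } catch (IllegalStateException e) {
            // The resource is already closed when the catch block runs.
            events.add("caught");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints "[open, execute, close]"
        System.out.println(run(true)); // prints "[open, execute, close, caught]"
    }
}
```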
    
    
    

    CliFrontend.executeProgram delegates to ClientUtils.executeProgram:

        protected void executeProgram(final Configuration configuration, final PackagedProgram program)
                throws ProgramInvocationException {
            ClientUtils.executeProgram(
                    new DefaultExecutorServiceLoader(), configuration, program, false, false);
        }
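
The body of `ClientUtils.executeProgram` is not shown in this article. As an assumption about its general shape, the sketch below shows a common pattern for running user code from a client: switch the thread's context class loader to the user-code class loader, invoke the program's `main` reflectively, and restore the previous loader afterwards. `executeWithClassLoader` and `UserJob` are hypothetical names, and Flink-specific steps such as installing execution environments are omitted.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class ExecuteProgramSketch {

    /** Records which context class loader the user code observed. */
    static ClassLoader seenDuringMain;

    /** Hypothetical user program; a real job would build and execute a pipeline here. */
    public static class UserJob {
        public static void main(String[] args) {
            seenDuringMain = Thread.currentThread().getContextClassLoader();
        }
    }

    /** Invokes mainClass.main(args) with userCodeClassLoader as the context class loader. */
    static void executeWithClassLoader(
            ClassLoader userCodeClassLoader, String mainClass, String[] args) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(userCodeClassLoader);
        try {
            Method main =
                    Class.forName(mainClass, true, userCodeClassLoader)
                            .getMethod("main", String[].class);
            // Cast to Object so the array is passed as one argument, not spread as varargs.
            main.invoke(null, (Object) args);
        } finally {
            // Always restore the caller's loader, even if the user code throws.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ExecuteProgramSketch.class.getClassLoader());
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        executeWithClassLoader(userLoader, UserJob.class.getName(), new String[0]);
        System.out.println(seenDuringMain == userLoader); // prints "true"
        System.out.println(Thread.currentThread().getContextClassLoader() == before); // prints "true"
    }
}
```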
    

    1.8. Getting the return status code and shutting down the submission client
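
The `main()` method shown earlier initializes `retCode` to 31 and overwrites it only when the action completes. A minimal sketch of that pattern follows. `runAndGetExitCode` is a hypothetical helper, and the assumption is that the client finally hands the code to `System.exit`; the sketch prints it instead so that it stays easy to run.

```java
import java.util.concurrent.Callable;

public class ExitCodeSketch {

    // Same default as `int retCode = 31;` in main(): reported unless the action succeeds.
    static final int FAILURE_CODE = 31;

    /** Runs the action and returns its status code, or FAILURE_CODE if it throws. */
    static int runAndGetExitCode(Callable<Integer> action) {
        int retCode = FAILURE_CODE;
        try {
            retCode = action.call();
        } catch (Throwable t) {
            System.err.println("Fatal error while running command line interface: " + t);
        }
        return retCode;
    }

    public static void main(String[] args) {
        System.out.println(runAndGetExitCode(() -> 0)); // prints "0"
        System.out.println(
                runAndGetExitCode(
                        () -> {
                            throw new IllegalStateException("submission failed");
                        })); // prints "31"
        // A real client would end with System.exit(retCode) (assumption of this sketch).
    }
}
```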


  • Original article: https://blog.csdn.net/wuxintdrh/article/details/127810949