• 【Flink源码】再谈Flink程序提交流程(上)


    前面在 【Flink源码】从StreamExecutionEnvironment.execute看Flink提交过程 一文中,我们着重探讨了 StreamExecutionEnvironment 的 execute 方法是如何提交一个任务的,当时为了省事,我们是以本地运行环境为例
    但是在实际的运行环境中,Flink 往往是架设在 Yarn 架构下以 per-job 模式运行的
    因此,为了还原真实场景下 Flink 程序的提交流程,我们有必要探讨 yarn-per-job 提交流程

    首先,让我们回顾一下 Flink 任务提交流程

    1662555979240

    1. Flink 提交任务,Client 向 HDFS 上传 Flink 的 Jar 包和配置
    2. 向 Yarn ResourceManager 提交任务
    3. ResourceManager 分配 Container 资源并通知对应的 NodeManager 启动 ApplicationMaster,ApplicationMaster 启动后加载 Flink 的 Jar 包和配置来构建环境,然后启动 JobManager
    4. ApplicationMaster 向 ResourceManager 申请资源启动 TaskManager
    5. ResourceManager 分配 Container 资源后,由 ApplicationMaster 通知资源所在节点的 NodeManager 启动 TaskManager,NodeManager 加载 Jar 和配置构建环境并启动 TaskManager
    6. TaskManager 启动后向 JobManager 发送心跳包,并等待 JobManager 向其分配任务

    yarn-per-job 提交流程

    回忆一下,我们在讲到 execute 提交流程时,一路探寻到 executeAsync 方法,并在该方法中发现是由 PipelineExecutor 的 execute 方法实际执行的。如下代码

    StreamExecutionEnvironment.java

    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        final PipelineExecutor executor = getPipelineExecutor();
        // 选择合适的 executor 提交任务
        CompletableFuture<JobClient> jobClientFuture =
                executor.execute(streamGraph, configuration, userClassloader);
    
        try {
            JobClient jobClient = jobClientFuture.get();
            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
            collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
            collectIterators.clear();
            return jobClient;
        } catch (ExecutionException executionException) {
            final Throwable strippedException =
                    ExceptionUtils.stripExecutionException(executionException);
            jobListeners.forEach(
                    jobListener -> jobListener.onJobSubmitted(null, strippedException));
    
            throw new FlinkException(
                    String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                    strippedException);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    PipelineExecutor 作为 Pipeline 的执行器接口,根据不同的环境存在不同的实现类
    本文针对 yarn 环境下的实现类深入探讨任务提交流程
    首先找到 yarn 执行器实现类 AbstractJobClusterExecutor

    AbstractJobClusterExecutor.java

    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        // 根据 StreamGraph 生成 JobGraph
        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    
        // 创建并启动 yarn 客户端
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {
            final ExecutionConfigAccessor configAccessor =
                    ExecutionConfigAccessor.fromConfiguration(configuration);
            // 获取集群配置参数
            final ClusterSpecification clusterSpecification =
                    clusterClientFactory.getClusterSpecification(configuration);
            // 部署集群
            final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.deployJobCluster(
                            clusterSpecification, jobGraph, configAccessor.getDetachedMode());
            LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
    
            return CompletableFuture.completedFuture(
                    new ClusterClientJobClientAdapter<>(
                            clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在该方法中,通过 clusterClientFactory.createClusterDescriptor(configuration) 实现了

    1. yarn 客户端的创建与启动
    2. 获取集群配置参数
    3. 部署集群

    启动 yarn 客户端

    先说 yarn 客户端的启动
    createClusterDescriptor 是 继承了 ClusterClientFactory 接口的 ClientFactory 的方法
    我们找到 ClusterClientFactory 的 yarn 实现类 YarnClusterClientFactory

    YarnClusterClientFactory.java

    public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
        checkNotNull(configuration);
    
        final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
        YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
    
        return getClusterDescriptor(configuration);
    }
    
    private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
        // 创建 Yarn 客户端
        final YarnClient yarnClient = YarnClient.createYarnClient();
        // 获取 Yarn 配置
        final YarnConfiguration yarnConfiguration =
                Utils.getYarnAndHadoopConfiguration(configuration);
        // 根据配置初始化 Yarn 客户端
        yarnClient.init(yarnConfiguration);
        // 启动 Yarn 客户端
        yarnClient.start();
        
        // 生成 Yarn 集群描述器
        return new YarnClusterDescriptor(
                configuration,
                yarnConfiguration,
                yarnClient,
                YarnClientYarnClusterInformationRetriever.create(yarnClient),
                false);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    到了 getClusterDescriptor 这里就比较直观了,调用了 org.apache.hadoop.yarn.* 创建并启动 Yarn 客户端

    获取集群配置参数

    回到 AbstractJobClusterExecutor,第二步获取集群配置参数是通过 getClusterSpecification 方法完成,我们来看一看这个方法源码
    我们在 YarnClusterClientFactory 的父类 AbstractContainerizedClusterClientFactory 找到这个方法

    AbstractContainerizedClusterClientFactory.java

    public ClusterSpecification getClusterSpecification(Configuration configuration) {
        checkNotNull(configuration);
        
        // JobManager 配置参数
        final int jobManagerMemoryMB =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                                configuration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
                        .getTotalProcessMemorySize()
                        .getMebiBytes();
        
        // TaskManager 配置参数
        final int taskManagerMemoryMB =
                TaskExecutorProcessUtils.processSpecFromConfig(
                                TaskExecutorProcessUtils
                                        .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                                                configuration,
                                                TaskManagerOptions.TOTAL_PROCESS_MEMORY))
                        .getTotalProcessMemorySize()
                        .getMebiBytes();
        
        // 每个 TaskManager 的 slot 数量
        int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
    
        return new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(jobManagerMemoryMB)
                .setTaskManagerMemoryMB(taskManagerMemoryMB)
                .setSlotsPerTaskManager(slotsPerTaskManager)
                .createClusterSpecification();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    很清楚了,不多做解释了

    部署集群

    最后,我们来看一下 execute 的最后一个步骤部署集群,通过 deployJobCluster 方法实现
    我们找到 ClusterDescriptor 的 yarn 实现类 YarnClusterDescriptor

    YarnClusterDescriptor.java

    public ClusterClientProvider<ApplicationId> deployJobCluster(
            ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
            throws ClusterDeploymentException {
    
        LOG.warn(
                "Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");
        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink per-job cluster",
                    getYarnJobClusterEntrypoint(),   // 获取 YarnJobClusterEntryPoint,启动 AM 的入口
                    jobGraph,
                    detached);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    可以看到,部署操作全部由 deployInternal 方法完成,下面我们就进入这个方法,看看它究竟做了什么事

    YarnClusterDescriptor.java

    private ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached)
            throws Exception {
    
        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
            boolean useTicketCache =
                    flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
    
            if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
                throw new RuntimeException(
                        "Hadoop security with Kerberos is enabled but the login user "
                                + "does not have Kerberos credentials or delegation tokens!");
            }
    
            final boolean fetchToken =
                    flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
            final boolean yarnAccessFSEnabled =
                    !CollectionUtil.isNullOrEmpty(
                            flinkConfiguration.get(
                                    SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS));
            if (!fetchToken && yarnAccessFSEnabled) {
                throw new IllegalConfigurationException(
                        String.format(
                                "When %s is disabled, %s must be disabled as well.",
                                SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
                                SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS.key()));
            }
        }
    
        isReadyForDeployment(clusterSpecification);
    
        // ------------------ Check if the specified queue exists --------------------
    
        checkYarnQueues(yarnClient);
    
        // ------------------ Check if the YARN ClusterClient has the requested resources
        // --------------
    
        // Create application via yarnClient
        // 创建 Yarn 客户端应用
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    
        Resource maxRes = appResponse.getMaximumResourceCapability();
    
        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException(
                    "Could not retrieve information about free cluster resources.", e);
        }
    
        final int yarnMinAllocationMB =
                yarnConfiguration.getInt(
                        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
        if (yarnMinAllocationMB <= 0) {
            throw new YarnDeploymentException(
                    "The minimum allocation memory "
                            + "("
                            + yarnMinAllocationMB
                            + " MB) configured via '"
                            + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
                            + "' should be greater than 0.");
        }
    
        final ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification =
                    validateClusterResources(
                            clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }
    
        LOG.info("Cluster specification: {}", validClusterSpecification);
    
        final ClusterEntrypoint.ExecutionMode executionMode =
                detached
                        ? ClusterEntrypoint.ExecutionMode.DETACHED
                        : ClusterEntrypoint.ExecutionMode.NORMAL;
    
        flinkConfiguration.setString(
                ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
        
        // 启动 APP master
        ApplicationReport report =
                startAppMaster(
                        flinkConfiguration,
                        applicationName,
                        yarnClusterEntrypoint,
                        jobGraph,
                        yarnClient,
                        yarnApplication,
                        validClusterSpecification);
    
        // print the application id for user to cancel themselves.
        if (detached) {
            final ApplicationId yarnApplicationId = report.getApplicationId();
            logDetachedClusterInformation(yarnApplicationId, LOG);
        }
    
        setClusterEntrypointInfoToConfig(report);
    
        return () -> {
            try {
                return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
            } catch (Exception e) {
                throw new RuntimeException("Error while creating RestClusterClient.", e);
            }
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120

    值得注意的事,在该方法中创建了应用,并且通过 startAppMaster 方法启动了 app master
    此外,注释里说该方法会一直阻塞,直到 ApplicationMaster / JobManager 被部署在 Yarn 上
    下面我们来看一下 startAppMaster 做了什么事
    前方,超长方法预警!

    private ApplicationReport startAppMaster(
        Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)
            throws Exception {
    
        // ------------------ Initialize the file systems -------------------------
        // 初始化文件系统(HDFS)
        org.apache.flink.core.fs.FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
    
        final FileSystem fs = FileSystem.get(yarnConfiguration);
    
        // hard coded check for the GoogleHDFS client because its not overriding the getScheme()
        // method.
        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
                && fs.getScheme().startsWith("file")) {
            LOG.warn(
                    "The file system scheme is '"
                            + fs.getScheme()
                            + "'. This indicates that the "
                            + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
                            + "The Flink YARN client needs to store its files in a distributed file system");
        }
    
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
    
        // 获取文件上传路径
        final List<Path> providedLibDirs =
                Utils.getQualifiedRemoteProvidedLibDirs(configuration, yarnConfiguration);
        final Optional<Path> providedUsrLibDir =
                Utils.getQualifiedRemoteProvidedUsrLib(configuration, yarnConfiguration);
    
        Path stagingDirPath = getStagingDir(fs);
        FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
        // 上传文件的工具类
        final YarnApplicationFileUploader fileUploader =
                YarnApplicationFileUploader.from(
                        stagingDirFs,
                        stagingDirPath,
                        providedLibDirs,
                        appContext.getApplicationId(),
                        getFileReplication());
    
        // The files need to be shipped and added to classpath.
        Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
        for (File file : shipFiles) {
            systemShipFiles.add(file.getAbsoluteFile());
        }
    
        final String logConfigFilePath =
                configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(new File(logConfigFilePath));
        }
    
        // Set-up ApplicationSubmissionContext for the application
    
        final ApplicationId appId = appContext.getApplicationId();
    
        // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
        setHAClusterIdIfNotSet(configuration, appId);
    
        // yarn 高可用设置
        if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
            // activate re-execution of failed applications
            // yarn 重试次数,默认 2
            appContext.setMaxAppAttempts(
                    configuration.getInteger(
                            YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
    
            activateHighAvailabilitySupport(appContext);
        } else {
            // set number of application retries to 1 in the default case
            // 不是高可用重试次数为 1
            appContext.setMaxAppAttempts(
                    configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }
    
        // 用户 jar 包
        final Set<Path> userJarFiles = new HashSet<>();
        if (jobGraph != null) {
            // 获取用户 jar 包
            userJarFiles.addAll(
                    jobGraph.getUserJars().stream()
                            .map(f -> f.toUri())
                            .map(Path::new)
                            .collect(Collectors.toSet()));
        }
    
        final List<URI> jarUrls =
                ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
        if (jarUrls != null
                && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
        }
    
        // only for per job mode
        if (jobGraph != null) {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    jobGraph.getUserArtifacts().entrySet()) {
                // only upload local files
                if (!Utils.isRemotePath(entry.getValue().filePath)) {
                    Path localPath = new Path(entry.getValue().filePath);
                    Tuple2<Path, Long> remoteFileInfo =
                            fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
                    jobGraph.setUserArtifactRemotePath(
                            entry.getKey(), remoteFileInfo.f0.toString());
                }
            }
    
            jobGraph.writeUserArtifactEntriesToConfiguration();
        }
    
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            addLibFoldersToShipFiles(systemShipFiles);
        }
    
        // Register all files in provided lib dirs as local resources with public visibility
        // and upload the remaining dependencies as local resources with APPLICATION visibility.
        final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
    
        // 多次调用上传 HDFS 的方法,分别是:
        // => systemShipFiles:日志的配置文件、lib / 目录下除了 dist 的 jar 包
        // => shipOnlyFiles:plugins / 目录下的文件
        // => userJarFiles:用户代码的 jar 包
        final List<String> uploadedDependencies =
                fileUploader.registerMultipleLocalResources(
                        systemShipFiles.stream()
                                .map(e -> new Path(e.toURI()))
                                .collect(Collectors.toSet()),
                        Path.CUR_DIR,
                        LocalResourceType.FILE);
        systemClassPaths.addAll(uploadedDependencies);
    
        // upload and register ship-only files
        // Plugin files only need to be shipped and should not be added to classpath.
        // 上传 plugins/ 目录下的文件
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            Set<File> shipOnlyFiles = new HashSet<>();
            addPluginsFoldersToShipFiles(shipOnlyFiles);
            fileUploader.registerMultipleLocalResources(
                    shipOnlyFiles.stream()
                            .map(e -> new Path(e.toURI()))
                            .collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.FILE);
        }
    
        if (!shipArchives.isEmpty()) {
            fileUploader.registerMultipleLocalResources(
                    shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.ARCHIVE);
        }
    
        // only for application mode
        // Python jar file only needs to be shipped and should not be added to classpath.
        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)
                && PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
            fileUploader.registerMultipleLocalResources(
                    Collections.singletonList(
                            new Path(PackagedProgramUtils.getPythonJar().toURI())),
                    ConfigConstants.DEFAULT_FLINK_OPT_DIR,
                    LocalResourceType.FILE);
        }
    
        // Upload and register user jars
        // 上传用户代码的 jar 包
        final List<String> userClassPaths =
                fileUploader.registerMultipleLocalResources(
                        userJarFiles,
                        userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                                ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                                : Path.CUR_DIR,
                        LocalResourceType.FILE);
    
        // usrlib in remote will be used first.
        if (providedUsrLibDir.isPresent()) {
            final List<String> usrLibClassPaths =
                    fileUploader.registerMultipleLocalResources(
                            Collections.singletonList(providedUsrLibDir.get()),
                            Path.CUR_DIR,
                            LocalResourceType.FILE);
            userClassPaths.addAll(usrLibClassPaths);
        } else if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
            // local usrlib will be automatically shipped if it exists and there is no remote
            // usrlib.
            final Set<File> usrLibShipFiles = new HashSet<>();
            addUsrLibFolderToShipFiles(usrLibShipFiles);
            final List<String> usrLibClassPaths =
                    fileUploader.registerMultipleLocalResources(
                            usrLibShipFiles.stream()
                                    .map(e -> new Path(e.toURI()))
                                    .collect(Collectors.toSet()),
                            Path.CUR_DIR,
                            LocalResourceType.FILE);
            userClassPaths.addAll(usrLibClassPaths);
        }
    
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }
    
        // normalize classpath by sorting
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);
    
        // classpath assembler
        StringBuilder classPathBuilder = new StringBuilder();
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }
    
        // Setup jar for ApplicationMaster
        final YarnLocalResourceDescriptor localResourceDescFlinkJar =
                fileUploader.uploadFlinkDist(flinkJarPath);
        classPathBuilder
                .append(localResourceDescFlinkJar.getResourceKey())
                .append(File.pathSeparator);
    
        // write job graph to tmp file and add it to local resource
        // TODO: server use user main method to generate job graph
        // 将 JobGraph 写入 tmp 文件并添加到本地资源,并上传到 HDFS
        if (jobGraph != null) {
            // 在本地创建 jobGraph 临时文件
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                    obOutput.writeObject(jobGraph);
                }
    
                final String jobGraphFilename = "job.graph";
                configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
    
                fileUploader.registerSingleLocalResource(
                        jobGraphFilename,
                        new Path(tmpJobGraphFile.toURI()),
                        "",
                        LocalResourceType.FILE,
                        true,
                        false);
                classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
            } catch (Exception e) {
                LOG.warn("Add job graph to local resource fail.");
                throw e;
            } finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
                }
            }
        }
    
        // Upload the flink configuration
        // write out configuration file
        // 上传 Flink 配置文件 flink-conf.yaml
        File tmpConfigurationFile = null;
        try {
            tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
    
            // remove localhost bind hosts as they render production clusters unusable
            removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
            removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
            // this setting is unconditionally overridden anyway, so we remove it for clarity
            configuration.removeConfig(TaskManagerOptions.HOST);
    
            BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
    
            String flinkConfigKey = "flink-conf.yaml";
            fileUploader.registerSingleLocalResource(
                    flinkConfigKey,
                    new Path(tmpConfigurationFile.getAbsolutePath()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    true);
            classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
        } finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
            }
        }
    
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
    
        // To support Yarn Secure Integration Test Scenario
        // In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
        // the Yarn site XML
        // and KRB5 configuration files. We are adding these files as container local resources for
        // the container
        // applications (JM/TMs) to have proper secure cluster setup
        Path remoteYarnSiteXmlPath = null;
        if (System.getenv("IN_TESTS") != null) {
            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
            LOG.info(
                    "Adding Yarn configuration {} to the AM container local resource bucket",
                    f.getAbsolutePath());
            Path yarnSitePath = new Path(f.getAbsolutePath());
            remoteYarnSiteXmlPath =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.YARN_SITE_FILE_NAME,
                                    yarnSitePath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            if (System.getProperty("java.security.krb5.conf") != null) {
                configuration.set(
                        SecurityOptions.KERBEROS_KRB5_PATH,
                        System.getProperty("java.security.krb5.conf"));
            }
        }
    
        // 上传权限验证信息
        Path remoteKrb5Path = null;
        boolean hasKrb5 = false;
        String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
        if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
            final File krb5 = new File(krb5Config);
            LOG.info(
                    "Adding KRB5 configuration {} to the AM container local resource bucket",
                    krb5.getAbsolutePath());
            final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
            remoteKrb5Path =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.KRB5_FILE_NAME,
                                    krb5ConfPath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            hasKrb5 = true;
        }
    
        Path remotePathKeytab = null;
        String localizedKeytabPath = null;
        String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            boolean localizeKeytab =
                    flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
            localizedKeytabPath =
                    flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            if (localizeKeytab) {
                // Localize the keytab to YARN containers via local resource.
                LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
                remotePathKeytab =
                        fileUploader
                                .registerSingleLocalResource(
                                        localizedKeytabPath,
                                        new Path(keytab),
                                        "",
                                        LocalResourceType.FILE,
                                        false,
                                        false)
                                .getPath();
            } else {
                // // Assume Keytab is pre-installed in the container.
                localizedKeytabPath =
                        flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            }
        }
    
        final JobManagerProcessSpec processSpec =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
        // 封装启动 AppMaster 容器的 Java 命令
        final ContainerLaunchContext amContainer =
                setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
    
        // New delegation token framework
        if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
            setTokensFor(amContainer);
        }
        // Old delegation token framework
        if (UserGroupInformation.isSecurityEnabled()) {
            LOG.info("Adding delegation token to the AM container.");
            final List<Path> pathsToObtainToken = new ArrayList<>();
            boolean fetchToken =
                    configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
            if (fetchToken) {
                List<Path> yarnAccessList =
                        ConfigUtils.decodeListFromConfig(
                                configuration,
                                SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS,
                                Path::new);
                pathsToObtainToken.addAll(yarnAccessList);
                pathsToObtainToken.addAll(fileUploader.getRemotePaths());
            }
            Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
        }
    
        amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
        // 上传完毕
        fileUploader.close();
    
        // Setup CLASSPATH and environment variables for ApplicationMaster
        // AppMaster 的环境配置
        final Map<String, String> appMasterEnv =
                generateApplicationMasterEnv(
                        fileUploader,
                        classPathBuilder.toString(),
                        localResourceDescFlinkJar.toString(),
                        appId.toString());
    
        if (localizedKeytabPath != null) {
            appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
            String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
            }
        }
    
        // To support Yarn Secure Integration Test Scenario
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put(
                    YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
        }
    
        // 设置 AM 容器环境信息
        amContainer.setEnvironment(appMasterEnv);
    
        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(
                flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
    
        final String customApplicationName = customName != null ? customName : applicationName;
    
        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);
    
        // Set priority for application
        int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance(priorityNum);
            appContext.setPriority(priority);
        }
    
        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }
    
        setApplicationNodeLabel(appContext);
    
        setApplicationTags(appContext);
    
        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook =
                new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        // YarnClient 提交应用,内部开始走 Hadoop Yarn 的源码
        yarnClient.submitApplication(appContext);
    
        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        loop:
        while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            } catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", appState);
            switch (appState) {
                case FAILED:
                case KILLED:
                    throw new YarnDeploymentException(
                            "The YARN application unexpectedly switched to state "
                                    + appState
                                    + " during deployment. \n"
                                    + "Diagnostics from YARN: "
                                    + report.getDiagnostics()
                                    + "\n"
                                    + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                    + "yarn logs -applicationId "
                                    + appId);
                    // break ..
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                case FINISHED:
                    LOG.info("YARN application has been finished successfully.");
                    break loop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - startTime > 60000) {
                        LOG.info(
                                "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                    }
            }
            lastAppState = appState;
            Thread.sleep(250);
        }
    
        // since deployment was successful, remove the hook
        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
        return report;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372
    • 373
    • 374
    • 375
    • 376
    • 377
    • 378
    • 379
    • 380
    • 381
    • 382
    • 383
    • 384
    • 385
    • 386
    • 387
    • 388
    • 389
    • 390
    • 391
    • 392
    • 393
    • 394
    • 395
    • 396
    • 397
    • 398
    • 399
    • 400
    • 401
    • 402
    • 403
    • 404
    • 405
    • 406
    • 407
    • 408
    • 409
    • 410
    • 411
    • 412
    • 413
    • 414
    • 415
    • 416
    • 417
    • 418
    • 419
    • 420
    • 421
    • 422
    • 423
    • 424
    • 425
    • 426
    • 427
    • 428
    • 429
    • 430
    • 431
    • 432
    • 433
    • 434
    • 435
    • 436
    • 437
    • 438
    • 439
    • 440
    • 441
    • 442
    • 443
    • 444
    • 445
    • 446
    • 447
    • 448
    • 449
    • 450
    • 451
    • 452
    • 453
    • 454
    • 455
    • 456
    • 457
    • 458
    • 459
    • 460
    • 461
    • 462
    • 463
    • 464
    • 465
    • 466
    • 467
    • 468
    • 469
    • 470
    • 471
    • 472
    • 473
    • 474
    • 475
    • 476
    • 477
    • 478
    • 479
    • 480
    • 481
    • 482
    • 483
    • 484
    • 485
    • 486
    • 487
    • 488
    • 489
    • 490
    • 491
    • 492
    • 493
    • 494
    • 495
    • 496
    • 497
    • 498
    • 499
    • 500
    • 501
    • 502
    • 503
    • 504
    • 505
    • 506
    • 507
    • 508
    • 509
    • 510
    • 511
    • 512
    • 513
    • 514
    • 515
    • 516
    • 517
    • 518
    • 519
    • 520
    • 521
    • 522
    • 523
    • 524
    • 525
    • 526
    • 527
    • 528
    • 529
    • 530
    • 531

    这五百多行代码完成了 App Master 的启动流程,看似很复杂,其实主要就做了两件事:

    1. 上传 jar 包和配置文件到 HDFS(官方注释和补充注释已经很清晰)
    2. 封装 ApplicationMaster(AM) 参数和命令

    细分过程如下:

    • FileSystem.initialize
      • 初始化文件系统
    • YarnApplicationFileUploader.from
      • 文件上传工具
    • 上传各种文件,具体如下:
      • 运行程序的 jar 包
      • 日志配置 log4j.properties
      • flink-dist.jar,即核心依赖包
      • jobGraph 的对象文件
      • flink 配置信息
    • setupApplicationMasterContainer
      • 设置 AM 容器
    • fileUploader.close()
      • 关闭文件上传
    • Map appMasterEnv = new HashMap<>()
      • 创建用于存放 AM 的环境信息
    • amContainer.setEnvironment(appMasterEnv)
      • 设置环境信息到 amContainer
    • yarnClient.submitApplication(appContext)
      • 提交应用,其中包含 AM 容器
    • ShutdownHookUtil.removeShutdownHook
      • 部署成功后,移除 hook

    接下来我们重点看一下第二点
    代码中通过 setupApplicationMasterContainer 方法完成了封装过程

    YarnClusterDescriptor.java

    ContainerLaunchContext setupApplicationMasterContainer(
            String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
        // ------------------ Prepare Application Master Container  ------------------------------
    
        // respect custom JVM options in the YAML file
        String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
        if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
            javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
        }
    
        // krb5.conf file will be available as local resource in JM/TM container
        if (hasKrb5) {
            javaOpts += " -Djava.security.krb5.conf=krb5.conf";
        }
    
        // Set up the container launch context for the application master
        // 创建 ApplicationMaster 的容器启动上下文
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    
        final Map<String, String> startCommandValues = new HashMap<>();
        startCommandValues.put("java", "$JAVA_HOME/bin/java");
    
        String jvmHeapMem =
                JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
        startCommandValues.put("jvmmem", jvmHeapMem);
    
        startCommandValues.put("jvmopts", javaOpts);
        startCommandValues.put(
                "logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));
    
        startCommandValues.put("class", yarnClusterEntrypoint);
        startCommandValues.put(
                "redirects",
                "1> "
                        + ApplicationConstants.LOG_DIR_EXPANSION_VAR
                        + "/jobmanager.out "
                        + "2> "
                        + ApplicationConstants.LOG_DIR_EXPANSION_VAR
                        + "/jobmanager.err");
        String dynamicParameterListStr =
                JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
        startCommandValues.put("args", dynamicParameterListStr);
    
        final String commandTemplate =
                flinkConfiguration.getString(
                        ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                        ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
        final String amCommand =
                BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
    
        amContainer.setCommands(Collections.singletonList(amCommand));
    
        LOG.debug("Application Master start command: " + amCommand);
    
        return amContainer;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    接下来我们再回到 setAppMaster 方法中回顾 AM 封装之后的操作

        final Map<String, String> appMasterEnv =
                generateApplicationMasterEnv(
                        fileUploader,
                        classPathBuilder.toString(),
                        localResourceDescFlinkJar.toString(),
                        appId.toString());
    
        if (localizedKeytabPath != null) {
            appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
            String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
            }
        }
    
        // To support Yarn Secure Integration Test Scenario
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put(
                    YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
        }
    
        // 设置 AM 容器环境信息
        amContainer.setEnvironment(appMasterEnv);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    提交应用

    在 startAppMaster 方法的最后,通过 yarnClient.submitApplication(appContext) 进行了应用的提交
    我们来看提交过程的源码

    YarnClientImpl.java

    public ApplicationId
          submitApplication(ApplicationSubmissionContext appContext)
              throws YarnException, IOException {
        ApplicationId applicationId = appContext.getApplicationId();
        if (applicationId == null) {
          throw new ApplicationIdNotProvidedException(
              "ApplicationId is not provided in ApplicationSubmissionContext");
        }
        SubmitApplicationRequest request =
            Records.newRecord(SubmitApplicationRequest.class);
        request.setApplicationSubmissionContext(appContext);
    
        // Automatically add the timeline DT into the CLC
        // Only when the security and the timeline service are both enabled
        if (isSecurityEnabled() && timelineV1ServiceEnabled &&
                getConfig().get(YarnConfiguration.TIMELINE_HTTP_AUTH_TYPE)
                        .equals(KerberosAuthenticationHandler.TYPE)) {
          addTimelineDelegationToken(appContext.getAMContainerSpec());
        }
    
        // Automatically add the DT for Log Aggregation path
        // This is useful when a separate storage is used for log aggregation
        try {
          if (isSecurityEnabled()) {
            addLogAggregationDelegationToken(appContext.getAMContainerSpec());
          }
        } catch (Exception e) {
          LOG.warn("Failed to obtain delegation token for Log Aggregation Path", e);
        }
    
        //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
        rmClient.submitApplication(request);
    
        int pollCount = 0;
        long startTime = System.currentTimeMillis();
        EnumSet<YarnApplicationState> waitingStates = 
                                     EnumSet.of(YarnApplicationState.NEW,
                                     YarnApplicationState.NEW_SAVING,
                                     YarnApplicationState.SUBMITTED);
        EnumSet<YarnApplicationState> failToSubmitStates = 
                                      EnumSet.of(YarnApplicationState.FAILED,
                                      YarnApplicationState.KILLED);		
        while (true) {
          try {
            ApplicationReport appReport = getApplicationReport(applicationId);
            YarnApplicationState state = appReport.getYarnApplicationState();
            if (!waitingStates.contains(state)) {
              if(failToSubmitStates.contains(state)) {
                throw new YarnException("Failed to submit " + applicationId + 
                    " to YARN : " + appReport.getDiagnostics());
              }
              LOG.info("Submitted application " + applicationId);
              break;
            }
    
            long elapsedMillis = System.currentTimeMillis() - startTime;
            if (enforceAsyncAPITimeout() &&
                elapsedMillis >= asyncApiPollTimeoutMillis) {
              throw new YarnException("Timed out while waiting for application " +
                  applicationId + " to be submitted successfully");
            }
    
            // Notify the client through the log every 10 poll, in case the client
            // is blocked here too long.
            if (++pollCount % 10 == 0) {
              LOG.info("Application submission is not finished, " +
                  "submitted application " + applicationId +
                  " is still in " + state);
            }
            try {
              Thread.sleep(submitPollIntervalMillis);
            } catch (InterruptedException ie) {
              String msg = "Interrupted while waiting for application "
                  + applicationId + " to be successfully submitted.";
              LOG.error(msg);
              throw new YarnException(msg, ie);
            }
          } catch (ApplicationNotFoundException ex) {
            // FailOver or RM restart happens before RMStateStore saves
            // ApplicationState
            LOG.info("Re-submit application " + applicationId + "with the " +
                "same ApplicationSubmissionContext");
            rmClient.submitApplication(request);
          }
        }
    
        return applicationId;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    ApplicationClientProtocolPBClientImpl.java

    public SubmitApplicationResponse submitApplication(
        SubmitApplicationRequest request) throws YarnException,
        IOException {
        // 取出报文
        SubmitApplicationRequestProto requestProto = 
                ((SubmitApplicationRequestPBImpl) request).getProto();
        // 将报文发送到服务端,并将返回结果构成 response
        try {
            return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto));
        } catch (ServiceException e) {
            RPCUtil.unwrapAndThrowException(e);
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    我们继续再看 proxy.submitApplication
    proxy 是 ApplicationClientProtocolPB 对象,找到其代理实现类 ApplicationClientProtocolPBServiceImpl

    ApplicationClientProtocolPBServiceImpl.java

    public SubmitApplicationResponseProto submitApplication(RpcController arg0,
        SubmitApplicationRequestProto proto) throws ServiceException {
        // 服务端重新构建报文
        SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);
    
        try {
            SubmitApplicationResponse response = real.submitApplication(request);
            return ((SubmitApplicationResponsePBImpl)response).getProto();
        } catch (YarnException e) {
            throw new ServiceException(e);
        } catch (IOException e) {
            throw new ServiceException(e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    real 是 ApplicationClientProtocol 的对象
    ApplicationClientProtocol 是一个接口,找到其实现类 ClientRMService

    ClientRMService.java

    public SubmitApplicationResponse submitApplication(
          SubmitApplicationRequest request) throws YarnException, IOException {
        ApplicationSubmissionContext submissionContext = request
            .getApplicationSubmissionContext();
        ApplicationId applicationId = submissionContext.getApplicationId();
        CallerContext callerContext = CallerContext.getCurrent();
    
        // ApplicationSubmissionContext needs to be validated for safety - only
        // those fields that are independent of the RM's configuration will be
        // checked here, those that are dependent on RM configuration are validated
        // in RMAppManager.
    
        UserGroupInformation userUgi = null;
        String user = null;
        try {
          // Safety
          userUgi = UserGroupInformation.getCurrentUser();
          user = userUgi.getShortUserName();
        } catch (IOException ie) {
          LOG.warn("Unable to get the current user.", ie);
          RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
              ie.getMessage(), "ClientRMService",
              "Exception in submitting application", applicationId, callerContext,
              submissionContext.getQueue());
          throw RPCUtil.getRemoteException(ie);
        }
    
        checkTags(submissionContext.getApplicationTags());
    
        if (timelineServiceV2Enabled) {
          // Sanity check for flow run
          String value = null;
          try {
            for (String tag : submissionContext.getApplicationTags()) {
              if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
                  tag.startsWith(
                      TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
                value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length()
                    + 1);
                // In order to check the number format
                Long.valueOf(value);
              }
            }
          } catch (NumberFormatException e) {
            LOG.warn("Invalid to flow run: " + value +
                ". Flow run should be a long integer", e);
            RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
                e.getMessage(), "ClientRMService",
                "Exception in submitting application", applicationId,
                submissionContext.getQueue());
            throw RPCUtil.getRemoteException(e);
          }
        }
    
        // Check whether app has already been put into rmContext,
        // If it is, simply return the response
        if (rmContext.getRMApps().get(applicationId) != null) {
          LOG.info("This is an earlier submitted application: " + applicationId);
          return SubmitApplicationResponse.newInstance();
        }
    
        ByteBuffer tokenConf =
            submissionContext.getAMContainerSpec().getTokensConf();
        if (tokenConf != null) {
          int maxSize = getConfig()
              .getInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE,
                  YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES);
          LOG.info("Using app provided configurations for delegation token renewal,"
              + " total size = " + tokenConf.capacity());
          if (tokenConf.capacity() > maxSize) {
            throw new YarnException(
                "Exceed " + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE
                    + " = " + maxSize + " bytes, current conf size = "
                    + tokenConf.capacity() + " bytes.");
          }
        }
        if (submissionContext.getQueue() == null) {
          submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
        }
        if (submissionContext.getApplicationName() == null) {
          submissionContext.setApplicationName(
              YarnConfiguration.DEFAULT_APPLICATION_NAME);
        }
        if (submissionContext.getApplicationType() == null) {
          submissionContext
            .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
        } else {
          if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
            submissionContext.setApplicationType(submissionContext
              .getApplicationType().substring(0,
                YarnConfiguration.APPLICATION_TYPE_LENGTH));
          }
        }
    
        ReservationId reservationId = request.getApplicationSubmissionContext()
                .getReservationID();
    
        checkReservationACLs(submissionContext.getQueue(), AuditConstants
                .SUBMIT_RESERVATION_REQUEST, reservationId);
    
        if (this.contextPreProcessor != null) {
          this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(),
              applicationId, submissionContext);
        }
    
        try {
          // call RMAppManager to submit application directly
          // 将应用请求提交到 Yarn 上的 RMAppManager 去提交任务
          rmAppManager.submitApplication(submissionContext,
              System.currentTimeMillis(), userUgi);
    
          LOG.info("Application with id " + applicationId.getId() + 
              " submitted by user " + user);
          RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
              "ClientRMService", applicationId, callerContext,
              submissionContext.getQueue(),
              submissionContext.getNodeLabelExpression());
        } catch (YarnException e) {
          LOG.info("Exception in submitting " + applicationId, e);
          RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
              e.getMessage(), "ClientRMService",
              "Exception in submitting application", applicationId, callerContext,
              submissionContext.getQueue(),
              submissionContext.getNodeLabelExpression());
          throw e;
        }
    
        return recordFactory
            .newRecordInstance(SubmitApplicationResponse.class);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130

    至此,我们通过 rmAppManager.submitApplication 方法将应用请求提交到 Yarn 上的 ResourceManager 去提交任务

  • 相关阅读:
    openfeign原理
    网络层协议 ——— IP协议
    【C++】C++11——右值引用和移动语义、左值引用和右值引用、右值引用使用场景和意义、完美转发、新的类功能
    云计算与大数据——Spark的安装和配置
    Python---数据容器分类及通用操作
    传统考勤太复杂怎么办?这个小技巧,我必须吹爆!
    pip安装Cartopy小白版
    移动端测试——移动端App抓包
    Vue实现刷新当前单个页面 适用于keep-alive【vue3适用】
    基于JavaFX的扫雷游戏实现(四)——排行榜
  • 原文地址:https://blog.csdn.net/wwb44444/article/details/127722741