• Flink Yarn Per Job - 提交应用


    图片

    YarnClusterDescriptor

    /**
     * Excerpt of the step that launches the application master on YARN.
     *
     * <p>Only the submission call is shown; the "... ..." lines mark code that
     * was elided from this excerpt, including the construction of
     * {@code appContext}.
     */
    private ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification) throws Exception {
        ... ... 
       // Hand the submission context to the YARN client.
       // NOTE(review): appContext is built in the elided code above - confirm against the full source.
       yarnClient.submitApplication(appContext);
    
        ... ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    YarnClientImpl extends YarnClient
    
    • 1
    /**
     * Client-side submission: wraps the submission context in a
     * {@code SubmitApplicationRequest} and sends it through {@code rmClient}.
     *
     * <p>The remainder of the method is elided in this excerpt ("... ...").
     *
     * @param appContext submission context; must already carry an application id
     * @throws ApplicationIdNotProvidedException if {@code appContext} has no application id
     */
    public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException {
        ApplicationId applicationId = appContext.getApplicationId();
        if (applicationId == null) {
            throw new ApplicationIdNotProvidedException("ApplicationId is not provided in ApplicationSubmissionContext");
        } else {
            SubmitApplicationRequest request = (SubmitApplicationRequest)Records.newRecord(SubmitApplicationRequest.class);
            request.setApplicationSubmissionContext(appContext);
           // Important: this is the call that forwards the request via rmClient.
            this.rmClient.submitApplication(request);
           ... ...
           }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    图片

    ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol
    
    • 1
    /**
     * Converts the request to its protobuf form, forwards it through the RPC
     * proxy, and wraps the proxy's reply in a {@code SubmitApplicationResponsePBImpl}.
     *
     * @param request the submission request; cast to {@code SubmitApplicationRequestPBImpl}
     * @return the wrapped response from the proxy
     * @throws YarnException declared by the interface; raised while unwrapping a {@code ServiceException}
     * @throws IOException declared by the interface; raised while unwrapping a {@code ServiceException}
     */
    public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException, IOException {
        final SubmitApplicationRequestPBImpl pbRequest = (SubmitApplicationRequestPBImpl) request;
        final SubmitApplicationRequestProto wireRequest = pbRequest.getProto();
        // No RPC controller is supplied; a typed null keeps the call signature unambiguous.
        final RpcController noController = null;

        try {
            return new SubmitApplicationResponsePBImpl(this.proxy.submitApplication(noController, wireRequest));
        } catch (ServiceException rpcFailure) {
            RPCUtil.unwrapAndThrowException(rpcFailure);
            // Needed to satisfy the compiler; presumably never reached because the
            // helper above is expected to throw - confirm against RPCUtil.
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    ClientRMService

    @Override
    public SubmitApplicationResponse submitApplication(
        SubmitApplicationRequest request) throws YarnException {
      ApplicationSubmissionContext submissionContext = request
          .getApplicationSubmissionContext();
      ApplicationId applicationId = submissionContext.getApplicationId();
    
      // ApplicationSubmissionContext needs to be validated for safety - only
      // those fields that are independent of the RM's configuration will be
      // checked here, those that are dependent on RM configuration are validated
      // in RMAppManager.
    
      String user = null;
      try {
        // Safety
          // 安全校验
        user = UserGroupInformation.getCurrentUser().getShortUserName();
      } catch (IOException ie) {
        LOG.warn("Unable to get the current user.", ie);
        RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
            ie.getMessage(), "ClientRMService",
            "Exception in submitting application", applicationId);
        throw RPCUtil.getRemoteException(ie);
      }
    
      // Check whether app has already been put into rmContext,
      // If it is, simply return the response
      if (rmContext.getRMApps().get(applicationId) != null) {
        LOG.info("This is an earlier submitted application: " + applicationId);
        return SubmitApplicationResponse.newInstance();
      }
    
      if (submissionContext.getQueue() == null) {
      // DEFAULT_QUEUE_NAME = "default"
        submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
      }
      if (submissionContext.getApplicationName() == null) {
        submissionContext.setApplicationName(
          //  DEFAULT_APPLICATION_NAME = "N/A"
            YarnConfiguration.DEFAULT_APPLICATION_NAME);
      }
      if (submissionContext.getApplicationType() == null) {
        submissionContext
          // DEFAULT_APPLICATION_TYPE = "YARN"
          .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
      } else {
        if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
          submissionContext.setApplicationType(submissionContext
            .getApplicationType().substring(0,
              YarnConfiguration.APPLICATION_TYPE_LENGTH));
        }
      }
    
      try {
        // call RMAppManager to submit application directly
          // 直接调用 RMAppManager 提交应用
        rmAppManager.submitApplication(submissionContext,
            System.currentTimeMillis(), user);
    
        LOG.info("Application with id " + applicationId.getId() + 
            " submitted by user " + user);
        RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
            "ClientRMService", applicationId);
      } catch (YarnException e) {
        LOG.info("Exception in submitting application with id " +
            applicationId.getId(), e);
        RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
            e.getMessage(), "ClientRMService",
            "Exception in submitting application", applicationId);
        throw e;
      }
    
      SubmitApplicationResponse response = recordFactory
          .newRecordInstance(SubmitApplicationResponse.class);
      return response;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    // Call RMAppManager to submit the application directly, passing the
    // submission context, the current time in milliseconds, and the user.
    rmAppManager.submitApplication(submissionContext,
        System.currentTimeMillis(), user);
    
    • 1
    • 2
    • 3
    • 4

    图片

  • 相关阅读:
    【电商实战02】如何借助工具快速生成代码?初学者容易踩的坑有哪些?
    【Linux】vimrc 配置方案
    【JVM基础】虚拟机栈
    【redis】7.6 安装与配置Redis - (docker-compose)
    纯CSS实现炫酷文本时钟
    图论基础学习笔记
    Nginx禁止文件下载防止服务器被恶意扫描
    2.2 调用星火大模型的API
    Vue/Nuxt框架开发的PC端网站兼容平板设备的
    面试还被问TCP?一条龙通关
  • 原文地址:https://blog.csdn.net/hyunbar/article/details/126128786