• How Spark manages and renews Hadoop tokens


    Hadoop token management

    • The AM authenticates to the KDC via Kerberos
    • The AM obtains YARN and HDFS delegation tokens
    • The AM sends the tokens to its containers
    • The containers load the tokens

    Enable debug logging

    log4j.logger.org.apache.hadoop.security=DEBUG
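
    The same logger can also be raised to DEBUG at runtime. A minimal sketch, assuming log4j 1.x is on the classpath (as in Spark builds up to 3.2):

      import org.apache.log4j.{Level, Logger}

      // Equivalent to the log4j.properties line above, set programmatically
      Logger.getLogger("org.apache.hadoop.security").setLevel(Level.DEBUG)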

    The AM generates tokens

    Logs:

    23/09/07 22:38:50,375 INFO [main] security.HadoopDelegationTokenManager:57 : Attempting to login to KDC using principal: hadoop_user@PROD.COM, keytab: /home/hadoop_user/hadoop_user.keytab
    23/09/07 22:38:50,381 DEBUG [main] security.UserGroupInformation:246 : Hadoop login
    23/09/07 22:38:50,381 DEBUG [main] security.UserGroupInformation:192 : hadoop login commit
    23/09/07 22:38:50,382 DEBUG [main] security.UserGroupInformation:200 : Using kerberos user: hadoop_user@PROD.COM
    23/09/07 22:38:50,382 DEBUG [main] security.UserGroupInformation:218 : Using user: "hadoop_user@PROD.COM" with name: hadoop_user@PROD.COM
    23/09/07 22:38:50,382 DEBUG [main] security.UserGroupInformation:230 : User entry: "hadoop_user@PROD.COM"
    23/09/07 22:38:50,382 INFO [main] security.HadoopDelegationTokenManager:57 : Successfully logged into KDC.
    23/09/07 22:38:51,247 INFO [main] security.HadoopFSDelegationTokenProvider:57 : getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-113291108_1, ugi=hadoop_user@PROD.COM (auth:KERBEROS)]] with renewer rm/hadoop-rm-1.vip.hadoop.COM@PROD.COM
    23/09/07 22:38:51,391 DEBUG [main] security.SaslRpcClient:493 : Sending sasl message state: NEGOTIATE
    
    23/09/07 22:38:51,398 DEBUG [main] security.SaslRpcClient:288 : Get token info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.token.TokenInfo(value=class org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector)
    23/09/07 22:38:51,399 DEBUG [main] security.SaslRpcClient:241 : tokens aren't supported for this protocol or user doesn't have one
    23/09/07 22:38:51,399 DEBUG [main] security.SaslRpcClient:313 : Get kerberos info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.KerberosInfo(clientPrincipal=, serverPrincipal=dfs.namenode.kerberos.principal)
    23/09/07 22:38:51,420 DEBUG [main] security.SaslRpcClient:260 : RPC Server's Kerberos principal name for protocol=org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB is nn/hadoop-nn-2.vip.hadoop.COM@PROD.COM
    23/09/07 22:38:51,421 DEBUG [main] security.SaslRpcClient:271 : Creating SASL GSSAPI(KERBEROS)  client to authenticate to service at hadoop-nn-2.vip.hadoop.COM
    23/09/07 22:38:51,425 DEBUG [main] security.SaslRpcClient:194 : Use KERBEROS authentication for protocol ClientNamenodeProtocolPB
    23/09/07 22:38:51,441 DEBUG [main] security.SaslRpcClient:493 : Sending sasl message state: INITIATE
    23/09/07 22:38:51,506 INFO [main] security.HadoopFSDelegationTokenProvider:57 : getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-113291108_1, ugi=hadoop_user@PROD.COM (auth:KERBEROS)]] with renewer hadoop_user@PROD.COM
    23/09/07 22:38:52,807 INFO [main] security.HadoopDelegationTokenManager:57 : Scheduling renewal in 18.0 h.
    23/09/07 22:38:52,809 INFO [main] security.HadoopDelegationTokenManager:57 : Updating delegation tokens.
    23/09/07 22:38:52,833 INFO [main] deploy.SparkHadoopUtil:57 : Updating delegation tokens for current user.
    23/09/07 22:38:52,858 INFO [dispatcher-CoarseGrainedScheduler] deploy.SparkHadoopUtil:57 : Updating delegation tokens for current user.
    23/09/07 22:38:53,119 DEBUG [main] security.SaslRpcClient:288 : Get token info proto:interface org.apache.hadoop.yarn.api.ApplicationClientProtocolPB info:org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo$2@48f2054d
    23/09/07 22:38:53,120 DEBUG [main] security.SaslRpcClient:241 : tokens aren't supported for this protocol or user doesn't have one
    23/09/07 22:38:53,121 DEBUG [main] security.SaslRpcClient:313 : Get kerberos info proto:interface org.apache.hadoop.yarn.api.ApplicationClientProtocolPB info:org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo$1@6ce26986
    23/09/07 22:38:53,124 DEBUG [main] security.SaslRpcClient:343 : getting serverKey: yarn.resourcemanager.principal conf value: rm/_HOST@PROD.COM principal: rm/hadoop-rm-1.hadoop-rm-rm.hm-prod.svc.35.tess.io@PROD.COM
    23/09/07 22:38:53,124 DEBUG [main] security.SaslRpcClient:260 : RPC Server's Kerberos principal name for protocol=org.apache.hadoop.yarn.api.ApplicationClientProtocolPB is rm/hadoop-rm-1.hadoop-rm-rm.hm-prod.svc.35.tess.io@PROD.COM
    23/09/07 22:38:53,124 DEBUG [main] security.SaslRpcClient:271 : Creating SASL GSSAPI(KERBEROS)  client to authenticate to service at hadoop-rm-1.hadoop-rm-rm.hm-prod.svc.35.tess.io
    23/09/07 22:38:53,125 DEBUG [main] security.SaslRpcClient:194 : Use KERBEROS authentication for protocol ApplicationClientProtocolPB
    23/09/07 22:38:53,131 DEBUG [main] security.SaslRpcClient:493 : Sending sasl message state: INITIATE
    23/09/07 22:38:53,182 DEBUG [main] token.Token:260 : Cloned private token Kind: HDFS_DELEGATION_TOKEN, Service: hadoop-nn-1.vip.hadoop.COM:8020, Ident: (token for hadoop_user: HDFS_DELEGATION_TOKEN owner=hadoop_user@PROD.COM, renewer=yarn, realUser=, issueDate=1694151531461, maxDate=1694756331461, sequenceNumber=1007863, masterKeyId=7139) from Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hadoop, Ident: (token for hadoop_user: HDFS_DELEGATION_TOKEN owner=hadoop_user@PROD.COM, renewer=yarn, realUser=, issueDate=1694151531461, maxDate=1694756331461, sequenceNumber=1007863, masterKeyId=7139)
    23/09/07 22:38:53,182 DEBUG [main] token.Token:260 : Cloned private token Kind: HDFS_DELEGATION_TOKEN, Service: hadoop-nn-2.vip.hadoop.COM:8020, Ident: (token for hadoop_user: HDFS_DELEGATION_TOKEN owner=hadoop_user@PROD.COM, renewer=yarn, realUser=, issueDate=1694151531461, maxDate=1694756331461, sequenceNumber=1007863, masterKeyId=7139) from Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hadoop, Ident: (token for hadoop_user: HDFS_DELEGATION_TOKEN owner=hadoop_user@PROD.COM, renewer=yarn, realUser=, issueDate=1694151531461, maxDate=1694756331461, sequenceNumber=1007863, masterKeyId=7139)
    23/09/07 22:38:53,182 DEBUG [main] token.Token:260 : Cloned private token Kind: HDFS_DELEGATION_TOKEN, Service: hadoop-nn-3.vip.hadoop.COM:8020, Ident: (token for hadoop_user: HDFS_DELEGATION_TOKEN owner=hadoop_user@PROD.COM, renewer=yarn, realUser=, issueDate=1694151531461, maxDate=1694756331461, sequenceNumber=1007863, masterKeyId=7139) from Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hadoop, Ident: (token for hadoop_user: HDFS_DELEGATION_TOKEN owner=hadoop_user@PROD.COM, renewer=yarn, realUser=, issueDate=1694151531461, maxDate=1694756331461, sequenceNumber=1007863, masterKeyId=7139)
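
    The HadoopFSDelegationTokenProvider lines above fetch HDFS tokens twice: once with the YARN RM as renewer, once with the user itself. A minimal sketch of fetching an HDFS delegation token by hand through Hadoop's public FileSystem API (the renewer principal is copied from the log; the rest is illustrative):

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.FileSystem
      import org.apache.hadoop.security.Credentials

      val conf = new Configuration()
      val creds = new Credentials()
      // Ask the NameNode for a delegation token renewable by the RM principal
      val renewer = "rm/hadoop-rm-1.vip.hadoop.COM@PROD.COM"
      val tokens = FileSystem.get(conf).addDelegationTokens(renewer, creds)
      tokens.foreach(t => println(s"${t.getKind} for ${t.getService}"))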
    

    CoarseGrainedSchedulerBackend

    Starting the token manager

      override def start(): Unit = {
        if (UserGroupInformation.isSecurityEnabled()) {
          delegationTokenManager = createTokenManager()
          delegationTokenManager.foreach { dtm =>
            val ugi = UserGroupInformation.getCurrentUser()
            val tokens = if (dtm.renewalEnabled) {
              dtm.start()
            } else {
              val creds = ugi.getCredentials()
              dtm.obtainDelegationTokens(creds)
              if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
                SparkHadoopUtil.get.serialize(creds)
              } else {
                null
              }
            }
            if (tokens != null) {
              updateDelegationTokens(tokens)
            }
          }
        }
      }
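
    In the non-renewal branch, the freshly obtained credentials are serialized into bytes for shipping. A sketch of what SparkHadoopUtil.get.serialize(creds) plausibly does, assuming Hadoop's standard token-storage stream format (an assumption, not the Spark source):

      import org.apache.hadoop.io.DataOutputBuffer
      import org.apache.hadoop.security.Credentials

      def serializeCredentials(creds: Credentials): Array[Byte] = {
        val buf = new DataOutputBuffer()
        // Writes all tokens and secret keys in Hadoop's token-storage format
        creds.writeTokenStorageToStream(buf)
        buf.getData.take(buf.getLength)  // trim the buffer's unused tail
      }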
    
    

    HadoopDelegationTokenManager

    Periodically refreshing tokens

      def start(): Array[Byte] = {
        require(renewalEnabled, "Token renewal must be enabled to start the renewer.")
        require(schedulerRef != null, "Token renewal requires a scheduler endpoint.")
        renewalExecutor =
          ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    
        val ugi = UserGroupInformation.getCurrentUser()
        if (ugi.isFromKeytab()) {
          // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop 3.x,
          // it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added in
          // HADOOP-9567). This task will make sure that the user stays logged in regardless of that
          // configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if the TGT does
          // not need to be renewed yet.
          val tgtRenewalTask = new Runnable() {
            override def run(): Unit = {
              try {
                ugi.checkTGTAndReloginFromKeytab()
              } catch {
                case e: Throwable =>
                  logWarning("Failed to renew TGT from keytab file", e)
              }
            }
          }
          val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
          renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
            TimeUnit.SECONDS)
        }
    
        updateTokensTask()
      }
    
      private def updateTokensTask(): Array[Byte] = {
        try {
          val freshUGI = doLogin()
          val creds = obtainTokensAndScheduleRenewal(freshUGI)
          val tokens = SparkHadoopUtil.get.serialize(creds)
    
          logInfo("Updating delegation tokens.")
          schedulerRef.send(UpdateDelegationTokens(tokens))
          tokens
        } catch {
          case _: InterruptedException =>
            // Ignore, may happen if shutting down.
            null
          case e: Exception =>
            val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
            logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
              " If this happens too often tasks will fail.", e)
            scheduleRenewal(delay)
            null
        }
      }
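
    The "Scheduling renewal in 18.0 h." line from the AM log follows from the renewal ratio: the next fetch is scheduled at a fraction (spark.security.credentials.renewalRatio, 0.75 by default) of the time left until the tokens need renewal. A back-of-the-envelope check against the token Ident above, whose HDFS token is renewable every 24 h:

      // Illustrative arithmetic, not Spark source
      val renewalIntervalMs = 24L * 60 * 60 * 1000  // dfs.namenode.delegation.token.renew-interval default
      val ratio = 0.75                              // spark.security.credentials.renewalRatio default
      val delayMs = (renewalIntervalMs * ratio).toLong
      println(delayMs / 3600000.0)                  // 18.0 -> "Scheduling renewal in 18.0 h."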
    

    CoarseGrainedSchedulerBackend

          case UpdateDelegationTokens(newDelegationTokens) =>
            updateDelegationTokens(newDelegationTokens)
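
    On the driver, this handler both applies the new credentials to its own UGI and fans them out to every registered executor. A sketch under those assumptions (the real method lives in CoarseGrainedSchedulerBackend and its details vary across Spark versions):

      import org.apache.spark.SparkConf
      import org.apache.spark.deploy.SparkHadoopUtil
      import org.apache.spark.rpc.RpcEndpointRef
      import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens

      // Sketch, not the Spark source: refresh locally, then broadcast to executors
      def updateDelegationTokens(
          tokens: Array[Byte],
          conf: SparkConf,
          executorEndpoints: Iterable[RpcEndpointRef]): Unit = {
        SparkHadoopUtil.get.addDelegationTokens(tokens, conf)  // the driver's own UGI
        executorEndpoints.foreach(_.send(UpdateDelegationTokens(tokens)))
      }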
    

    Container startup: loading the tokens

    23/09/07 23:41:56,279 DEBUG [main] security.UserGroupInformation:246 : Hadoop login
    23/09/07 23:41:56,281 DEBUG [main] security.UserGroupInformation:192 : hadoop login commit
    23/09/07 23:41:56,284 DEBUG [main] security.UserGroupInformation:214 : Using local user: UnixPrincipal: hadoop_user
    23/09/07 23:41:56,285 DEBUG [main] security.UserGroupInformation:218 : Using user: "UnixPrincipal: hadoop_user" with name: hadoop_user
    23/09/07 23:41:56,285 DEBUG [main] security.UserGroupInformation:230 : User entry: "hadoop_user"
    23/09/07 23:41:56,285 DEBUG [main] security.UserGroupInformation:741 : Reading credentials from location /hadoop/1/yarn/local/usercache/hadoop_user/appcache/application_1684894519955_69959/container_e2311_1684894519955_69959_01_000021/container_tokens
    23/09/07 23:41:56,303 DEBUG [main] security.UserGroupInformation:746 : Loaded 7 tokens from /hadoop/1/yarn/local/usercache/hadoop_user/appcache/application_1684894519955_69959/container_e2311_1684894519955_69959_01_000021/container_tokens
    23/09/07 23:41:56,304 DEBUG [main] security.UserGroupInformation:787 : UGI loginUser: hadoop_user (auth:SIMPLE)
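
    The "Reading credentials from location .../container_tokens" line is standard YARN behavior: the NodeManager writes the container's tokens to a file and exports HADOOP_TOKEN_FILE_LOCATION, and UGI loads it roughly as sketched below (illustrative, not the Hadoop source):

      import java.io.File
      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.security.Credentials

      // The NodeManager points this env var at .../container_tokens
      val tokenFile = sys.env("HADOOP_TOKEN_FILE_LOCATION")
      val creds = Credentials.readTokenStorageFile(new File(tokenFile), new Configuration())
      println(s"Loaded ${creds.numberOfTokens()} tokens")  // cf. "Loaded 7 tokens from ..."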
    
    23/09/07 23:44:54,825 DEBUG [Executor 1 task launch worker for task 1785, task 1757.0 in stage 8.0 of app application_1684894519955_69959] security.SaslRpcClient:284 : Get token info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.token.TokenInfo(value=class org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector)
    23/09/07 23:44:54,831 DEBUG [Executor 1 task launch worker for task 1785, task 1757.0 in stage 8.0 of app application_1684894519955_69959] security.SaslRpcClient:267 : Creating SASL DIGEST-MD5(TOKEN)  client to authenticate to service at default
    23/09/07 23:44:54,833 DEBUG [Executor 1 task launch worker for task 1785, task 1757.0 in stage 8.0 of app application_1684894519955_69959] security.SaslRpcClient:190 : Use TOKEN authentication for protocol ClientNamenodeProtocolPB
    23/09/07 23:44:54,836 DEBUG [Executor 1 task launch worker for task 1785, task 1757.0 in stage 8.0 of app application_1684894519955_69959] security.SaslRpcClient:690 : SASL client callback: setting username: ABZiX2Nhcm1lbEBQUk9ELkVCQVkuQ09NBHlhcm4AigGKc4ZbmYoBipeS35mMAUIqhY4Ggg==
    23/09/07 23:44:54,836 DEBUG [Executor 1 task launch worker for task 1785, task 1757.0 in stage 8.0 of app application_1684894519955_69959] security.SaslRpcClient:695 : SASL client callback: setting userPassword
    23/09/07 23:44:54,836 DEBUG [Executor 1 task launch worker for task 1785, task 1757.0 in stage 8.0 of app application_1684894519955_69959] security.SaslRpcClient:700 : SASL client callback: setting realm: default
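
    Note that the executor authenticates to the NameNode with DIGEST-MD5(TOKEN), not Kerberos: the SASL "username" above is simply the base64-encoded delegation token identifier. A hypothetical debugging snippet to decode it:

      import java.io.{ByteArrayInputStream, DataInputStream}
      import java.util.Base64
      import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier

      val b64 = "ABZiX2Nhcm1lbEBQUk9ELkVCQVkuQ09NBHlhcm4AigGKc4ZbmYoBipeS35mMAUIqhY4Ggg=="
      val ident = new DelegationTokenIdentifier()
      ident.readFields(new DataInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(b64))))
      println(ident)  // owner, renewer, issueDate, maxDate, sequenceNumber, masterKeyId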
    

    How the Spark AM and executors apply the tokens they receive

        case UpdateDelegationTokens(tokenBytes) =>
          logInfo(s"Received tokens of ${tokenBytes.length} bytes")
          SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
    
    .....	
      UserGroupInformation.getCurrentUser.addCredentials(creds)
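
    A sketch of the receiving side, assuming the payload is Hadoop's token-storage format (mirroring the serialization sketch earlier; the actual SparkHadoopUtil internals may differ):

      import java.io.{ByteArrayInputStream, DataInputStream}
      import org.apache.hadoop.security.{Credentials, UserGroupInformation}

      def applyDelegationTokens(tokenBytes: Array[Byte]): Unit = {
        val creds = new Credentials()
        creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes)))
        // Merge into the current UGI so subsequent RPCs pick up the fresh tokens
        UserGroupInformation.getCurrentUser.addCredentials(creds)
      }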
    

    Debugging notes

    # Creating the UGI
    
    UserGroupInformation createRemoteUser(String user, AuthMethod authMethod)
    
    # Why does UGI.commit() choose the local user instead of the kerberos user?
    
    23/09/07 22:39:05,904 DEBUG [main] security.UserGroupInformation:246 : Hadoop login
    23/09/07 22:39:05,905 DEBUG [main] security.UserGroupInformation:192 : hadoop login commit
    23/09/07 22:39:05,908 DEBUG [main] security.UserGroupInformation:214 : Using local user: UnixPrincipal: b_carmel
    23/09/07 22:39:05,909 DEBUG [main] security.UserGroupInformation:218 : Using user: "UnixPrincipal: b_carmel" with name: b_carmel
    23/09/07 22:39:05,909 DEBUG [main] security.UserGroupInformation:230 : User entry: "b_carmel"
    
    Because:
    UGI
        HadoopLoginModule
            Subject subject // this Subject carries no KerberosPrincipal
    
    
    * Inside the RPC Client, the UGI is held in a field named ticket
    * The SaslRpcClient is created from Client.ticket
    
    UserGroupInformation realUser // usually the system (OS) user
        doSubjectLogin(subject, null);
    -->
    UserGroupInformation loginUser  // also serves as the proxyUser
        Credentials
            secretKeysMap
                Text alias --> byte[] key
        // prepare the loginUser's Credentials
        // 23/09/07 22:39:05,928 DEBUG [main] security.UserGroupInformation:746 : Loaded 2 tokens from /hadoop/2/yarn/local/usercache/b_carmel/appcache/application_1693993514169_0446/container_e6161_1693993514169_0446_01_000001/container_tokens

        User user // subclass of Principal
            String fullName // same as name
            AuthenticationMethod authMethod 
        // 23/09/07 22:39:05,929 DEBUG [main] security.UserGroupInformation:787 : UGI loginUser: b_carmel (auth:SIMPLE)

        Subject subject;  // the same Subject held by HadoopLoginModule
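
    A sketch of the commit-time choice the question above asks about (illustrative, not the HadoopLoginModule source): if the JAAS Subject carries a KerberosPrincipal, the kerberos user wins; otherwise the OS principal is used. A container's UGI is built from container_tokens with no Kerberos login, so the local user wins there:

      import javax.security.auth.Subject
      import javax.security.auth.kerberos.KerberosPrincipal

      // Sketch of the selection order inside HadoopLoginModule.commit()
      def commitUser(subject: Subject): String = {
        val kerberos = subject.getPrincipals(classOf[KerberosPrincipal])
        if (!kerberos.isEmpty) {
          // AM after keytab login: "Using kerberos user: hadoop_user@PROD.COM"
          kerberos.iterator().next().getName
        } else {
          // No KerberosPrincipal on the Subject, so the OS principal wins:
          // "Using local user: UnixPrincipal: hadoop_user"
          System.getProperty("user.name")
        }
      }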
    
  • Original post: https://blog.csdn.net/wankunde/article/details/132765295