• Spark 之 deploy


    ExecutorRunner

    org.apache.spark.deploy.worker.ExecutorRunner

      /**
       * Starts the runner thread, whose body is `fetchAndRunExecutor()`, and registers a
       * shutdown hook. On shutdown the hook marks a still-LAUNCHING or RUNNING executor as
       * FAILED and then kills its process.
       */
      private[worker] def start(): Unit = {
        val runnerName = "ExecutorRunner for " + fullId
        workerThread = new Thread(runnerName) {
          override def run(): Unit = fetchAndRunExecutor()
        }
        workerThread.start()

        // Shutdown hook that kills the executor process when the worker JVM shuts down.
        // The hook may fire before `fetchAndRunExecutor` has moved `state` past LAUNCHING, or
        // while the executor is RUNNING; in both cases the executor did not finish on its own,
        // so it is recorded as FAILED before `killProcess` reports `state` to the worker.
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          val unfinished = state match {
            case ExecutorState.LAUNCHING | ExecutorState.RUNNING => true
            case _ => false
          }
          if (unfinished) {
            state = ExecutorState.FAILED
          }
          killProcess(Some("Worker shutting down"))
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
      /** Launches the executor runner thread and installs a JVM shutdown hook for cleanup. */
      private[worker] def start(): Unit = {
        val runner = new Thread("ExecutorRunner for " + fullId) {
          override def run(): Unit = {
            fetchAndRunExecutor()
          }
        }
        workerThread = runner
        runner.start()

        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          // Reaching here while still LAUNCHING (before `fetchAndRunExecutor` ran) or RUNNING
          // means the executor is being torn down abnormally, so report it as FAILED.
          val notFinished = Set(ExecutorState.LAUNCHING, ExecutorState.RUNNING)
          if (notFinished.contains(state)) {
            state = ExecutorState.FAILED
          }
          killProcess(Some("Worker shutting down"))
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    `fetchAndRunExecutor` 是该工作线程 `run()` 方法的主体内容。
    也就是说，`val exitCode = process.waitFor()` 这一阻塞等待过程完全在这个独立线程中执行，因此调用 `start()` 的线程不会被阻塞。

      /**
       * Launches and runs the executor described by `appDesc`.
       *
       * This is the body of the runner thread created by `start()`. It builds and starts the
       * executor process and redirects the process's stdout/stderr to files in `executorDir`.
       * It then reports RUNNING to the worker and blocks in `process.waitFor()` until the
       * process exits, after which it reports EXITED together with the exit code.
       *
       * If the thread is interrupted, the executor is marked KILLED. If any other `Exception`
       * is thrown, the executor is marked FAILED. In both cases the process is torn down via
       * `killProcess`.
       */
      private def fetchAndRunExecutor(): Unit = {
        try {
          val resourcesFile = prepareResourcesFile(SPARK_EXECUTOR_PREFIX, resources, executorDir)
          // Pass the resources file to the executor only when one was prepared
          val launchArgs = appDesc.command.arguments ++
            resourcesFile.fold(Seq.empty[String])(f => Seq("--resourcesFile", f.getAbsolutePath))
          val javaOpts = appDesc.command.javaOpts.map { opt =>
            Utils.substituteAppNExecIds(opt, appId, execId.toString)
          }
          val launchCommand = appDesc.command.copy(arguments = launchArgs, javaOpts = javaOpts)

          val builder = CommandUtils.buildProcessBuilder(launchCommand, new SecurityManager(conf),
            memory, sparkHome.getAbsolutePath, substituteVariables)
          val redactedCommand = Utils
            .redactCommandLineArgs(conf, builder.command().asScala.toSeq)
            .mkString("\"", "\" \"", "\"")
          logInfo(s"Launch command: $redactedCommand")

          builder.directory(executorDir)
          val env = builder.environment
          env.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
          // Avoid creating an extra "scala" parent process when launched from the Spark Shell
          env.put("SPARK_LAUNCH_WITH_SCALA", "0")

          // Expose the web UI log-page URLs for this executor's stdout/stderr
          val logUrlPrefix =
            if (!conf.get(UI_REVERSE_PROXY)) {
              s"$webUiScheme$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
            } else {
              conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
                s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
            }
          env.put("SPARK_LOG_URL_STDERR", s"${logUrlPrefix}stderr")
          env.put("SPARK_LOG_URL_STDOUT", s"${logUrlPrefix}stdout")

          process = builder.start()

          // Capture the child's output streams in files under executorDir; the stderr file
          // begins with a header naming the (redacted) launch command
          stdoutAppender =
            FileAppender(process.getInputStream, new File(executorDir, "stdout"), conf, true)
          val stderrFile = new File(executorDir, "stderr")
          val header = "Spark Executor Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
          Files.write(header, stderrFile, StandardCharsets.UTF_8)
          stderrAppender = FileAppender(process.getErrorStream, stderrFile, conf, true)

          state = ExecutorState.RUNNING
          worker.send(ExecutorStateChanged(appId, execId, state, None, None))
          // Block this runner thread until the executor exits. It may exit with code 0 (when
          // the driver instructs it to shut down) or with a non-zero exit code.
          val exitCode = process.waitFor()
          state = ExecutorState.EXITED
          val exitMessage = "Command exited with code " + exitCode
          worker.send(ExecutorStateChanged(appId, execId, state, Some(exitMessage), Some(exitCode)))
        } catch {
          case _: InterruptedException =>
            logInfo("Runner thread for executor " + fullId + " interrupted")
            state = ExecutorState.KILLED
            killProcess(None)
          case e: Exception =>
            logError("Error running executor", e)
            state = ExecutorState.FAILED
            killProcess(Some(e.toString))
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    killProcess 方法
      /**
       * Tears down the executor and tells the worker about it.
       *
       * If a process was started, this stops both output appenders and terminates the process
       * within `EXECUTOR_TERMINATE_TIMEOUT_MS` (per termination attempt). It then sends the
       * current `state`, `message` and exit code to the worker. The exit code is `None` when no
       * process was started or when it could not be terminated in time. An
       * `IllegalStateException` thrown by `worker.send` is logged as a warning rather than
       * propagated.
       *
       * @param message the exception message which caused the executor's death
       */
      private def killProcess(message: Option[String]): Unit = {
        val exitCode: Option[Int] =
          if (process == null) {
            None
          } else {
            logInfo("Killing process!")
            Option(stdoutAppender).foreach(_.stop())
            Option(stderrAppender).foreach(_.stop())
            val terminated = Utils.terminateProcess(process, EXECUTOR_TERMINATE_TIMEOUT_MS)
            if (terminated.isEmpty) {
              logWarning("Failed to terminate process: " + process +
                ". This process will likely be orphaned.")
            }
            terminated
          }

        try {
          worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode))
        } catch {
          case e: IllegalStateException => logWarning(e.getMessage(), e)
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    org.apache.spark.util.Utils.scala

      /**
       * Terminates `process`, escalating from a polite to a forcible kill.
       *
       * First calls `destroy()` and waits up to `timeoutMs`. If the process is still alive, it
       * calls `destroyForcibly()` (a non-fatal exception there is only logged) and waits up to
       * `timeoutMs` once more.
       *
       * @param process   the process to terminate
       * @param timeoutMs maximum time, in milliseconds, to wait after each termination attempt
       * @return the process exit value if it was successfully terminated, else None
       */
      def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
        def exitedInTime(): Boolean = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)

        process.destroy()
        if (exitedInTime()) {
          Some(process.exitValue())
        } else {
          try process.destroyForcibly()
          catch {
            case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
          }
          // A forcible kill should take effect almost immediately, but the wait is still bounded
          if (exitedInTime()) {
            Some(process.exitValue())
          } else {
            logWarning("Timed out waiting to forcibly kill process")
            None
          }
        }
      }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
  • 相关阅读:
    Docker安装NextCloud搭建私有网盘
    POJ1759Garland题解
    Nacos介绍与使用
    -一尺之棰-
    浅谈mysql 第一篇
    互联网技术从业者怎么解决系统高并发?
    SpringBoot整合Redis - @Cacheable 和 RedisTemplate
    Spring后处理器-BeanPostProcessor
    太强了!有了这个 GitHub 热门工具,15 分钟内把微信小程序搬进 App!
    第六十三章 符号概览
  • 原文地址:https://blog.csdn.net/zhixingheyi_tian/article/details/134297625