Starting with this installment, I will walk through how a Spark cluster deployed in yarn mode (based on Spark 3.0.0) takes user code (using WordCount as the example) and runs it after submission, following the whole flow step by step. Here is the WordCount program we will submit:
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_WordCount {

  def main(args: Array[String]): Unit = {
    val sparConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    // 1. Read the files and get the data line by line
    // e.g. "hello world"
    val lines: RDD[String] = sc.textFile("datas\\wc\\*")

    // 2. Split each line into individual words (tokenization)
    // Flattening: breaking a whole into individual elements
    // "hello world" => hello, world, hello, world
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // 3. Map each word to a tuple (word, 1)
    val wordToOne: RDD[(String, Int)] = words.map(
      word => (word, 1)
    )

    // 4. Aggregate the counts by key; the expanded form would be:
    // wordToOne.reduceByKey((x, y) => {
    //   x + y
    // })
    val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)

    // 5. Collect the results to the driver and print them
    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

    // Stop the SparkContext
    sc.stop()
  }

}
```
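One thing worth noting before moving on: setMaster("local") pins the job to local mode. When the jar is submitted to a yarn cluster with spark-submit, the master is normally passed on the command line rather than hardcoded. A minimal sketch of that variant (the HDFS input path below is only a made-up placeholder):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_WordCountCluster {

  def main(args: Array[String]): Unit = {
    // No setMaster here: the master (e.g. --master yarn) is supplied by spark-submit.
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // In cluster mode the input usually lives on HDFS; this path is only a placeholder.
    val lines: RDD[String] = sc.textFile("hdfs:///datas/wc/*")

    val wordToCount: RDD[(String, Int)] = lines
      .flatMap(_.split(" "))    // split each line into words
      .map(word => (word, 1))   // pair each word with a count of 1
      .reduceByKey(_ + _)       // sum the counts per word

    wordToCount.collect().foreach(println)

    sc.stop()
  }
}
```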
For example, to submit the job from the command line, we would typically run something like:
```bash
bin/spark-submit \
  --class com.wy.bigdata.spark.core.wc.Spark03_WordCount \
  --master yarn \
  --deploy-mode cluster \
  ./examples/jars/spark-examples_2.12-3.0.0.jar
```
So let's open the spark-submit script under the bin directory and see what it actually does. The script is a thin wrapper: it locates SPARK_HOME and then hands everything over to spark-class, its final line being essentially `exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"`.

The starting entry point is therefore org.apache.spark.deploy.SparkSubmit, so that is where we begin the analysis. Its main method (in the SparkSubmit companion object) looks like this:
```scala
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

        override protected def logError(msg: => String): Unit = self.logError(msg)
      }
    }

    override protected def logInfo(msg: => String): Unit = printMessage(msg)

    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

    override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")

    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException =>
          exitFn(e.exitCode)
      }
    }

  }

  submit.doSubmit(args)
}
```

Clicking into super.doSubmit(args) takes us here:
```scala
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
```
① parseArguments(args)
Parses the command-line arguments into a SparkSubmitArguments object.
② appArgs.action match
Matches on the action. In our case the action is SUBMIT (why it defaults to SUBMIT is shown in the note after the SparkSubmitArguments excerpt below), so submit(appArgs, uninitLog) is executed.
Here is an excerpt of SparkSubmitArguments, the class that does the parsing:
```scala
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser with Logging {
  var master: String = null
  var deployMode: String = null
  var executorMemory: String = null
  var executorCores: String = null
  var totalExecutorCores: String = null
  var propertiesFile: String = null
  var driverMemory: String = null
  var driverExtraClassPath: String = null
  var driverExtraLibraryPath: String = null
  var driverExtraJavaOptions: String = null
  var queue: String = null
  var numExecutors: String = null
  var files: String = null
  var archives: String = null
  var mainClass: String = null
  var primaryResource: String = null
  var name: String = null
  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
  var jars: String = null
  var packages: String = null
  var repositories: String = null
  var ivyRepoPath: String = null
  var ivySettingsPath: Option[String] = None
  var packagesExclusions: String = null
  var verbose: Boolean = false
  var isPython: Boolean = false
  var pyFiles: String = null
  var isR: Boolean = false
  var action: SparkSubmitAction = null
  val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
  var proxyUser: String = null
  var principal: String = null
  var keytab: String = null
  private var dynamicAllocationEnabled: Boolean = false

  // Standalone cluster mode only
  var supervise: Boolean = false
  var driverCores: String = null
  var submissionToKill: String = null
  var submissionToRequestStatusFor: String = null
  var useRest: Boolean = false // used internally

  /** Default properties present in the currently defined defaults file. */
  lazy val defaultSparkProperties: HashMap[String, String] = {
    val defaultProperties = new HashMap[String, String]()
    if (verbose) {
      logInfo(s"Using properties file: $propertiesFile")
    }
    Option(propertiesFile).foreach { filename =>
      val properties = Utils.getPropertiesFromFile(filename)
      properties.foreach { case (k, v) =>
        defaultProperties(k) = v
      }
      // Property files may contain sensitive information, so redact before printing
      if (verbose) {
        Utils.redact(properties).foreach { case (k, v) =>
          logInfo(s"Adding default property: $k=$v")
        }
      }
    }
    defaultProperties
  }

  // Set parameters from command line arguments
  parse(args.asJava)

  // Populate `sparkProperties` map from properties file
  mergeDefaultSparkProperties()
  // Remove keys that don't start with "spark." from `sparkProperties`.
  ignoreNonSparkProperties()
  // Use `sparkProperties` map along with env vars to fill in any missing parameters
  loadEnvironmentArguments()

  useRest = sparkProperties.getOrElse("spark.master.rest.enabled", "false").toBoolean

  validateArguments()
  // ... (rest of the class omitted)
```
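Among the initialization calls near the end of the class body, loadEnvironmentArguments() is what answers the earlier question of why the action is SUBMIT: unless an option such as --kill, --status, or --version has set a different action, it falls back to SUBMIT. A paraphrased excerpt from the Spark 3.0.0 source (the rest of the method body is abbreviated here):

```scala
private def loadEnvironmentArguments(): Unit = {
  // ... fill in master, deployMode, name, mainClass, etc. from sparkProperties and env vars ...

  // Action should be SUBMIT unless otherwise specified
  action = Option(action).getOrElse(SUBMIT)
}
```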
The SUBMIT branch calls submit(appArgs, uninitLog), which eventually invokes runMain(args, uninitLog):
```scala
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
    Logging.uninitialize()
  }

  if (args.verbose) {
    logInfo(s"Main class:\n$childMainClass")
    logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
    // sysProps may contain sensitive information, so redact before printing
    logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
    logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    logInfo("\n")
  }
  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null

  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      logError(s"Failed to load class $childMainClass.")
      if (childMainClass.contains("thriftserver")) {
        logInfo(s"Failed to load main class $childMainClass.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      logError(s"Failed to load $childMainClass: ${e.getMessage()}")
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        logInfo(s"Failed to load hive class.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }

  @tailrec
  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: InvocationTargetException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: Throwable =>
      e
  }

  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
```
① prepareSubmitEnvironment(args)
Prepares the submit environment, returning the child arguments, child classpath, Spark configuration, and the child main class to run.
② getSubmitClassLoader(sparkConf)
Obtains the class loader used for the submission.
③ Utils.classForName(childMainClass)
Looks up the class by its name via reflection.
④ classOf[SparkApplication].isAssignableFrom(mainClass)
If the class implements the SparkApplication trait, an instance is created through its constructor and cast to SparkApplication; otherwise it is wrapped in a new JavaMainApplication. As the analysis of prepareSubmitEnvironment shows, in yarn cluster mode the class here is YarnClusterApplication, which implements SparkApplication (a minimal sketch of this reflective dispatch follows after this list).
⑤ app.start(childArgs.toArray, sparkConf)
Calls start, i.e. the start method of YarnClusterApplication.
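To make the reflective dispatch in steps ③-⑤ concrete, here is a minimal, self-contained sketch of the same pattern. The SparkApp trait, the DemoApp class, and the class name string are made up for illustration and are not Spark's actual types:

```scala
// Simplified stand-in for Spark's SparkApplication trait (illustrative only).
trait SparkApp {
  def start(args: Array[String]): Unit
}

// A hypothetical application class that will be looked up by name.
class DemoApp extends SparkApp {
  override def start(args: Array[String]): Unit =
    println(s"DemoApp started with args: ${args.mkString(", ")}")
}

object ReflectiveDispatchSketch {
  def main(cliArgs: Array[String]): Unit = {
    // Step ③: load the class from its fully qualified name via reflection.
    val mainClass: Class[_] = Class.forName("DemoApp")

    // Step ④: if the loaded class implements SparkApp, instantiate it through its
    // no-arg constructor. Spark's fallback (JavaMainApplication, which reflectively
    // invokes a static main method) is omitted from this sketch.
    val app: SparkApp =
      if (classOf[SparkApp].isAssignableFrom(mainClass)) {
        mainClass.getConstructor().newInstance().asInstanceOf[SparkApp]
      } else {
        throw new IllegalArgumentException(s"${mainClass.getName} does not implement SparkApp")
      }

    // Step ⑤: hand control to the application instance.
    app.start(cliArgs)
  }
}
```

Back in Spark, app here is a YarnClusterApplication, so step ⑤ lands in the following start method: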
```scala
override def start(args: Array[String], conf: SparkConf): Unit = {
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  conf.remove(JARS)
  conf.remove(FILES)

  new Client(new ClientArguments(args), conf, null).run()
}
```
① conf.remove(JARS) and conf.remove(FILES)
In yarn mode SparkSubmit distributes files and jars through the yarn cache, so they are removed from sparkConf here.
② new Client(new ClientArguments(args), conf, null).run()
Creates a Client that interacts with the yarn cluster (wrapping the parsed ClientArguments) and runs it, which is what actually submits the application to yarn.
The source-code flow covered in this installment is summarized in the flow chart below (spark-submit script → SparkSubmit.main → doSubmit → submit → runMain → YarnClusterApplication.start → Client.run).