• Spark source code (creating and running the client that interacts with the YARN cluster) - Part 1


    Starting with this installment, I will walk through the end-to-end flow of submitting user code (using WordCount as the running example) to a Spark cluster deployed in YARN mode, based on Spark 3.0.0.

    object Spark03_WordCount {
      def main(args: Array[String]): Unit = {
        val sparConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)
        // 1. Read the files and get the data line by line
        //    e.g. "hello world"
        val lines: RDD[String] = sc.textFile("datas\\wc\\*")
        // 2. Split each line into individual words (tokenization)
        //    Flattening: breaking the whole into individual elements
        //    "hello world" => hello, world, hello, world
        val words: RDD[String] = lines.flatMap(_.split(" "))
        val wordToOne: RDD[(String, Int)] = words.map(
          word => (word, 1)
        )
        // wordToOne.reduceByKey((x, y) => {
        //   x + y
        // })
        val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
        // 5. Collect the result to the driver and print it to the console
        val array: Array[(String, Int)] = wordToCount.collect()
        array.foreach(println)
        // Close the connection
        sc.stop()
      }
    }
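
    To make the expected behavior concrete (a hypothetical input, not taken from the original post): if the files under datas/wc together contain the two lines "hello world" and "hello spark", then wordToCount holds (hello,2), (world,1) and (spark,1), and collect() brings those pairs back to the driver, where they are printed in no guaranteed order.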

    For example, to submit this application from the command line (in YARN cluster mode, which is the mode this series analyzes), we would typically run:

    bin/spark-submit \
    --class com.wy.bigdata.spark.core.wc.Spark03_WordCount \
    --master yarn \
    --deploy-mode cluster \
    ./examples/jars/spark-examples_2.12-3.0.0.jar

    Let's open the spark-submit script in the bin directory to see what it actually does. Its last line (roughly, in Spark 3.0.0) is exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@", i.e. the script simply forwards all command-line arguments to spark-class, with org.apache.spark.deploy.SparkSubmit as the class to launch.

    So the entry point is org.apache.spark.deploy.SparkSubmit, and that is where we start the analysis.

    1.SparkSubmit#main

    override def main(args: Array[String]): Unit = {
      val submit = new SparkSubmit() {
        self =>
        override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
          new SparkSubmitArguments(args) {
            override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
            override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
            override protected def logError(msg: => String): Unit = self.logError(msg)
          }
        }
        override protected def logInfo(msg: => String): Unit = printMessage(msg)
        override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
        override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
        override def doSubmit(args: Array[String]): Unit = {
          try {
            super.doSubmit(args)
          } catch {
            case e: SparkUserAppException =>
              exitFn(e.exitCode)
          }
        }
      }
      submit.doSubmit(args)
    }

    2.SparkSubmit#doSubmit

    In main, an anonymous SparkSubmit subclass is built that redirects logging to printMessage and turns a SparkUserAppException into a call to exitFn; it then invokes doSubmit(args) on that instance. Click into super.doSubmit(args):

    def doSubmit(args: Array[String]): Unit = {
      // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
      // be reset before the application starts.
      val uninitLog = initializeLogIfNecessary(true, silent = true)
      val appArgs = parseArguments(args)
      if (appArgs.verbose) {
        logInfo(appArgs.toString)
      }
      appArgs.action match {
        case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
        case SparkSubmitAction.KILL => kill(appArgs)
        case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
        case SparkSubmitAction.PRINT_VERSION => printVersion()
      }
    }

    ①parseArguments(args)

    Parses the command-line arguments into a SparkSubmitArguments object.

    ②appArgs.action match

    Matches on the action. In our case it is SUBMIT (why it defaults to SUBMIT is shown in the next step), so submit(appArgs, uninitLog) is executed.

    3.SparkSubmit#parseArguments->new SparkSubmitArguments(args)

    private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
      extends SparkSubmitArgumentsParser with Logging {
      var master: String = null
      var deployMode: String = null
      var executorMemory: String = null
      var executorCores: String = null
      var totalExecutorCores: String = null
      var propertiesFile: String = null
      var driverMemory: String = null
      var driverExtraClassPath: String = null
      var driverExtraLibraryPath: String = null
      var driverExtraJavaOptions: String = null
      var queue: String = null
      var numExecutors: String = null
      var files: String = null
      var archives: String = null
      var mainClass: String = null
      var primaryResource: String = null
      var name: String = null
      var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
      var jars: String = null
      var packages: String = null
      var repositories: String = null
      var ivyRepoPath: String = null
      var ivySettingsPath: Option[String] = None
      var packagesExclusions: String = null
      var verbose: Boolean = false
      var isPython: Boolean = false
      var pyFiles: String = null
      var isR: Boolean = false
      var action: SparkSubmitAction = null
      val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
      var proxyUser: String = null
      var principal: String = null
      var keytab: String = null
      private var dynamicAllocationEnabled: Boolean = false
      // Standalone cluster mode only
      var supervise: Boolean = false
      var driverCores: String = null
      var submissionToKill: String = null
      var submissionToRequestStatusFor: String = null
      var useRest: Boolean = false // used internally

      /** Default properties present in the currently defined defaults file. */
      lazy val defaultSparkProperties: HashMap[String, String] = {
        val defaultProperties = new HashMap[String, String]()
        if (verbose) {
          logInfo(s"Using properties file: $propertiesFile")
        }
        Option(propertiesFile).foreach { filename =>
          val properties = Utils.getPropertiesFromFile(filename)
          properties.foreach { case (k, v) =>
            defaultProperties(k) = v
          }
          // Property files may contain sensitive information, so redact before printing
          if (verbose) {
            Utils.redact(properties).foreach { case (k, v) =>
              logInfo(s"Adding default property: $k=$v")
            }
          }
        }
        defaultProperties
      }

      // Set parameters from command line arguments
      parse(args.asJava)
      // Populate `sparkProperties` map from properties file
      mergeDefaultSparkProperties()
      // Remove keys that don't start with "spark." from `sparkProperties`.
      ignoreNonSparkProperties()
      // Use `sparkProperties` map along with env vars to fill in any missing parameters
      loadEnvironmentArguments()
      useRest = sparkProperties.getOrElse("spark.master.rest.enabled", "false").toBoolean
      validateArguments()
      // ...
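
    The question from step 2 (why action is SUBMIT) is answered inside this constructor: after parse(args.asJava) has filled in the fields, loadEnvironmentArguments() assigns defaults for anything still missing. In Spark 3.0.0 that method ends with roughly the following line (a sketch from memory, not a verbatim quote):

    // Inside SparkSubmitArguments#loadEnvironmentArguments (Spark 3.0.0, roughly):
    // unless --kill or --status was passed (which set action earlier),
    // the action defaults to SUBMIT, so doSubmit takes the SUBMIT branch.
    action = Option(action).getOrElse(SUBMIT)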

     4.SparkSubmit#submit->SparkSubmit#doRunMain->SparkSubmit#runMain

    private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
      val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
      // Let the main class re-initialize the logging system once it starts.
      if (uninitLog) {
        Logging.uninitialize()
      }
      if (args.verbose) {
        logInfo(s"Main class:\n$childMainClass")
        logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
        // sysProps may contain sensitive information, so redact before printing
        logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
        logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
        logInfo("\n")
      }
      val loader = getSubmitClassLoader(sparkConf)
      for (jar <- childClasspath) {
        addJarToClasspath(jar, loader)
      }
      var mainClass: Class[_] = null
      try {
        mainClass = Utils.classForName(childMainClass)
      } catch {
        case e: ClassNotFoundException =>
          logError(s"Failed to load class $childMainClass.")
          if (childMainClass.contains("thriftserver")) {
            logInfo(s"Failed to load main class $childMainClass.")
            logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
          }
          throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
        case e: NoClassDefFoundError =>
          logError(s"Failed to load $childMainClass: ${e.getMessage()}")
          if (e.getMessage.contains("org/apache/hadoop/hive")) {
            logInfo(s"Failed to load hive class.")
            logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
          }
          throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
      }
      val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
        mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
      } else {
        new JavaMainApplication(mainClass)
      }
      @tailrec
      def findCause(t: Throwable): Throwable = t match {
        case e: UndeclaredThrowableException =>
          if (e.getCause() != null) findCause(e.getCause()) else e
        case e: InvocationTargetException =>
          if (e.getCause() != null) findCause(e.getCause()) else e
        case e: Throwable =>
          e
      }
      try {
        app.start(childArgs.toArray, sparkConf)
      } catch {
        case t: Throwable =>
          throw findCause(t)
      }
    }

    ①prepareSubmitEnvironment(args)

    Prepares the submission environment: it resolves the cluster manager and deploy mode and, most importantly, decides childMainClass, the class that will actually be launched. A sketch of the YARN cluster branch follows.
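
    For YARN cluster mode, childMainClass is set to YarnClusterApplication. The relevant part of SparkSubmit in Spark 3.0.0 looks roughly like this (abridged sketch, not a verbatim quote):

    // In SparkSubmit (Spark 3.0.0), abridged:
    private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
      "org.apache.spark.deploy.yarn.YarnClusterApplication"

    // inside prepareSubmitEnvironment:
    if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      // ... the user's jar, main class and arguments are forwarded as --jar / --class / --arg
    }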

    ②getSubmitClassLoader(sparkConf)

    Obtains the class loader for the submission and installs it as the current thread's context class loader; see the sketch below.
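
    For reference, getSubmitClassLoader chooses between a child-first and a parent-first URL class loader based on spark.driver.userClassPathFirst; in Spark 3.0.0 it looks roughly like this (sketch from memory):

    // Sketch of SparkSubmit#getSubmitClassLoader (Spark 3.0.0):
    private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
      val loader =
        if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
          // the user's jars take precedence over Spark's own classpath
          new ChildFirstURLClassLoader(new Array[URL](0),
            Thread.currentThread.getContextClassLoader)
        } else {
          new MutableURLClassLoader(new Array[URL](0),
            Thread.currentThread.getContextClassLoader)
        }
      Thread.currentThread.setContextClassLoader(loader)
      loader
    }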

    ③Utils.classForName(childMainClass)

    Loads the Class object for childMainClass by name (reflection).

    ④classOf[SparkApplication].isAssignableFrom(mainClass)

    If the loaded class is a subtype of SparkApplication, it is instantiated through its no-argument constructor and cast to SparkApplication; otherwise it is wrapped in a JavaMainApplication. As the prepareSubmitEnvironment sketch above shows, in YARN cluster mode that class is YarnClusterApplication, which extends SparkApplication. A minimal, self-contained sketch of this reflective dispatch pattern follows.
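
    The following runnable sketch illustrates the "load by name, check the interface, otherwise wrap" pattern. Pluggable, DemoApp and ReflectiveDispatchDemo are hypothetical stand-ins for SparkApplication, YarnClusterApplication and the dispatch code in runMain; they are not Spark classes.

    trait Pluggable {
      def start(args: Array[String]): Unit
    }

    class DemoApp extends Pluggable {
      override def start(args: Array[String]): Unit =
        println(s"DemoApp started with: ${args.mkString(", ")}")
    }

    object ReflectiveDispatchDemo {
      def main(cmdArgs: Array[String]): Unit = {
        val mainClass: Class[_] = Class.forName("DemoApp") // like Utils.classForName(childMainClass)
        val app: Pluggable =
          if (classOf[Pluggable].isAssignableFrom(mainClass)) {
            // the class implements the plugin trait: instantiate it via its no-arg constructor
            mainClass.getConstructor().newInstance().asInstanceOf[Pluggable]
          } else {
            // fall back to a wrapper that invokes a static main, analogous to JavaMainApplication
            new Pluggable {
              override def start(args: Array[String]): Unit =
                mainClass.getMethod("main", classOf[Array[String]]).invoke(null, args)
            }
          }
        app.start(Array("hello"))
      }
    }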

    ⑤app.start(childArgs.toArray, sparkConf)

    Calls start, which here means invoking the start method of YarnClusterApplication.

    5.YarnClusterApplication#start

    override def start(args: Array[String], conf: SparkConf): Unit = {
      // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
      // so remove them from sparkConf here for yarn mode.
      conf.remove(JARS)
      conf.remove(FILES)
      new Client(new ClientArguments(args), conf, null).run()
    }

    ①conf.remove(JARS) conf.remove(FILES)

    In YARN mode, SparkSubmit distributes files and jars through the YARN cache, so they are removed from sparkConf here.

    ②new Client(new ClientArguments(args), conf, null).run()

    Creates a client that interacts with the YARN cluster and runs it; a short preview of its run method is sketched below.
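
    As a preview of the next installment, here is a rough sketch of org.apache.spark.deploy.yarn.Client#run in Spark 3.0.0 (not a verbatim quote):

    // Client#run, roughly: submit the application to YARN, remember its id,
    // then (unless running fire-and-forget) monitor it until it finishes.
    def run(): Unit = {
      this.appId = submitApplication()
      // ... report / monitorApplication logic, analyzed in the next installment
    }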

    Overview

    The source-code flow covered in this installment, in call order: SparkSubmit#main -> SparkSubmit#doSubmit -> SparkSubmit#parseArguments (new SparkSubmitArguments) -> SparkSubmit#submit -> SparkSubmit#doRunMain -> SparkSubmit#runMain -> YarnClusterApplication#start -> new Client(new ClientArguments(args), conf, null).run().

  • Original article: https://blog.csdn.net/emttxdy/article/details/124643514