• spark-core-源码、Worker启动、sparksubmit提交、Driver启动


    sparksubmit源码解析

     在提交我们写好的jar包时候,用到submit命令,他的源码解析流程如上图

    位于deploy里的SparkSubmit里面,根据main方法一点点run进去,分配我们传的参数,尤其是

    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

    MainClass这个东西就是我们要先执行的一个位置,他根据我们设置的deploy-mode来进行选择

    在分配参数的时候,deploy-mode如果是client模式,直接MainClass为我们jar包去执行

    但如果是集群模式,他会启用ClientApp里的start方法,app.start(childArgs.toArray, sparkConf)

    由此生成一个ClientEndpoint,也就是创建一个新的Env环境,由此 还是之前的老规矩,inbox一定会process onStart()方法。

    在这个方法里面 MainClass会被指定为 创建Driver

    val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

    同时,用一个command来存储我们自己的jar包

    val command = new Command(mainClass,
      Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
      sys.env, classPathEntries, libraryPathEntries, javaOpts)

    并将上述信息封装成driverDescription发送Master

    asyncSendToMasterAndForwardReply[SubmitDriverResponse](
      RequestSubmitDriver(driverDescription))

    Master接收到之后,立马调配worker给他分配资源创建Driver

    1. private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    2. logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    3. worker.addDriver(driver)
    4. driver.worker = Some(worker)
    5. worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    6. driver.state = DriverState.RUNNING
    7. }

    对worker的引用  调用send函数让他启动Driver

    worker接受到之后,会立马在本机创建一个新的JVM  DriverWrapper

    这个新进程就负责执行我们自己的程序jar包


    sparkContext解析

     要执行我们自己的代码了,在new sparkContext()的时候,就会有上图的流程,

    发送创建Application的信息给master之后,master就开始分配资源给我们的任务了,也就是分配executor,我们的executor会根据参数先定义好 每一个worker里面要启动几个executor,根据最终算好的结果去worker一个个发消息让他们启动,启动之后 worker就会另开一个新的rpcEnv 也就是新开端点ExecutorBackend,里面的start方法会和之前的driver端点通知,说我要注册了,最终executor里面也会新开一个线程池,这个线程池就是最终我们跑代码执行我们自己东西的地方


    我们自己的代码执行过程

    首先明确,分布式计算是大数据的期望执行过程

    里面会分为相干计算 与 不相干计算

    不相干计算就是,数据在不同的主机里面,大家各跑各的对最终结果不会有影响,比如简单的map flatMap

    相干计算就是我一定要拿到所有主机的数据才能进行的计算,需要shuffle

    所以在进行rdd.map().filter()的执行输出结果,由于没有用到shuffle,大家肯定是各跑各的,所以会有map,filter,map,filter这种交替执行的现象。

     如上图,不相干计算组成了stage,在一台机器上就可以完成,中间的执行逻辑是pipeline模式,迭代器嵌套,就是

    flatMap(map(filter())),这种就是窄依赖模式,stage与stage之间需要shuffle。

    rdd他不存储具体数据,只存逻辑

    描述任务的并行度 也就是task任务数量,一个task就是一个并行度

    一个stage里task的数量就是由stage里最后一个rdd的分区的数量决定


    然后就是具体的计算层执行rdd逻辑了

     最终的执行逻辑:

    从RDD.foreach()那一刻开始算,foreach里面有最终runJob方法,这里开始使用DAGschedule处理我们的Job

    首先是开启ProcessLoop等待接收我们的job,提交之后里面就会处理我们的申请,根据我们传的rdd,它是最后的ResultStage,我们要根据这个ResultStage去递归的切割前面所有rdd,切割成一个个rdd,切割的标准就是,通过栈结构根据每一个rdd的dependices往前,以是否发生shuffle来进行切割,最终递归的回归过程就是每一个stage提交的过程


    举例说明

     首先要明确,从x到g这一步它是窄依赖,并不是shuffle

    因为他俩的分区数一样

    如果还是shuffle操作的话,最终比如msb这个数据b到x是去了分区2,x到g还会是分区2

    没必要是shuffle操作,spark会自动调优,将这一操作放在一台机子上进行,后面的rdd f也会被shuffle到这台机子。

    但是如果分区数改变成4 那么就不是窄依赖了,因为原数据存放的分区号发生了改变。


    missing是判断出是否为shuffle rdd才存放的,他放的是 一个stage里面最后一个rdd


    之后就是通过stage根据分区来划分task任务, 然后让driver端点分配executor给他来执行

  • 相关阅读:
    用户登录后首页不显示数据
    如何使用 LeiaPix 让照片动起来
    vue路由传参的详解1.命名路由的传参和取出2.query传参和取出3.meta传参和取出4.其他方式5.注意点 代码和注释
    把backtrader改造成金融强化学习回测引擎
    【附源码】计算机毕业设计SSM特大城市地铁站卫生防疫系统
    什么时候可以禁用文本选择?
    Pytorch 图像增强的几个方法
    如何在 Windows 10/11 更新中修复错误代码0x80070002
    【MySQL基础|第三篇】--- 详谈SQL中的DQL语句
    PDF转Word怎么转?教你三招快速实现PDF转Word
  • 原文地址:https://blog.csdn.net/weixin_60343465/article/details/127934911