• 30-Spark入门之Spark技术栈讲解、分区、系统架构、算子和任务提交方式


    17.1 Spark介绍

    17.1.1 什么是Spark

    1. 概念理解

      • 并行计算框架
        • Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。 Spark 是加州大学伯克利分校的AMP实验室所开源的类 Hadoop MapReduce 的通用并行计算框架
      • 任务的中间结果可以缓存在内存中,减少磁盘数据交互
        • Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以缓存在内存中,从而不再需要读写 HDFS ,减少磁盘数据交互,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的算法
      • Spark诞生于2009年美国加州伯克利分校的AMP实验室,基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序
    2. Spark和Hadoop的比较

      image-20220719161155589

    17.1.2 总体技术栈讲解

    Spark 提供了 Sparkcore RDD 、 Spark SQL 、 Spark Streaming 、 Spark MLlib 、 Spark、GraphX等技术组件,可以一站式地完成大数据领域的离线批处理、交互式查询、流式计算、机器学习、图计算等常见的任务。这就是 spark 一站式开发的特点

    17.1.3 Spark 与 MR 的区别

    1. MR

      • MR只能做离线计算,如果实现复杂计算逻辑,一个MR搞不定,就需要将多个MR按照先后顺序连成一串,一个MR计算完成后会将计算结果写入到HDFS中,下一个MR将上一个MR的输出作为输入,这样就要频繁读写HDFS,网络IO和磁盘IO会成为性能瓶颈。从而导致效率低下。
    2. Spark

      • spark既可以做离线计算,有可以做实时计算,提供了抽象的数据集(RDD、Dataset、DataFrame、DStream)有高度封装的API,算子丰富,并且使用了更先进的DAG有向无环图调度思想,可以对执行计划优化后在执行,并且可以数据可以cache到内存中进行复用
    3. 归根结底最重要的区别还是在于 下图为 MapReduce 执行任务流程(MR基于磁盘,Spark基于内存)

      • 都是分布式计算框架, Spark 计算中间结果基于内存缓存, MapReduce 基于 HDFS 存储。也正因此,Spark 处理数据的能力一般是 MR 的三到五倍以上, Spark 中除了基于内存计算这一个计算快的原因,
      • 还有 DAG(DAGShecdule) 有向无环图来切分任务的执行先后顺序。

    image-20220719151116859

    • MR的数据读取流程

      image-20220719151148291

    • Spark的数据读取流程

      image-20220719151206535

    17.1.4 Spark API

    多种编程语言的支持: Scala,Java,Python,R,SQL 。

    17.1.5 Spark运行模式

    1. Local
      • 多用于本地测试,如在 eclipse , idea 中写程序测试等
    2. Standalone
      • Standalone 是 Spark 自带的一个资源调度框架,它支持完全分布式
      • 由Master负责资源的分配
    3. Yarn
      • Hadoop 生态圈里面的一个资源调度框架, Spark 也是可以基于 Yarn 来计算的
      • 若要基于 Yarn 来进行资源调度,必须实现 AppalicationMaster 接口, Spark 实现了这个接口,所以可以基于 Yarn 来进行资源调度
      • 由Yarn中的ResourceManager负责资源的分配
    4. Mesos
      • 资源调度框架
      • 由Messos中的Messos Master负责资源管理

    17.1.6 Spark总结

    image-20220719151855401

    17.2 Spark Core

    17.2.1 Partition

    1. 概念

    1. 分区的原因

      • 单节点处理不了大量的数据
      • Spark RDD 是一种分布式的数据集,由于数据量很大,因此要它被切分并存储在各个结点的分区当中
    2. RDD的理解

      • Spark中,RDD(Resilient Distributed Dataset 弹性分布式数据集),是最基本的抽象数据集,
      • 其中每个RDD由若干个Partition组成,不同的分区可能在集群的不同节点上
      • 多个Partition是并行操作的
      • 一个Partition对应一个Task

      image-20220719152341362

    2. 分区方式

    对于Spark Shuffle阶段的理解

    • Spark Shuffle阶段共分为Shuffle Write阶段和Shuffle Read阶段,其中在Shuffle Write阶段中,Shuffle Map Task 对数据进行处理并产生中间数据

    • 然后再根据数据分区方式对中间数据进行分区

    • 最终Shuffle Read阶段中的Shuffle Read Task会拉取Shuffle Write阶段中产生的并已经分好区的中间数据

      image-20220719152853137

    Spark包含两种数据分区方式:HashPartitioner(哈希分区)和RangePartitioner(范围分区)

    1. HashPartitioner分区

      • Hash分区
      • HashPartitioner采用哈希的方式对键值对数据进行分区
      • 其数据分区规则为 partitionId = Key.hashCode % numPartitions
        • partitionId代表该Key对应的键值对数据应当分配到的Partition标识
        • Key.hashCode表示该Key的哈希值
        • numPartitions表示包含的Partition个数
      • 可能会造成数据倾斜(数据量不均衡)
    2. RangePartitioner分区

      • 范围分区

      • 引入RangePartitioner的原因

        • Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题
      • HashPartitioner数据倾斜产生的原因

        • HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,当某几种类型数据量较多时,就会造成若干Partition中包含的数据过大
        • 在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢
      • RangePartitioner基于抽样的思想来对数据进行分区

        image-20220719153353012

    3. HDFS-Block与Spark-Partition

    spark本身并没有提供分布式文件系统,因此spark的分析大多依赖于Hadoop的分布式文件系统HDFS

    1. HDFS-Block

      • hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子,但一个盒子里的内容只可能来自同一份文件
    2. Spark-Partition

      • spark中的partition 是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partition组成的
      • partition 是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的
    3. BLock和Partition的联系

      • Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。
      • 如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235
    4. Block和Partition的区别

      BlockPartition
      位置存储空间计算空间
      大小固定不固定
      数据冗余有冗余的、不会轻易丢失没有冗余设计、丢失之后重新计算得到

    17.2.2 RDD⭐️

    RDD(Resilient Distributed Dataset) 弹性分布式数据集,Spark计算流程中的一个算子,类似于Storm中的Bolt,对数据进行计算,得到临时结果(partition)

    1. RDD五大属性

    1. RDD是由一系列的partition组成的
    2. RDD中的每一个task运行在自己的Partition上
    3. RDD之间有一系列的依赖关系(依赖其他的RDD)
    4. 分区器是作用在(K,V)格式的RDD上
    5. RDD默认会寻找最佳的计算位置
      • 计算向数据靠拢,尽可能少的进行数据的拉取操作

    2. RDD流程图

    image-20220719155726961

    • 理解
      • TextFile方法底层封装的是MR读取文件的方式,读取文件之前先进行split切片,默认split大小是一个block大小
      • RDD 实际上不存储数据,这里方便理解,暂时理解为存储数据
        • 真正存储数据的是partition,RDD不存储数据,RDD就是对这个partition的抽象
      • 什么是 K,V格式的RDD ?
        • 如果 RDD 里面存储的数据都是二元组对象,那么这个 RDD 我们就叫做 K,V格式的RDD
      • 哪里体现 RDD 的弹性(容错)?
        • partition 数量,大小没有限制,体现了 RDD 的弹性。
        • RDD 之间依赖关系,可以基于上一个 RDD 重新计算出 RDD
      • 哪里体现 RDD 的分布式?
        • RDD 是由 Partition 组成, partition 是分布在不同节点上的。
        • RDD 提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。

    3. Lineage血统

    利用内存加快数据加载,在其它的In-Memory类数据库或Cache类系统中也有实现。Spark的主要区别在于它采用血统(Lineage)来时实现分布式运算环境下的数据容错性(节点失效、数据丢失)问题

    RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来

    当这个RDD的部分分区数据丢失时,它可以通过Lineage找到丢失的父RDD的分区进行局部计算来恢复丢失的数据,这样可以节省资源提高运行效率

    17.2.3 系统架构

    系统架构图一:

    image-20220719160539002

    系统架构图二:

    image-20220719160651449

    1. Master (Standalone)

    • 作用
      • 资源管理的主节点(进程)
      • Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程
      • 接受用户的请求

    2. Cluster Manager

    • 作用

      • 在集群上获取资源的外部服务
      • 例如:standalone ; yarn ; mesos
    • 在Standalone模式下

      • Cluster Manager是Master(主节点),控制整个集群,监控Worker
    • 在Yarn模式下

      • Cluster Manager是资源管理者

    3. Worker计算节点(Standalone)

    1. 理解
      • Worker是Standalone资源调度框架里面资源管理的从节点。也是JVM进程。
      • 资源管理的从节点(进程),或者说是管理本机资源的进程
    2. 作用
      • Standalone模式
        • 资源管理的从节点,负责控制计算节点,启动Executor
      • Yarn模式
        • 指的是NodeManager节点

    4. Application

    • 概念理解
      • 基于Spark的用户程序,包含driver程序和运行在集群上的executor程序,即一个完整的spark应用

    5. Driver(program)

    • 作用
      • 用来连接工作进程(worker)的程序
    • 概念理解
      • 驱动程序,Application中的main函数并创建SparkContext

    6. Executor

    • 概念理解
      • 是在一个worker进程所管理的节点上为某个Application启动的一个个进程,
    • 作用
      • 这个进程负责运行任务,并且负责将数据存在内存或者磁盘上,每个应用之间都有各自独立的executor

    7. Task

    • 概念理解
      • 被发送到executor上的工作单元

    8. Job

    • 概念理解
      • 包含很多任务(Task) 的组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job

    9. Stage

    • 概念理解
      • 一个job会被拆分成很多组任务(Task),每组任务(Task)被称为Stage
      • 就像MapReduce分为MapTask和ReduceTask一样
    Spark代码流程
    创建SparkConf对象
    可以设置Application name。
    可以设置运行模式及资源需求。
    创建SparkContext对象
    基于Spark的上下文创建一个RDD,对RDD进行处理。
    应用程序中要有Action类算子来触发Transformation类算子执行。
    关闭Spark上下文对象SparkContext。
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    17.2.4 Spark代码流程

    1. 创建SparkConf对象

      • 可以设置Application name。
      • 可以设置运行模式及资源需求。
      val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
      
      • 1
    2. 创建SparkContext对象

      val sparkContext: SparkContext = new SparkContext(sparkConf)
      
      • 1
    3. 基于Spark的上下文创建一个RDD,对RDD进行处理。

      val value: RDD[(String,Int)] = sparkContext.textFile("src/main/resources/user.log")
      
      • 1
    4. 应用程序中要有Action类算子来触发Transformation类算子执行。

      println(lines.count())
          lines.foreach(ele => println("foreach:" + ele))
          lines.take(5).foreach(ele => println("take:" + ele))
          println(lines.first())
          lines.collect().foreach(ele => println("collect:" + ele))
      
      • 1
      • 2
      • 3
      • 4
      • 5
    5. 关闭Spark上下文对象SparkContext。

      sparkContext.stop()
      
      • 1

    17.3 算子(单文件)⭐️

    什么是算子?
    可以理解成spark RDD的方法,这些方法作用于RDD的每一个partition。因为spark的RDD是一个 lazy的计算过程,只有得到特定触发才会进行计算,否则不会产生任何结果

    Spark 记录了RDD之间的生成和依赖关系,但是只有当F进行行动操作时,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算,如下图的A、B、C、D分别是一个个RDD

    image-20220719174235256

    image-20220719174002082

    17.3.1 转换算子

    1. 概念理解
      • Transformations 类算子叫做转换算子(本质就是函数),Transformations算子是延迟执行,也叫 懒加载 执行
    2. 常见的Transformations 类算子
      • filter: 过滤符合条件的记录数,true保留,false过滤掉
      • map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素
        • 特点:输入一条数据,输出一条数据
      • faltMap:先map后flat,与map类似,每个输入项可以映射0到多个输出项
      • sample:随机抽样算子,根据传进去的小数按比例进行有放回或者无放回的抽样
      • reduceByKey:将相同的key根据相应的逻辑进行处理
      • sortByKey/sortBy:作用在k,v格式的RDD上,对key进行升序或者降序排序

    17.3.2 行动算子

    1. 概念理解
      • Action类算子叫做行动算子,Action类算子是触发执行
      • 一个Application应用程序中有几个Action类算子执行,就有几个job运行
    2. 常见Action类算子
      • count:返回数据集中的元素数,会在结果计算完成后回收到Driver端
      • take(n) :返回一个包含数据集前n个元素的集合
      • first:效果等同于 take(1) ,返回数据集中的第一个元素
      • foreach :循环遍历数据集中的每个元素,运行相应的逻辑
      • collect :将计算结果回收到 Driver 端

    17.3.3 控制算子

    • 概念理解
      • 将RDD持久化,持久化的单位是Partition
      • 控制算子有3种,cache、persist、checkpoint,其中cache和persist都是 懒执行 的,必须有一个Action类算子来触发执行
      • checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系

    1. cache

    • 默认将RDD的数据持久化到内存中,cache是 懒执行

      • cache() = persist() = persist(StorageLevel.Memory_Only)
    • rdd.cache().count() 返回的不是持久化的RDD,而是一个数值

      /**
      * 第一次并不会使用到缓存数据,因为是懒执行所以等到第一次Action才会开始缓存数据
      * 第二次用到的就是缓存数据
      */
      object Hello04Cache {
        def main(args: Array[String]): Unit = {
          //1.配置并创建对象
          val sparkContext = new SparkContext((new SparkConf().setMaster("local").setAppName("Hello04Cache" + System.currentTimeMillis())))
          //2.开始读取数据
          var lines: RDD[String] = sparkContext.textFile("src/main/resources/NASA_access_log_Aug95")
          //3.开始进行缓存
          lines = lines.cache()
          //4.开始进行计算
          val startTime = System.currentTimeMillis
          val count = lines.count
          val endTime = System.currentTimeMillis
          System.out.println("第一次共" + count + "条数据," + "计算时间=" + (endTime - startTime))
          val cacheStartTime = System.currentTimeMillis
          val cacheResult = lines.count
          val cacheEndTime = System.currentTimeMillis
          System.out.println("第二次共" + cacheResult + "条数据," + "计算时间=" + (cacheEndTime - cacheStartTime))
          //关闭sparkContext
          sparkContext.stop()
      
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26

    2. persist

    1. 特点:

      • 可以指定持久化的级别,最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK。
    2. 持久化级别

      1. MEMORY_ONLY
      • 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
      1. MEMORY_AND_DISK
      • 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
      1. MEMORY_ONLY_SER
      • 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
      1. MEMORY_AND_DISK_SER
      • 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
      1. DISK_ONLY
      • 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
      1. MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等。
    3. 代码实现

      object Hello05Persist {
        def main(args: Array[String]): Unit = {
          //1.配置并创建对象
          val sparkContext = new SparkContext((new SparkConf().setMaster("local").setAppName("Hello05Persist" + System.currentTimeMillis())))
          //2.开始读取数据
          var lines: RDD[String] = sparkContext.textFile("src/main/resources/NASA_access_log_Aug95")
          //3.开始进行缓存
          lines = lines.persist(StorageLevel.DISK_ONLY)
          //4.开始进行计算
          val startTime = System.currentTimeMillis
          val count = lines.count
          val endTime = System.currentTimeMillis
          System.out.println("第一次共" + count + "条数据," + "计算时间=" + (endTime - startTime))
          val cacheStartTime = System.currentTimeMillis
          val cacheResult = lines.count
          val cacheEndTime = System.currentTimeMillis
          System.out.println("第二次共" + cacheResult + "条数据," + "计算时间=" + (cacheEndTime - cacheStartTime))
          //关闭sparkContext
          sparkContext.stop()
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21

    3. checkpoint

    1. 特点

      • checkpoint将RDD持久化到磁盘,还可以切断 RDD 之间的依赖关系,也是 懒执行
      • 当RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制
      • checkpoint就是对于一个RDD chain(链)如果后面需要反复使用某些中间结果RDD,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制
    2. 执行原理

      • 当RDD的 job 执行完毕之后,会从finalRDD从后往前回溯
      • 当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的RDD做一个标记
      • Spark 框架会自动启动一个新的 job ,从头开始重新计算这个 RDD 的数据,并将计算出的数据持久化到Checkpoint目录中
        • 以便下次可以快速访问到被标记了checkpoint的RDD,切断了RDD之间的依赖性
    3. 使用Checkpoint时常用的优化手段

      • 对RDD执行Checkpoint之前,最好对这个RDD先执行cache
      • 这样新启动的 job 只需要将内存中的数据拷贝到Checkpoint目录中就可以,省去了重新计算这一步
    4. 代码实现

      def main(args: Array[String]): Unit = {
          val sparkConf = new
          SparkConf().setMaster("local").setAppName("SparkCheckPoint" + System.currentTimeMillis())
          val sparkContext = new SparkContext(sparkConf)
          sparkContext.setCheckpointDir("./checkpoint")
          val lines: RDD[String] =
          sparkContext.textFile("src/main/resources/NASA_access_log_Aug95")
          val words: RDD[String] = lines.flatMap(_.split(" "))
          println("words" + words.getNumPartitions)
          words.checkpoint
          words.count
          sparkContext.stop
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13

    17.4 Spark集群搭建

    17.4.1 安装环境检测

    1. 搭建之前确认对应的 java 版本为8版本

    2. 搭建之前确认对应的 scala 版本为2.12.x版本。

      • [root@node01 ~]# rpm -ivh scala-2.12.11.rpm

      • [root@node01 ~]# whereis scala

      • [root@node01 ~]# vim /etc/profile

        export SCALA_HOME=/usr/share/scala
        export PATH=$SCALA_HOME/bin:$PATH
        
        • 1
        • 2
      • [root@node01 ~]# source /etc/profile

      • 三台计算机Node01 Node02 Node03都需要安装Scala

    17.4.2 standalone(Single)

    启动spark的集群

    [root@node01 ~]# cd /opt/yjx/spark-2.4.6/sbin/
    [root@node01 sbin]# ./start-all.sh

    访问

    http://192.168.88.101:8080/

    运行案例

    spark-submit --master spark://node01:7077 --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
    
    • 1

    17.4.3 standalone(HA)

    • 启动集群
      • Zookeeper :
        • 【123】zkServer.sh start
      • 主节点:
        • [root@node01 ~]# cd /opt/yjx/spark-2.4.6/sbin/
        • [root@node01 sbin]# ./start-all.sh
      • 备用节点
        • [root@node02 ~]# cd /opt/yjx/spark-2.4.6/sbin/
        • [root@node02 sbin]# ./start-master.sh

    17.4.4 standalone(UI)

    17.4.5 SparkShell

    17.4.6 yarn模式

    • 启动集群

      • 启动Zookeeper:
        • 【123】zkServer.sh start
      • 启动Hadoop :
        • [root@node01 ~]# start-all.sh
      • 启动Spark:
        • [root@node01 ~]# cd /opt/yjx/spark-2.4.6/sbin/ [root@node01 sbin]# ./start-all.sh
    • 访问

      • spark: http://192.168.88.101:8080/
        hdfs: http://192.168.88.101:9870/
        yarn: http://192.168.88.101:8088
    • 提交任务

      spark-submit --master yarn --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
      
      • 1

    17.5 任务提交方式⭐️

    Standalon 和 Yarn 的任务提交方式图解

    image-20220720142938504

    17.5.1 Standalone-client

    1. 提交命令

      spark-submit --master spark://node01:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
      
      • 1
    2. 执行流程

      image-20220719193505229

      • client模式提交任务后,会在客户端启动Driver进程,来进行任务调度
      • Driver会向Master申请启动Application启动的资源,资源申请成功后
      • Driver端将task分发到worker端执行,启动executor进程(任务的分发)
      • worker端(executor进程)将task执行结果返回到Driver端(任务结果的回收)
    3. 总结

      • client模式适用于测试调试程序,Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况
      • 生产环境下不能使用client 模式,是因为:假设要提交100个 application 到集群运行,Driver 每次都会在 client 端(单个节点)启动,那么就会导致客户端100次网卡流量暴增的问题

    17.5.2 Standalone-cluster

    1. 提交命令

      spark-submit --master spark://node01:7077 --deploy-mode cluster --classorg.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
      
      • 1
    2. 执行流程

      image-20220719194238367

      • cluster模式提交应用程序后,会向Master请求启动Driver
      • Master接受请求后,随机在集群中的一台节点来启动Driver进程
      • Driver启动后为当前应用程序申请资源
      • Driver端发送task到worker节点上执行(任务的分发)
      • Worker上的executor进程将执行情况和执行结果返回给Driver端(任务结果的回收)
    3. 总结

      • Standalone-cluster 提交方式,应用程序使用的所有 jar 包和文件,必须保证所有的worker 节点都要有,因为此种方式, spark 不会自动上传包
      • 两种保证所有的Worker节点都有应用程序所需的jar包和文件
        • 将所有的依赖包和文件打到同一个包中,然后放在 hdfs 上。
        • 将所有的依赖包和文件各放一份在 worker 节点上

    17.5.3 yarn-client

    1. 提交命令

      spark-submit --master yarn --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
      
      spark-submit --master yarn–client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
      
      spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
      
      • 1
      • 2
      • 3
      • 4
      • 5
    2. 执行流程

      image-20220719195537575

      版本一:

      • 客户端提交一个Application,在客户端启动一个Driver进程
      • 应用程序启动后会向RS(ResourceManager)(相当于Standalone模式下的master进程)发送请求
      • RS收到请求后,随机选择一台NM(NodeManager)启动AM
        • 这里的NM相当于Standalone中的Worker进程
      • AM启动后,会向RS请求一批container资源,用于启动Executor
      • RS会找到一批NM(包含container)返回给AM,用于启动Executor
      • AM会向NM发送命令启动Executor
      • Executor启动后,会 反向注册 给Driver,Driver发送task到Executor,执行情况和结果返回个Driver端

      版本二:

      • 在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动 ApplicationMaster, 随后 ResourceManager 分配 container,在合适的 NodeManager上启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver
      • Driver 启动后向 ResourceManager 申请 Executor内存,ResourceManager接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动 Executor 进程,Executor 进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到 Action 算子时,触发一个 job,并根据宽依赖开始划分 stage,每个stage生成对应的taskSet,之后将 task 分发到各个Executor上执行

      版本三:

      1. 在client端启动Driver进程,初始化作业,解析程序,初始化两个类:DAGScheduler,TaskScheduler.
        – 初始化作业: 判断路径是否存在,权限校验等
        – DAGScheduler将程序的执行流程解析成DAG图,并划分阶段,根据阶段内的分区初始化Task
        – TaskScheduler接收Task,等待分配Task给executor
      2. Driver会向ResourceManager,申请资源,想要启动该应用程序的AppMaster
      3. ResourceManager会分配一个节点,来运行AppMaster,由NodeManager负责真正分配资源运行AppMaster
      4. AppMaster会向ResourceManager申请整个程序所需要的其他资源,准备运行executor进程
      5. 在各个节点上运行的executor会向Driver进行反向注册,要求分配任务
      6. TaskScheduler将Task分配到不同的executor,并监控实时状态,executor开始执行任务,
      7. TaskScheduler收到executor执行完的信息后,表示整个应用程序完成,会向ResouceManager申请注销
    3. 总结

      • Yarn-client 模式同样是适用于测试,因为 Driver 运行在本地, Driver 会与 yarn 集群中的 Executor 进行大量的通信
      • ApplicationMaster (executorLauncher)的在此模式中的作用:
        • 为当前的 Application 申请资源
        • 给 NodeManager 发送消息启动 Executor
        • 注意: ApplicationMaster 在此种模式下没有作业调度的功能

    17.5.4 yarn-cluster

    1. 提交命令

      spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
      
      spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
      
      • 1
      • 2
      • 3
    2. 执行流程

      image-20220719214653414

      版本一:

      • 客户机提交Application应用程序,发送请求到RS(ApplicationMaster),请求启动AM(ApplicationMaster)
      • RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)
      • ApplicationMaster启动后,ApplicationMaster发送请求到RS,请求一批container用于启动Executor
      • ApplicationMaster返回一批NM节点给ApplicationMaster
      • ApplicationMaster连接到NM,发送请求到NM启动Executor
      • Executor反向注册到AM所在的节点的Driver,Driver发送task到Executor

      版本二:

      1. client会首先向ResourceManager申请资源,要求启动AppMaster进程
      2. ResouceManager会分配一个节点,由NodeManager来运行AppMaster,并在AppMaster所在节点运行Driver进程
        Driver进程的作用:初始化作业,解析程序,初始化两个DAGScheduler,TaskScheduler.
        – 初始化作业: 判断路径是否存在,权限校验等
        – DAGScheduler将程序的执行流程解析成DAG图,并划分阶段,根据阶段内的分区初始化Task
        – TaskScheduler接收Task,等待分配Task给executor
      3. AppMaster会向ResourceManager申请整个程序所需要的其他资源,准备运行executor进程
      4. 在各个节点上运行的executor会向Driver进行反向注册,要求分配任务
      5. TaskScheduler将Task分配到不同的executor,并监控实时状态,executor开始执行任务,
      6. TaskScheduler收到executor执行完的信息后,表示整个应用程序完成,会向ResouceManager申请注销
    3. 总结

      • Yarn-Cluster主要用于生产环境中,因为 Driver 运行在 Yarn 集群中某一台 NodeManager中,每次提交任务的 Driver 所在的机器都不再是提交任务的客户端节点,而是多个 NM 节点中的一台,不会产生某一台机器网卡流量激增的现象,但同样也有缺点,任务提交后不能看到日志。只能通过 yarn 查看日志
      • ApplicationMaster 在此模式中的的作用:
        • 为当前的 Application 申请资源
        • 给 NodeManger 发送消息启动 Executor
        • 任务调度

    17.5.5 Standalone和Yarn的对比

    1. 相同点

      • standalone是spark自身携带的资源管理框架
      • yarn是hadoop中的资源管理框架。
      • 都是对核心和内存进行管理和分配。
    2. 作业方式不同

      • spark 的standalone模式使用的是spark自身的集群管理器

      • yarn模式是将spark作业运行在yarn上。

    3. 多用户支持不同

      • standalone对于多用户多application支持的不好,只能支持fifo模式进行资源调度,先来的任务先执行,后来的任务就后执行。也可以通过配置,让先来的任务不占用所有资源,给后来的任务留点资源

      • yarn模式对于多用户多任务支持比较好,arn中有fifo调度器,容量调度器,公平调度器这三种资源分配策略,可以动态实现资源的扩缩,更灵活,更重

    4. Yarn和Spark的Standalone调度模式对比

      YarnStandalone节点功能
      ResouceManagerMaster管理子节点、资源调度、接收任务请求
      NodeMangerWorker管理当前节点,并管理子进程
      YarnChildExecutor运行真正的计算逻辑的(Task)
      ClientClient提交app,管理该任务的Executor
      ApplicaitonMasterApplicaitonMaster管理任务,包含driver程序和运行在集群上的executor程序
      • Spark-Standalone
        • client:提交任务的节点就是Drive
        • Cluster:由集群选择一个节点作为Driver
      • Spark-Yarn
        • Client:提交的节点就是Driver,本身还是AM,但任务调度归属于Driver
        • Cluster:Driver和AM二合一,AM是负责任务调度的
  • 相关阅读:
    ElementUI之登陆+注册->饿了吗完成用户登录界面搭建,axios之get请求,axios之post请求,跨域,注册界面
    从官方文档中探索 Android App 架构演进的方向
    Typescript学习
    04-跨域请求
    从 0 搭建 Vite 3 + Vue 3 前端工程化项目
    c# entity freamwork 判断是否存在
    指针面试问题
    Smartbi入选Gartner ABI领域“客户之声”,荣获“卓越表现者”
    【笔试】2022/9/3 京东开发岗编程满分
    X11 Xlib截屏问题及深入分析二 —— 源码实现1
  • 原文地址:https://blog.csdn.net/weixin_50627985/article/details/125881997