• RDD的分区、依赖关系、机制


    目录

    一.RDD的分区

    二.RDD的依赖关系

    三.RDD机制

    1、使用persist()方法对RDD进行持久化

    ​编辑

    2、使用cache()方法对RDD进行持久化

    ​编辑

    三、容错机制


    一.RDD的分区

    RDD的分区原则是分区的个数尽量等于集群中的CPU核心(Core)数目。

    各种模式下的默认分区数目如下

    (1) Local模式:默认为本地机器的CPU数目,若设置了local[N].则默认为N.

    (2) Standalone或者Yarn模式:在“集群中所有CPU核数总和"和“2”这两者中取较大值作为默认值。

    (3) Mesos 模式:默认的分区数是8.

    Spark框架为RDD提供了两种分区方式,分别是哈希分区(HashPartitioner)和范围分区(RangePartitioner)。

    Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。

    如果想要实现自定义分区,就需要定义一个类,使得这个自定义的类继承org. apache. spark. Partitioner类,并实现其中的3个方法,具体如下:

    (1) def numPartitions:Int:用于返回创建的分区个数。

    (2) def getPartition(Key:Any):用于对输人的Key做处理,并返回该Key的分区ID, 分区ID的范围是0~ numPartitions 1.

    (3) equals (other: Any):用于Spark判断自定义的Partitioner 对象和其他的 Partitioner对象是否相同,从而判断两个RDD的分区方式是否相同。

    二.RDD的依赖关系

    RDD之间具有依赖的关系。

    RDD与它所依赖的RDD的依赖关系有两种类型,分别是

    窄依赖(narrow dependency)宽依赖(wide dependency)

    窄依赖是指父RDD的每一个分区最多被一个子RDD的分区使用,即OneToOneDependencies。窄依赖的表现一般分为两类:第一类表现为一个父RDD的分区对应于一个子RDD的分区;第二类表现为多个父RDD的分区对应于一个子RDD的分区,也就是说,一个父RDD的一个分区不可能对应一个子RDD的多个分区。

    当RDD执行map、filter、union、和join操作时,都会产生窄依赖。

    宽依赖是指子RDD的每一个分区都会使用所有父RDD的所有分区或多个分区,即One ToManyDependecies。(当RDD进行groupByKey和join操作时,会产生宽依赖)

    join算子操作既可以属于窄依赖,也可以属于宽依赖。当join算子操作后,分区数量没有变化则为窄依赖(如join with inputs co partitioned,输人协同划分);当join算子操作后,分区数量发生变化则为宽依赖(如join with inputs not corpartitioned,输人非协同划分)。

    三.RDD机制

    Spark为RDD提供了两个重要的机制,分别是特久化机制(即缓存机制)和容错机制

    RDD是采用惰性求值(即每次调用行动算子操作,都会从头开始计算)

    RDD的持久化操作有两种方法,分别是cache()方法和persist()方法。

    ①persist()方法的存储级别是通过StorageLevel对象(Scala Java、Python)设置的。

    ②cache()方法的存储级别是使用默认的存储级别(即StorageLevel. MEMORY ONLY(将反序列化的对象存人内存))。

                     持久化RDD的存储级别

    MEMORY_ ONLY

    默认存储级别。将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则某些分区将不会被缓存,并且每次需要时都会重新计算

    MEMORY_AND_DISK

    将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则将剩余分区存储到磁盘上,并在需要时从磁盘读取

    MEMORY_ONLY_SER

    将RDD作为序列化的Java对象(每个分区序列化为一个字节数组),比反序列化的Java对象节省空间,但读取时更占CPU

    MEMORY_AND_DISK_SER

    与MEMORY ONLY SER类似,但是当内存放不下时则溢出到磁盘,而不是每次需要时重新计算它们

    DISK_ONLY

    仅将RDD分区全部存储到磁盘上

    MEMORY_ONLY_2 MEMORY_AND_DISK_2

    与上面的级别相同。若加上后缀2,代表的是将每个持久化的数据都复制一份副本,并将副本保存到其他节点上

    OFF_HEAP(实验性)

    与MEMORY ONLY SER类似,但将数据存储在堆外内存中(这需要启用堆外内存)

    1、使用persist()方法对RDD进行持久化

    定义一个列表list,通过该列表创建一个RDD,然后通过persist持久化操作和算子操作统计RDD中的元素个数以及打印输出RDD中所有的元素。

    代码:

    1. import org.apache.spark.storage.StorageLevel
    2. val testlist = List("hadoop","Python","Spark","Java")
    3. val listRDD = sc.parallelize(testlist)
    4. listRDD.persist(StorageLevel.DISK_ONLY)
    5. println(listRDD.count())
    6. println(listRDD.collect().mkString(","))

     

    2、使用cache()方法对RDD进行持久化

    代码:

    1. val list = List("hadoop","Python","Spark","Java")
    2. val listRDD = sc.parallelize(list)
    3. listRDD.cache()
    4. println(listRDD.count())
    5. println(listRDD.collect().mkString(","))

     

    三、容错机制

    RDD提供了两种故障恢复的方式,分别是血统(lineage)方式设置检查点(checkpoint)方式

    ①血统方式,主要是根据RDD之间的依赖关系对丢失数据的RDD进行数据恢复。

    ②设置检查点方式,本质上是将RDD写人磁盘进行存储。

  • 相关阅读:
    ClickHouse进阶(十三):Clickhouse数据字典-3-文件数据源及Mysql数据源
    初级软件测试工程师面试
    SwiftUI 原生或利用 Vision 检测限定高度的 Text 视图能否完整显示文本的方法
    clickhouse安装与远程访问
    高等数学(第七版)同济大学 习题7-7 个人解答
    2023年天津美术学院专升本专业课报名缴费时间及考试安排
    上海疫情后一个前端的面试心路历程
    使用Hybrid Flow并添加API访问控制
    暴雨信息|低碳发展共拓数字能源产业绿色空间
    python 如何自动读取含“xx”关键字的excel文件,并返回一个DataFrame
  • 原文地址:https://blog.csdn.net/m0_57781407/article/details/126381115