• SparkCore系列-6、RDD 持久化


    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    大数据系列文章目录

    官方网址http://spark.apache.org/https://databricks.com/spark/about
    在这里插入图片描述

    回顾

    上篇文章我们对RDD的函数进行了练习,一方面让我们知道函数是如何使用的,另一方面也加强了我们对函数的认识。

    基本介绍

    在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
    在这里插入图片描述

    缓存函数

    可以将RDD数据直接缓存到内存中,函数声明如下:
    在这里插入图片描述
    但是实际项目中,不会直接使用上述的缓存函数, RDD数据量往往很多,内存放不下的。 在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别:

    缓存级别

    在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:
    在这里插入图片描述
    实际项目中缓存数据时,往往选择如下两种级别:
    在这里插入图片描述
    缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发。
    在这里插入图片描述

    释放缓存

    当缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:
    在这里插入图片描述
    此函数属于eager,立即执行。

    何时缓存数据

    在实际项目开发中,什么时候缓存RDD数据,最好呢???

    第一点:当某个RDD被使用多次的时候,建议缓存此RDD数据

    • 比如,从HDFS上读取网站行为日志数据,进行多维度的分析,最好缓存数据

    在这里插入图片描述
    第二点:当某个RDD来之不易,并且使用不止一次,建议缓存此RDD数据

    • 比如,从HBase表中读取历史订单数据,与从MySQL表中商品和用户维度信息数据,进行关联Join等聚合操作,获取RDD: etlRDD,后续的报表分析使用此RDD,此时建议缓存RDD数据
    • 案例: etlRDD.persist(StoageLeval.MEMORY_AND_DISK_2)

    演示范例代码:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * RDD中缓存函数,将数据缓存到内存或磁盘、释放缓存
     */
    object SparkCacheTest {
      def main(args: Array[String]): Unit = {
        // 创建应用程序入口SparkContext实例对象
        val sc: SparkContext = {
          // 1.a 创建SparkConf对象,设置应用的配置信息
          val sparkConf: SparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
            .setMaster("local[2]")
          // 1.b 传递SparkConf对象,构建Context实例
          new SparkContext(sparkConf)
        }
        // 读取文本文件数据
        val inputRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data", minPartitions = 2)
        // 缓存数据: 将数据缓存至内存
        inputRDD.cache()
        inputRDD.persist()
        // 使用Action函数触发缓存
        println(s"Count = ${inputRDD.count()}")
        // 释放缓存
        inputRDD.unpersist()
        /* 缓存数据:选择缓存级别
        val NONE = new StorageLevel(false, false, false, false)
        val DISK_ONLY = new StorageLevel(true, false, false, false)
        val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
        val MEMORY_ONLY = new StorageLevel(false, true, false, true)
        val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
        val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
        val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
        val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
        val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
        val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
        val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
        val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
        */
        inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
        println(s"count: ${inputRDD.count()}")
        // 应用程序运行结束,关闭资源
        sc.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    RDD Checkpoint

    RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

    Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。

    在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;

    在这里插入图片描述
    演示范例代码如下:

    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * RDD数据Checkpoint设置,案例演示
     */
    object SparkCkptTest {
      def main(args: Array[String]): Unit = {
        // 创建应用程序入口SparkContext实例对象
        val sc: SparkContext = {
          // 1.a 创建SparkConf对象,设置应用的配置信息
          val sparkConf: SparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
            .setMaster("local[2]")
          // 1.b 传递SparkConf对象,构建Context实例
          new SparkContext(sparkConf)
        }
        // TODO: 设置检查点目录,将RDD数据保存到那个目录
        sc.setCheckpointDir("datas/spark/ckpt/")
        // 读取文件数据
        val datasRDD = sc.textFile("datas/wordcount/wordcount.data")
        // TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
        datasRDD.checkpoint()
        datasRDD.count()
        // TODO: 再次执行count函数, 此时从checkpoint读取数据
        datasRDD.count()
        // 应用程序运行结束,关闭资源
        sc.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    持久化和Checkpoint的区别:

    1)存储位置

    • Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存);
    • Checkpoint 可以保存数据到 HDFS 这类可靠的存储上;

    2)生命周期

    • Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法;
    • Checkpoint的RDD在程序结束后依然存在,不会被删除;

    3) Lineage(血统、依赖链、依赖关系)

    • Persist和Cache, 不会丢掉RDD间的依赖链/依赖关系,因为这种缓存是不可靠的,如果出现了一些错误(例如 Executor 宕机),需要通过回溯依赖链重新计算出来;
    • Checkpoint会斩断依赖链,因为Checkpoint会把结果保存在HDFS这类存储中,更加的安全可靠,一般不需要回溯依赖链;

    在这里插入图片描述

    下回分解

    看了这篇文章,是不是对RDD的缓存和持久化了解更深刻,下篇完成一个小案例,一个综合小案例。

  • 相关阅读:
    【网络教程】IPtables官方教程--学习笔记1
    surging作者出具压测结果
    C语言--判断年月日是否合理
    动力节点springboot笔记
    Qt5开发从入门到精通——第十二篇三节(Qt5 事件处理及实例——多线程应用、服务器端编程、客户端编程)
    Vue--》过滤器介绍及其使用方法
    pytorch线性代数的基本操作
    C语言学习之路(基础篇)—— 文件操作(下)
    一个cv大师的摆烂之旅
    23种设计模式3
  • 原文地址:https://blog.csdn.net/l848168/article/details/126035654