版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
官方网址:http://spark.apache.org/、https://databricks.com/spark/about
上篇文章我们对RDD的函数进行了练习,一方面让我们知道函数是如何使用的,另一方面也加强了我们对函数的认识。
在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
可以将RDD数据直接缓存到内存中,函数声明如下:
但是实际项目中,不会直接使用上述的缓存函数, RDD数据量往往很多,内存放不下的。 在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别:
在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:
实际项目中缓存数据时,往往选择如下两种级别:
缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发。
当缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:
此函数属于eager,立即执行。
在实际项目开发中,什么时候缓存RDD数据,最好呢???
第一点:当某个RDD被使用多次的时候,建议缓存此RDD数据
第二点:当某个RDD来之不易,并且使用不止一次,建议缓存此RDD数据
演示范例代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD中缓存函数,将数据缓存到内存或磁盘、释放缓存
*/
object SparkCacheTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
// 读取文本文件数据
val inputRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data", minPartitions = 2)
// 缓存数据: 将数据缓存至内存
inputRDD.cache()
inputRDD.persist()
// 使用Action函数触发缓存
println(s"Count = ${inputRDD.count()}")
// 释放缓存
inputRDD.unpersist()
/* 缓存数据:选择缓存级别
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
*/
inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
println(s"count: ${inputRDD.count()}")
// 应用程序运行结束,关闭资源
sc.stop()
}
}
RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。
Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。
在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;
演示范例代码如下:
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD数据Checkpoint设置,案例演示
*/
object SparkCkptTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
// TODO: 设置检查点目录,将RDD数据保存到那个目录
sc.setCheckpointDir("datas/spark/ckpt/")
// 读取文件数据
val datasRDD = sc.textFile("datas/wordcount/wordcount.data")
// TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
datasRDD.checkpoint()
datasRDD.count()
// TODO: 再次执行count函数, 此时从checkpoint读取数据
datasRDD.count()
// 应用程序运行结束,关闭资源
sc.stop()
}
}
持久化和Checkpoint的区别:
1)存储位置
2)生命周期
3) Lineage(血统、依赖链、依赖关系)
看了这篇文章,是不是对RDD的缓存和持久化了解更深刻,下篇完成一个小案例,一个综合小案例。