object TestRDDSerializable {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ser")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4), 2)
val user = new User()
rdd.foreach(num => {
println("age = " + (user.age + num))
})
/*
结果:程序执行抛出异常 NotSerializableException
分析:
1.foreach 算子外部的执行是在 Driver 端,内部的操作是在 Executor 端执行
2.foreach 算子的内部操作使用到了 user 对象的属性,所以 user 对象需要从 Driver 发送到 Executor,涉及到网络传输
3.由于 User 类没有混入序列化特质,所以抛出异常
4.解决方法:class User extends Serializable {} 或 case class User {},样例类在编译时会自动混入序列化
*/
println("=================")
val rdd1 = sc.makeRDD(List[Int](), 2)
val user1 = new User()
rdd1.foreach(num => {
println("age = " + (user1.age + num))
})
/*
期望:由于rdd1没有数据,foreach 算子不会实际执行,即使 User 没有混入序列化也不会报错
结果:程序执行抛出异常 NotSerializableException
分析:
1.RDD 算子中如果传递的是函数参数,则会涉及到闭包操作,内部会调用 sc.clean(f)
2.clean 方法底层会进行闭包检测,其中就包含序列化的检测,如果检测到使用的对象没有混入序列化特质,就会抛出异常
*/
}
}
class User {
val age: Int = 30
}
参考地址:https://github.com/EsotericSoftware/kryo
与 Java 序列化的对比:
自定义 Kryo 序列化
/*
简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化
*/
object TestKryoSerializable {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Ser")
// 替换默认的序列化机制
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类,该类必须混入 Serializable 特质
conf.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array(
"hello world", "hello spark",
"kafka", "hive"
), 2)
val searcher = new Searcher("h")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query) // this.query
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}