• 详解 Spark 核心编程之 RDD 序列化


    一、问题引出

    object TestRDDSerializable {
        def main(args: Array[String]): Unit = {
            val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ser")
            val sc = new SparkContext(sparkConf)
            
            val rdd = sc.makeRDD(List(1,2,3,4), 2)
            
            val user = new User()
            
            rdd.foreach(num => {
                println("age = " + (user.age + num))
            })
            
            /*
             结果:程序执行抛出异常 NotSerializableException
             分析:
              1.foreach 算子外部的执行是在 Driver 端,内部的操作是在 Executor 端执行
              2.foreach 算子的内部操作使用到了 user 对象的属性,所以 user 对象需要从 Driver 发送到 Executor,涉及到网络传输
              3.由于 User 类没有混入序列化特质,所以抛出异常
              4.解决方法:class User extends Serializable {} 或 case class User {},样例类在编译时会自动混入序列化
            */
            
            println("=================")
            
            val rdd1 = sc.makeRDD(List[Int](), 2)
            
            val user1 = new User()
            
            rdd1.foreach(num => {
                println("age = " + (user1.age + num))
            })
            
            /*
             期望:由于rdd1没有数据,foreach 算子不会实际执行,即使 User 没有混入序列化也不会报错
             结果:程序执行抛出异常 NotSerializableException
             分析:
              1.RDD 算子中如果传递的是函数参数,则会涉及到闭包操作,内部会调用 sc.clean(f)
              2.clean 方法底层会进行闭包检测,其中就包含序列化的检测,如果检测到使用的对象没有混入序列化特质,就会抛出异常
            */
        }
    }
    
    class User {
        val age: Int = 30
    }
    

    二、Kryo 序列化框架

    • 参考地址:https://github.com/EsotericSoftware/kryo

    • Java 序列化的对比:

      • Java 的序列化比较重,生成的文件字节比较多,而 Kryo 序列化是轻量级的,产生的字节较少,所以 Kryo 速度是 Serializable 的 10 倍
      • Java 的序列化中可以通过 transient 关键字限制不参与序列化的属性,而 transient 关键字在 Kryo 序列化中不产生作用
    • 自定义 Kryo 序列化

      /*
      简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化
      */
      object TestKryoSerializable {
          def main(args: Array[String]): Unit = {
          	val conf = new SparkConf().setMaster("local[*]").setAppName("Ser")
              
              // 替换默认的序列化机制
              conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
              // 注册需要使用  kryo  序列化的自定义类,该类必须混入 Serializable 特质
              conf.registerKryoClasses(Array(classOf[Searcher]))
              
          	val sc = new SparkContext(conf)
              
          	val rdd: RDD[String] = sc.makeRDD(Array(
                  "hello world", "hello spark", 
          		"kafka", "hive"
              ), 2)
              
          	val searcher = new Searcher("h")
              
          	val result: RDD[String] = searcher.getMatchedRDD1(rdd)
              
          	result.collect.foreach(println)
              
          }
      }
      
      case class Searcher(val query: String) { 
          def isMatch(s: String) = {
          	s.contains(query) // this.query
          }
          
          def getMatchedRDD1(rdd: RDD[String]) = {
          	rdd.filter(isMatch) 
          }
          
          def getMatchedRDD2(rdd: RDD[String]) = {
              val q = query
          	rdd.filter(_.contains(q))
          }
      }
      
  • 相关阅读:
    猿如意|手把手教你下载、安装和配置PyCharm社区版
    计算机毕设 机器学习股票大数据量化分析与预测系统 - python 计算机毕设
    又撸了一个开源项目!!!
    springboot在线银行贷款系统
    JavaScript:实现 Polynomials多项式算法 (附完整源码)
    吴恩达卷积神经网络 笔记,吴恩达 深度神经网络
    JAVA毕业设计科研团队管理系统计算机源码+lw文档+系统+调试部署+数据库
    Leetcode—53.最大子数组和【中等】
    内网渗透之Linux反弹shell(三)
    面向对象实验六模板
  • 原文地址:https://blog.csdn.net/weixin_44480009/article/details/139349311