• 大数据随记 —— DataFrame 与 RDD 之间的相互转换


    大数据系列文章👉 目录 👈

    在这里插入图片描述

    Spark SQL 中有两种方式可以在 DataFrame 和 RDD 中进行转换:

    • ① 利用反射机制,推导包含某种类型的 RDD,通过反射将其转换为指定类型的 DataFrame,适用于提前知道 RDD 的 Schema。
    • ② 通过编程借口与 RDD 进行交互获取 Schema,并动态创建 DataFrame,在运行时决定列及其类型。

    DataFrame 中的数据结构信息,即为 Scheme

    ① 通过反射获取 RDD 内的 Scheme

    (使用条件)已知类的 Schema,使用这种基于反射的方法会让代码更加简洁而且效果也更好。Scala 中,使用 case class 类型导入 RDD 并转换为 DataFrame,通过 case class 创建 Schema,case class 的参数名称会被利用反射机制作为列名。case class 可以嵌套组合成 Sequences 或者 Array。这种 RDD 可以高效的转换为 DataFrame 并注册为表。

    其次,如果需要 RDD 与 DFS 或者 DS 之间互相操作,那么需要引入 import sqlContext.implicits._

    • 这里的 sqlContext 不是包名,而是创建的 SparkSession 对象(这里为 SQLContext 对象)的变量名称,所以必须先创建 SparkSession 对象再导入。
    • 这里 sqlContext 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。

    SparkSession 是 Spark 2.0 引入的概念,其封装了 SQLContext 和 HiveContext。

    package sparksql  
      
    import org.apache.spark.sql.SQLContext  
    import org.apache.spark.{SparkConf, SparkContext}  
      
    object DataFrametoRDDofReflection {  
      def main(args: Array[String]): Unit = {  
      
      }  
    
      def method1():Unit = {  
      
        val sparkConf = new SparkConf().setAppName("DataFrametoRDDofReflection").setMaster("local[2]")  
        val sc = new SparkContext(sparkConf)  
        val sqlContext = new SQLContext(sc)  
        
        // 引入 sqlContext.implicits._
        import sqlContext.implicits._  
      
        // 将 RDD 转成 DataFrame    
    	/*val people = sc.textFile("people.txt").toDF()*/    
    	val people = sc.textFile("people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()  
      
        people.show()  
      
        people.registerTempTable("people")  
        val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")  
        teenagers.show()  
      
        // DataFrame 转成 RDD 进行操作:根据索引号取值  
        teenagers.map(t=>"Name:" + t(0)).collect().foreach(println)  
      
        // DataFrame 转成 RDD 进行操作:根据字段名称取值  
        teenagers.map(t=>"Name:" + t.getAs[String]("name")).collect().foreach(println)  
      
        // DataFrame 转成 RDD 进行操作:一次返回多列的值  
        teenagers.map(_.getValuesMap[Any](List("name","age"))).collect().foreach(println)  
      
        sc.stop()  
      
      }
    
      
      /**  
       * 定义 Person 类  
       * @param name 姓名  
       * @param age 年龄  
       */  
      case class Person(name:String,age:Int)  
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    ② 通过编程接口执行 Scheme

    通过 Spark SQL 的接口创建 RDD 的 Schema,这种方式会让代码比较冗长。这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成 Schema。可以通过以下三步创建 DataFrame:

    • 第一步将 RDD 转为包含 row 对象的 RDD
    • 第二步基于 structType 类型创建 Schema,与第一步创建的 RDD 想匹配
    • 第三步通过 SQLContext 的 createDataFrame 方法对第一步的 RDD 应用 Schema
    package sparksql  
      
    import org.apache.spark.sql.SQLContext  
    import org.apache.spark.{SparkConf, SparkContext}  
      
    object DataFrametoRDDofInterface {  
      
      def main(args: Array[String]): Unit = {  
        method2()  
      }  
      
      def method2(): Unit = {  
        val sparkConf = new SparkConf().setAppName("DataFrametoRDDofInterface").setMaster("local[2]")  
        val sc = new SparkContext(sparkConf)  
        val sqlContext = new SQLContext(sc)  
      
        import sqlContext.implicits._  
        
        val people = sc.textFile("people.txt")  
      
        // 以字符串的方式定义 DataFrame 的 Schema 信息  
        val schemaString = "name age"  
      
        // 导入所需要的类  
        import org.apache.spark.sql.Row  
        import org.apache.spark.sql.types.{StructType,StructField,StringType}  
      
        // 根据自定义的字符串 schema 信息产生 DataFrame 的 Schema    
        val schema = StructType(  
          schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))  
        
        // 将 RDD 转换成 Row    
        val rowRDD = people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))  
      
        // 将 Schema 作用到 RDD 上  
        val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema)  
      
        // 将 DataFrame 注册成临时表  
        peopleDataFrame.registerTempTable("people")  
      
        // 获取 name 字段的值  
        val results = sqlContext.sql("SELECT name FROM people")  
        results.map(t => "Name" + t(0)).collect().foreach(println) 
         
        sc.stop()  
        
      }  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    在这里插入图片描述

  • 相关阅读:
    企业安全—DevSecOps概述详情
    浅谈EDR绕过
    保姆级教学!!! GIT:将本地文件夹上传到github仓库中
    RK3568 gpio 复用控制使用操作记录
    rpm环境安装dpkg包管理工具
    PWC光流估计网络
    顶礼膜拜!阿里内部出品,全网首发 Spring Security 项目实战搭建
    复习单片机:外部中断(内含:1.外部中断原理图+2 外部中断配置+3 硬件设计+4 软件设计+5.实验现象)
    JVM类加载机制详解
    MySQL 多表关联一对多查询实现取最新一条数据
  • 原文地址:https://blog.csdn.net/qq_21484461/article/details/126647852