• Spark RDD转换成DataFrame的两种方式


    • spark官方提供了两种方法实现从RDD转换到DataFrame。
    • 第一种方法是利用反射机制来推断包含特定类型对象的Schema,这种方式适用于对已知的数据结构的RDD转换;
    •  第二种方法通过编程接口构造一个 Schema ,并将其应用在已知的RDD数据中。

    一、反射机制推断Schema

    实现反射机制Schema需要定义一个case class样例类,定义字段和属性,样例类的参数名称会被反射机制利用作为列名

    1. object RddToDataFrameByReflect {
    2. //定义一个student样例类
    3. case class Student(name:String,age:Int)
    4. def main(args: Array[String]): Unit = {
    5. val conf = new SparkConf().setMaster("local")
    6. //创建sparkSession对象
    7. val sparkSession = SparkSession.builder().appName("RddToDataFrameByReflect").config(conf).getOrCreate()
    8. //获取sparkContext
    9. val sc = sparkSession.sparkContext
    10. //设置打印的日志等级
    11. sc.setLogLevel("WARN")
    12. val dataRDD = sc.parallelize(Array(("java", 18), ("tom", 20), ("libaowen", 30)))
    13. //基于反射直接把包含student对象的dataRDD转成DataFrame
    14. //需要导入隐式转换
    15. import sparkSession.implicits._
    16. val stuDf = dataRDD.map(tup => Student(tup._1, tup._2)).toDF()
    17. //将dataFrame注册成表
    18. stuDf.createOrReplaceTempView("student")
    19. val resDF = sparkSession.sql("select name,age from student")
    20. //将DataFrame转换成RDD
    21. val resRDD = resDF.rdd
    22. resRDD.map(row=>Student(row.getAs[String]("name"),row.getAs("age")))
    23. .collect().foreach(println(_))
    24. sparkSession.stop()
    25. }
    26. }

    1、定义的Student的case类,这是因为在利用反射机制推断RDD模式时,首先需要定义一个case类,spark sql可以自动将包含case类的rdd隐式转换成dataFrame,case类定义了table的结构,case类的属性通过反射机制变成了表的列名

    2、dataRDD.map(tup => Student(tup._1, tup._2)).toDF()方法是把RDD转换成dataFrame,在调用toDF()方法之前需要手动添加spark.implicits._包

    3、val resRDD = resDF.rdd  将DataFrame转换成RDD

    二、编程方式定义schema

    1、创建一个row对象结构的rdd

    2、基于structType类创建schema

    3、通过sparkSession提供的createDataFrame()方法拼接schema

    1. def main(args: Array[String]): Unit = {
    2. val conf = new SparkConf().setMaster("local")
    3. //创建sparkSession对象
    4. val sparkSession =
    5. SparkSession.builder().appName("RddToDataFrameByReflect").config(conf).getOrCreate()
    6. //获取sparkContext
    7. val sc = sparkSession.sparkContext
    8. //设置打印的日志等级
    9. sc.setLogLevel("WARN")
    10. val dataRDD = sc.parallelize(Array(("java", 18), ("tom", 20), ("libaowen", 30)))
    11. //组装rowRDD
    12. val rowRDD = dataRDD.map(tup => Row(tup._1, tup._2))
    13. //指定元数据信息
    14. val schema = StructType(Array(
    15. StructField("name", StringType, true),
    16. StructField("age", IntegerType, true)
    17. ))
    18. val stuDf = sparkSession.createDataFrame(rowRDD, schema)
    19. stuDf.createOrReplaceTempView("student")
    20. val resDF = sparkSession.sql("select name,age from student")
    21. //将DataFrame转换成RDD
    22. val resRDD = resDF.rdd
    23. resRDD.map(row=>Student(row.getAs[String]("name"),row.getAs("age")))
    24. .collect().foreach(println(_))
    25. sparkSession.stop()
    26. }

    结果:

  • 相关阅读:
    DOM与BOM与Echarts
    springboot+vue企业废物回收员工任务管理系统java
    制作github.io学术个人主页
    优雅的记录日志,拒绝打印模糊信息导致bug定位难
    金仓KFS数据级联场景部署
    05 面向对象
    高防DNS如何实现对DDoS攻击的流量清洗?
    Windows 10 安装 Redis
    如何实现云上 Lakehouse 高性能
    第五十六回 徐宁教使钩镰枪 宋江大破连环马-飞桨图像分类套件PaddleClas初探
  • 原文地址:https://blog.csdn.net/libaowen609/article/details/126400504