基于反射机制推断Schema时,需要先定义一个case class(样例类)来声明字段及其类型;样例类的参数名称会被反射机制用作DataFrame的列名。
-
- // Example: convert an RDD to a DataFrame by letting Spark infer the schema
- // from a case class via reflection, query it with SQL, then convert back to an RDD.
- object RddToDataFrameByReflect {
-   // Case class describing one record; its parameter names (name, age) are
-   // used as the DataFrame column names.
-   case class Student(name: String, age: Int)
-
-   // Entry point. `args` is not used.
-   def main(args: Array[String]): Unit = {
-     val conf = new SparkConf().setMaster("local")
-     // Create the SparkSession.
-     val sparkSession = SparkSession.builder().appName("RddToDataFrameByReflect").config(conf).getOrCreate()
-     // Run the job inside try/finally so the session is stopped even when a step throws.
-     try {
-       // Obtain the SparkContext from the session.
-       val sc = sparkSession.sparkContext
-       // Set the log level for printed logs.
-       sc.setLogLevel("WARN")
-
-       val dataRDD = sc.parallelize(Array(("java", 18), ("tom", 20), ("libaowen", 30)))
-       // The implicit conversions must be imported before toDF() can be called
-       // on an RDD of Student objects.
-       import sparkSession.implicits._
-       val stuDf = dataRDD.map(tup => Student(tup._1, tup._2)).toDF()
-       // Register the DataFrame as a temporary view so it can be queried with SQL.
-       stuDf.createOrReplaceTempView("student")
-       val resDF = sparkSession.sql("select name,age from student")
-       // Convert the DataFrame back to an RDD of Row.
-       val resRDD = resDF.rdd
-       // Rebuild Student objects from each Row; the field types are spelled out
-       // explicitly instead of relying on inference of getAs's type parameter.
-       resRDD.map(row => Student(row.getAs[String]("name"), row.getAs[Int]("age")))
-         .collect().foreach(println(_))
-     } finally {
-       sparkSession.stop()
-     }
-   }
- }
1、定义Student样例类:利用反射机制推断RDD的Schema时,首先需要定义一个case类;Spark SQL可以将包含case类对象的RDD隐式转换成DataFrame。case类定义了表的结构,其属性通过反射机制成为表的列名
2、dataRDD.map(tup => Student(tup._1, tup._2)).toDF()把RDD转换成DataFrame;在调用toDF()方法之前,需要先手动导入隐式转换:import sparkSession.implicits._
3、val resRDD = resDF.rdd 将DataFrame转换成RDD
1、创建一个元素为Row对象的RDD
2、基于StructType类创建schema
3、通过sparkSession提供的createDataFrame()方法,将Row RDD与schema组合成DataFrame
-
- // Entry point for the programmatic-schema example: builds an RDD of Row,
- // attaches an explicitly declared StructType schema with createDataFrame,
- // queries the result with SQL, and converts it back to an RDD. `args` is not used.
- def main(args: Array[String]): Unit = {
-   val conf = new SparkConf().setMaster("local")
-   // Create the SparkSession.
-   // NOTE(review): the app name still says "ByReflect" although this example
-   // declares the schema explicitly; looks copy-pasted, confirm before renaming.
-   val sparkSession =
-     SparkSession.builder().appName("RddToDataFrameByReflect").config(conf).getOrCreate()
-   // Run the job inside try/finally so the session is stopped even when a step throws.
-   try {
-     // Obtain the SparkContext from the session.
-     val sc = sparkSession.sparkContext
-     // Set the log level for printed logs.
-     sc.setLogLevel("WARN")
-     val dataRDD = sc.parallelize(Array(("java", 18), ("tom", 20), ("libaowen", 30)))
-
-     // Assemble the RDD of Row objects.
-     val rowRDD = dataRDD.map(tup => Row(tup._1, tup._2))
-     // Declare the schema metadata: column name, type and nullability.
-     val schema = StructType(Array(
-       StructField("name", StringType, nullable = true),
-       StructField("age", IntegerType, nullable = true)
-     ))
-     val stuDf = sparkSession.createDataFrame(rowRDD, schema)
-
-     // Register the DataFrame as a temporary view so it can be queried with SQL.
-     stuDf.createOrReplaceTempView("student")
-     val resDF = sparkSession.sql("select name,age from student")
-     // Convert the DataFrame back to an RDD of Row.
-     val resRDD = resDF.rdd
-
-     // Student is not defined in this snippet; presumably it is the
-     // Student(name: String, age: Int) case class from the reflection example.
-     resRDD.map(row => Student(row.getAs[String]("name"), row.getAs[Int]("age")))
-       .collect().foreach(println(_))
-   } finally {
-     sparkSession.stop()
-   }
- }
结果: