• Dataset 的基础知识和RDD转换为DataFrame


    目录

    Dataset 的基础知识

    一、Dataset简介

    二、Dataset对象的创建

    RDD转换为DataFrame

    一、反射机制创建DataFrame

    二、编程方式定义Schema,创建DataFrame


    Dataset 的基础知识

    一、Dataset简介

    Dataset是从Spark1.6 Alpha版本中引入的一个新的数据抽线结构,最懂在Spark2.0版本被定义成Spark新特性。

    RDD, DataFrame,Dataset数据对比

    1 RDD数据没有数据类型和元数据信息

    2 DataFrame添加了Schema信息,每一行的类型固定为Row,每一列的值无法直接访问

    3 在RDD的基础上增加了一个数据类型,可以拥有严格的错误检查机制。

    4 Dataset和DataFrame拥有完全相同的成员函数

    二、Dataset对象的创建

    从RDD生成Dataset

    val personDataSet = spark.createDataset(sc.textFile("/spark/person.txt"))

    查看结构

    personDataSet.show

    DateSet 可以直接转换为DataFrame

    personDataSet.toDF()

    personDataSet.toDF().show

    RDD转换为DataFrame

     通常有两种方法实现基于RDD转换到DataFrame,第一种情况当一直RDD的数据结构(元数据信息),可以通过反射机制来推断生产Schema,另外一种情况,如果不清楚Dataset的数据结构,可以通过编程接口实现。

    一、反射机制创建DataFrame

    Step1  添加依赖(在项目里的一个pom.xml的文件里面)

    1. #依赖需要添加到标签
    2. #<dependencies>
    3. <依赖添加位置>
    4. #</dependencies>
    5.     <dependency>
    6.     <groupId>org.apache.spark</groupId>
    7.     <artifactId>spark-sql_2.11</artifactId>
    8.     <version>2.3.2</version>
    9. </dependency>

               

    Step 2  编写代码:

    一:反射创建DataFrame

    创建一个名为CaseClassSchema的scala项目

    1. package cn.itcast
    2. import org.apache.spark.SparkContext
    3. import org.apache.spark.rdd.RDD
    4. import org.apache.spark.sql.{DataFrame, SparkSession}
    5. case class Person(id:Int,name:String,age:Int)
    6. object CaseClassSchema {
    7.   def main(args: Array[String]): Unit = {
    8.     val spark : SparkSession=SparkSession.builder()
    9.       .appName("CaseClassSchema")
    10.       .master("local[2]")
    11.       .getOrCreate()
    12.     //2.获取SparkContext对象
    13.     val sc:SparkContext=spark.sparkContext
    14.     //设置日志打印级别
    15.     sc.setLogLevel("WARN")
    16.     //3.读取文件
    17.     val data: RDD[Array[String]]=
    18.       sc.textFile("F://spark_chapter02//src//main//scala//cn//itcast//person.txt").map(x=>x.split(" "))
    19.     //4.将RDD与样例关联
    20.   
    21.  val personRdd: RDD[Person]=
    22.       data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
    23.     //5.获取DataFrame
    24.     //手动导入隐式转换
    25.     import spark.implicits._
    26.     val personDF:DataFrame=personRdd.toDF
    27.     //-----------DSL风格操作开始----------------
    28.     //1.显示DataFrame的数据,默认显示20
    29.     personDF.show()
    30.     //2.显示DataFrame的schema信息
    31.     personDF.printSchema()
    32.     //3.统计DataFrame中年龄大于30岁的人数
    33.     println(personDF.filter($"age">30).count())
    34.     //-----------DSL风格操作结束----------------
    35.     //-----------SQL风格操作开始----------------
    36.     //将DataFrame注册成表
    37.     personDF.createOrReplaceTempView("t_person")
    38.     spark.sql("select * from t_person").show()
    39.     spark.sql("select * from t_person where name='zhangsan'").show()
    40.     //-----------SQL风格操作结束----------------
    41.     //关闭资源操作
    42.     sc.stop()
    43.     spark.stop()
    44.   }
    45. }
     
    

    二、编程方式定义Schema,创建DataFrame

    编程方式的步骤

    Step1 :  创建一个Row对象结构的RDD

    创建一个名为SparkSqlSchema的scala文件

    1. package cn.itcast
    2. import org.apache.spark.SparkContext
    3. import org.apache.spark.rdd.RDD
    4. import org.apache.spark.sql.types.{IntegerType, StructType,StructField,StringType}
    5. import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    6. //case class Person(id:Int,name:String,age:Int)
    7. object SparkSqlSchema {
    8.   def main(args: Array[String]): Unit = {
    9.     val spark : SparkSession=SparkSession.builder()
    10.       .appName("SparkSqlSchema")
    11.       .master("local[2]")
    12.       .getOrCreate()
    13.     //2.获取SparkContext对象
    14.     val sc:SparkContext=spark.sparkContext
    15.     //设置日志打印级别
    16.     sc.setLogLevel("WARN")
    17.     //3.读取文件
    18.     val dataRDD: RDD[String]=
    19.       sc.textFile("F://spark_chapter02//src//main//scala//cn//itcast//person.txt")
    20.     //4.将RDD与样例关联
    21.     val dataArrayRDD:RDD[Array[String]]=dataRDD.map(_.split(" "))
    22.     //5.加载数据到Row对象中
    23.     val personRDD: RDD[Row]=dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
    24.     //6.创建Schema
    25.     val schema:StructType=StructType(Seq(
    26.       StructField("id",IntegerType,false),
    27.       StructField("name",StringType,false),
    28.       StructField("age",IntegerType,false)
    29.     ))
    30.     //7.利用personRDD与Schema创建DataFrame
    31.     val personDF:DataFrame = spark.createDataFrame(personRDD,schema)
    32.     //8.DSL操作显示DataFrame的数据结果
    33.     personDF.show()
    34.     //9.将DataFrame注册成表
    35.     personDF.createOrReplaceTempView("t_person")
    36.     //10.sql语句操作
    37.     spark.sql("select * from t_person").show()
    38.     //11.关闭资源
    39.     sc.stop()
    40.     spark.stop()
    41.   }
    42. }

  • 相关阅读:
    大数据处理技术:MapReduce综合实训
    startsWith()方法的使用
    MyBatis和Hibernate的区别
    使用Oracle VM VirtualBox安装Unbuntu虚拟机并安装增强功能(实现双向复制粘贴)
    华为认证云计算专家(HCIE-Cloud Computing)–单选题
    2022最新PPT模板,免费下载
    【EC200U】何为QuecPython以及QPYcom基础操作
    单片机和FreeRTOS上跑机器人ROS的应用
    猿创征文 |【数据结构】3个例题带你搞定图的遍历:深度优先搜索
    window系统 安装 nvm 详细步骤
  • 原文地址:https://blog.csdn.net/m0_57781407/article/details/126514865