创建DataFrame有三种方式:
1. 读外部设备的文件,返回DataFrame对象
2. 从RDD转换成DataFrame对象
3. 读取Hive中的表,返回DataFrame对象
4. 调用createDataFrame方法,返回DataFrame对象
SparkSession 是 Spark 最新的 SQL 查询起始点
历史版本已弃用
- private val spark: SparkSession = SparkSession.builder().master("local").appName("dataSetDemo").getOrCreate()
- import spark.implicits._
示例直接读取json文件,json文件自带结构
- private val ruleDf: DataFrame = spark.read.json("src/data/AAAS_RULE_20220316.000")
- ruleDf.show()
- ruleDf.createOrReplaceTempView("AAAS_RULE")
- spark.sql("select * from AAAS_RULE where is_pack_buy='1'").show
普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。
使用全局临时表时需要全路径访问: global_temp.AAAS_RULE
- ruleDf.createOrReplaceGlobalTempView("Glb_AAAS_RULE")
- private val globalTemp: SparkSession = spark.newSession()
- globalTemp.sql("select * from global_temp.Glb_AAAS_RULE where is_pack_buy!= 1")
- .show()
- //打印表结构
- ruleDf.printSchema()
- ruleDf.select($"API_id" + 1, $"BILL_MODE_LIST").show()
- ruleDf.select('Api_id + 2, 'API_NAME).show()
- //过滤
- ruleDf.filter('Api_id > 1000).show()
- //分组
- ruleDf.groupBy("IS_PACK_BUY").count().show()
RDD -> RDD[case class] -> DataFrame
1、新增样例类,强类型化数据(城市编码,城市名称,归属大区)
case class CityList(cityCode:Int,cityName:String,from:String)
2、case class -> RDD ,RDD关联样例类
- private val cityListRdd: RDD[CityList] = aaasRuleRdd.map(_.split("\t"))
- .map(data => {
- CityList(data(0).toInt, data(1), data(2))
- })
3、RDD -> DataFrame
- private val cityListDf: DataFrame = cityListRdd.toDF()
- cityListDf.show()
- cityListDf.printSchema()
- println("-----------------Rdd 转换为 DataSet-----------------------")
- private val dogList: RDD[(String, Int)] = sc.makeRDD(List(("hotPot",18), ("lorin",19), ("pokey",28)))
- private val dogDataFrame: DataFrame = dogList.toDF()
- dogDataFrame.show()
- dogDataFrame.printSchema()
- println("toDF给定列名之后,数据就是强类型")
- //toDF给定列名之后,数据就是强类型
- private val dataFrame: DataFrame = dogList.toDF("name", "age")
- dataFrame.printSchema()
- println("-----------------Rdd 转换为 DataSet-----------------------")
- //Rdd 转换为 DataSet
- private val dogDataSet: Dataset[(String, Int)] = dogList.toDS()
- dogDataSet.show
- dogDataSet.printSchema()
- //DataFrame转换为DataSet,as后面跟的是[]
- println("-----------------DataFrame转换为DataSet-----------------------")
- //给固定参数类型
- private val dataSet: Dataset[(String, Int)] = dogDataFrame.as[(String, Int)]
- dataSet.show()
- dataSet.printSchema()
- //给样例类
- /*
- * 注意只有给定列名转换的强类型DataFrame才能通过样例类转化为DataSet
- * */
- private val DataSet: Dataset[DogInfo] = dataFrame.as[DogInfo]
- dogDataSet.show()
- println("-----------------DataSet 转换为 DataFrame-----------------------")
- private val dataFrame1: DataFrame = DataSet.toDF()
- private val dataFrame2: DataFrame = DataSet.toDF("name", "age")
- dataFrame1.printSchema()
- dataFrame2.printSchema()
- println("-----------------DataFrame 转换为 RDD-----------------------")
- dataFrame2.printSchema()
- dataFrame2.rdd.collect().foreach(println)
- println("-----------------DataSet 转换为 RDD-----------------------")
- dataSet.printSchema()
- DataSet.rdd.collect().foreach(println)
SQL风格编程:就是编写sql语句,底层翻译成相关算子进行执行
DSL:domain-specific language:使用 DSL 语法风格不必去创建临时视图了
- //查看 DataFrame 的 Schema 信息
- val df = spark.read.json("data/user.json")
- df.printSchema
- //只查看"username"列数据
- df.select("username").show()