- <!--SparkSQL依赖-->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>${spark.version}</version>
- </dependency>
创建person.json文件
- {"name":"张三","age":20},
- {"name":"李四","age":22},
- {"name":"王五","age":19},
- {"name":"赵六","age":21},
- {"name":"田七","age":22}
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要 有临时视图或者全局视图来辅助
- object SparkSQLDemo1 {
- def main(args: Array[String]): Unit = {
- val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
- import sparkSession.implicits._
- //读取 JSON 文件创建 DataFrame
- val df = sparkSession.read.json("input\\person.json")
- //对 DataFrame 创建一个临时表
- df.createOrReplaceTempView("person")
- //通过 SQL 语句实现查询年龄大于20
- val result = sparkSession.sql("select * from person where age > 20")
- //结果展示
- result.show()
- sparkSession.stop()
- }
- }
运行结果:
- +---+----+
- |age|name|
- +---+----+
- | 22| 李四|
- | 21| 赵六|
- | 22| 田七|
- +---+----+
DSL 语法:DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。 可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了
- object SparkSQLDemo2 {
- def main(args: Array[String]): Unit = {
- val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
- import sparkSession.implicits._
- //创建一个 DataFrame
- val df = sparkSession.read.json("input\\person.json")
- //只查看"name"列数据,
- df.select("name").show()
- sparkSession.stop()
- }
- }
运行结果:
- +----+
- |name|
- +----+
- | 张三|
- | 李四|
- | 王五|
- | 赵六|
- | 田七|
- +----+
在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入
import spark.implicits._
这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必 须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。
spark-shell 中无需导入,自动完成此操作。
- object SparkSQLDemo3 {
- def main(args: Array[String]): Unit = {
- val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
- import sparkSession.implicits._
- val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
- val df = rdd.toDF("name","age")
- df.show()
- sparkSession.stop()
- }
- }
运行结果:
- +----+---+
- |name|age|
- +----+---+
- | 张三| 20|
- | 李四| 19|
- | 王五| 21|
- +----+---+
DataFrame 其实是 DataSet 的特例,所以它们之间是可以互相转换的。
- object SparkSQLDemo4 {
- def main(args: Array[String]): Unit = {
- val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
- import sparkSession.implicits._
- val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
- val df = rdd.toDF("name","age")
- val ds = df.as[Person2]
- ds.show()
- sparkSession.stop()
- }
- }
- //样例类
- case class Person2(name:String,age:Int)
运行结果:
- +----+---+
- |name|age|
- +----+---+
- | 张三| 20|
- | 李四| 19|
- | 王五| 21|
- +----+---+
DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD
- object SparkSQLDemo5 {
- def main(args: Array[String]): Unit = {
- val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
- import sparkSession.implicits._
- val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
- val df = rdd.toDF("name","age")
- val result = df.rdd
- result.foreach(line=>println(line.getString(0),line.getInt(1)))
- sparkSession.stop()
- }
- }
运行结果:
- (李四,19)
- (张三,20)
- (王五,21)
SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结 构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结 构。
- object SparkSQLDemo6 {
- def main(args: Array[String]): Unit = {
- val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
- import sparkSession.implicits._
- val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
- val ds = rdd.map{
- case(name,age)=>Person2(name,age)
- }.toDS()
- ds.show()
- sparkSession.stop()
- }
- }
- //样例类
- case class Person2(name:String,age:Int)
运行结果:
- +----+---+
- |name|age|
- +----+---+
- | 张三| 20|
- | 李四| 19|
- | 王五| 21|
- +----+---+