DataSet是具有强类型的数据集合,需要提供对应的类型信息。
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
scala> caseClassDS.show
+---------+---+
| name|age|
+---------+---+
| zhangsan| 2|
+---------+---+
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----+
注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seq或者Array等复杂的结构。
# 创建RDD
scala> val userRDD = sc.textFile("data/user.txt")
userRDD: org.apache.spark.rdd.RDD[String] = data/user.txt MapPartitionsRDD[1] at textFile at <console>:24
# 创建样例类
scala> case class User(name:String, age:Int)
defined class User
# 转换
scala> userRDD.map(x=>{var fields = x.split(" ");User(fields(0),fields(1).toInt)}).toDS.show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 20|
| lisi| 30|
| wangwu| 40|
|xiaowang| 50|
+--------+---+
DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD
scala> userRDD.map(x=>{var fields = x.split(" ");User(fields(0),fields(1).toInt)}).toDS
res2: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
# DataSet转换成RDD后,类型为User,强类型
scala> res2.rdd
res3: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[9] at rdd at <console>:26
# DataFrame转换成RDD后,类型为ROW,弱类型
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
scala> df.rdd
res4: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[18] at rdd at <console>:26
scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> case class User(name:String, age:Int)
defined class User
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> ds.toDF
res9: org.apache.spark.sql.DataFrame = [name: string, age: int]
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import
spark.implicits._ 不然toDF、toDS无法使用。
在SparkSQL中Spark提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口。

实际开发中,都是使用IDEA进行开发的
org.apache.spark
spark-sql_2.12
3.0.0
object SparkSQL01_Demo {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
//创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//RDD=>DataFrame=>DataSet转换需要引入隐式转换规则,否则无法转换
//spark不是包名,也不是类名,是上下文SparkSession环境对象名
import spark.implicits._
//读取json文件 创建DataFrame
val df: DataFrame = spark.read.json("input/test.json")
//df.show()
//SQL风格语法
df.createOrReplaceTempView("user")
//spark.sql("select avg(age) from user").show
//DSL风格语法
//df.select("username","age").show()
//*****RDD=>DataFrame=>DataSet*****
//创建RDD
val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu",20)))
//RDD转换成DataFrame
val df1: DataFrame = rdd1.toDF("id","name","age")
//df1.show()
//DataFrame转换成DateSet
val ds1: Dataset[User] = df1.as[User]
//ds1.show()
//*****DataSet=>DataFrame=>RDD*****
//DateSet转换成DataFrame
val df2: DataFrame = ds1.toDF()
//DataFrame转换成RDD
//返回的RDD类型为Row,里面提供的getXXX方法可以获取字段值,类似jdbc处理结果集,但是索引从0开始
val rdd2: RDD[Row] = df2.rdd
//rdd2.foreach(a=>println(a.getString(1)))
//*****RDD=>DataSet*****
rdd1.map{
case (id,name,age)=>User(id,name,age)
}.toDS()
//*****DataSet=>=>RDD*****
ds1.rdd
//释放资源
spark.stop()
}
}
case class User(id:Int,name:String,age:Int)
用户可以通过spark.udf功能添加自定义函数,实现自定义功能
输入一行,返回一个结果。在Shell窗口中可以通过spark.udf功能用户可以自定义函数。
需求:自动定义UDF函数,在每一个查询的名字前面添加问候语
def main(args: Array[String]): Unit = {
//创建SparkConf配置文件对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_UDF")
//创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//创建DF
val df: DataFrame = spark.read.json("data/user.json")
//注册自定义函数
spark.udf.register("addSayHello",(username:String)=>{"hello:" + username})
//创建临时视图
df.createOrReplaceTempView("user")
//通过sql语句从临时视图查询数据
spark.sql("select addSayHello(username),age from user").show()
//释放资源
spark.stop()
}
输入多行,返回一行。强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。
需求:求平均年龄
def main(args: Array[String]): Unit = {
//创建SparkConf配置文件对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL02_UDAF")
//创建sc对象
val sc = new SparkContext(conf)
//创建rdd
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan",30),("lisi",40),("wangwu",50)))
//转换数据格式,目的是统计总人数
val mapRdd: RDD[(Int, Int)] = rdd.map {
case (name, age) => {
(age, 1)
}
}
//对年龄和总人数进行聚合操作 (ageSum,numSum)
val res: (Int, Int) = mapRdd.reduce(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
println(res._1 / res._2)
//释放资源
sc.stop()
}
减少Shuffle,提高效率,模仿LongAccumulator累加器
object SparkSQL03_Accumulator {
def main(args: Array[String]): Unit = {
//创建SparkConf配置文件对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL03_Accumulator")
//创建sc对象
val sc = new SparkContext(conf)
//创建rdd
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan",30),("lisi",40),("wangwu",50)))
//创建累加器对象
val ma = new MyAccmulator
//注册累加器
sc.register(ma)
//使用累加器
rdd.foreach{
case (name,age) => {
ma.add(age)
}
}
//打印结果
println(ma.value)
//释放资源
sc.stop()
}
}
// 定义累加器
class MyAccmulator extends AccumulatorV2[Int,Double]{
var ageSum : Int = 0
var countSum : Double = 0
override def isZero: Boolean = {
ageSum == 0 && countSum == 0
}
override def copy(): AccumulatorV2[Int, Double] = {
val ma = new MyAccmulator
this.ageSum = ageSum
this.countSum = countSum
ma
}
override def reset(): Unit = {
ageSum = 0
countSum = 0
}
override def add(age: Int): Unit = {
ageSum += age
countSum += 1
}
override def merge(other: AccumulatorV2[Int, Double]): Unit = {
other match{
case ma: MyAccmulator => {
this.ageSum += ma.ageSum
this.countSum += ma.countSum
}
case _ =>
}
}
override def value: Double = {
ageSum / countSum
}
}
应用于DataFrame,执行一些sql风格的DF查询,更加方便。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数
object SparkSQL04_UDAF {
def main(args: Array[String]): Unit = {
//创建SparkConf配置文件对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_UDF")
//创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//读取json文件以创建DF
val df: DataFrame = spark.read.json("data/user.json")
//创建自定义函数对象
val myAvg = new MyAvg
//注册自定义函数
spark.udf.register("myAvg",myAvg)
//创建临时视图以查询
df.createOrReplaceTempView("user")
//使用聚合函数进行查询
spark.sql("select myAvg(age) from user").show()
//释放资源
spark.stop()
}
}
//自定义UDAF函数--弱类型
class MyAvg extends UserDefinedAggregateFunction {
//聚合函数输入数据类型
//StructType存放的是多列的类型,每一列的数据类型存放在StructField中
//多个StructField组成了StructType
//IntegerType是SparkSQL为DF创建的数据类型,底层实现就是scala中的int
override def inputSchema: StructType = {
StructType(Array(StructField("age",IntegerType)))
}
//缓存的数据类型
//将输入的数据(age,1)存放到缓存中,之后存放(age,2),一次存放一条数据
//这种数据结构称为StructType
override def bufferSchema: StructType = {
StructType(Array(StructField("ageSum",IntegerType),StructField("count",IntegerType)))
}
//聚合函数返回的数据类型
override def dataType: DataType = DoubleType
//函数的稳定性,相同的输入是否会得到相同的输出
//默认不处理,直接返回True
override def deterministic: Boolean = true
//初始化,将缓存设置为初始状态
override def initialize(buffer: MutableAggregationBuffer): Unit = {
//使年龄归0
buffer(0) = 0
//使总人数归0
buffer(1) = 0
}
//更新缓存中的数据
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
//缓存中数据不为空执行操作
if(!buffer.isNullAt(0)){
buffer(0) = buffer.getInt(0) + input.getInt(0)
buffer(1) = buffer.getInt(1) + 1
}
}
//不同节点数据之间的聚合,分区间的合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
}
//计算逻辑
override def evaluate(buffer: Row): Any = {
buffer.getInt(0).toDouble / buffer.getInt(1)
}
}
应用于DataSet,执行DSL风格语句时,更加方便,定义类继承org.apache.spark.sql.expressions.Aggregator
object SparkSQL05_UDAF {
def main(args: Array[String]): Unit = {
//创建SparkConf配置文件对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_UDF")
//创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
//读取json文件以创建DF
val df: DataFrame = spark.read.json("data/user.json")
//创建自定义函数对象
val myAvgNew = new MyAvgNew
// 如果自定义UDAF强类型,没有办法应用到SQL风格的DF查询
// //注册自定义函数
// spark.udf.register("myAvgNew",myAvgNew)
// //创建临时视图以查询
// df.createOrReplaceTempView("user")
// //使用聚合函数进行查询
// spark.sql("select myAvgNew(age) from user").show()
//将df转换为ds
val ds: Dataset[User06] = df.as[User06]
//将自定义函数对象转换为查询列
val col: TypedColumn[User06, Double] = myAvgNew.toColumn
//在查询时,会将查询出来的记录(User06类型)交给自定义的函数处理
ds.select(col).show()
//释放资源
spark.stop()
}
}
//输入类型的样例类
case class User06(name:String,age:Long)
//平均年龄缓存的样例类
case class AgeBuffer(var sum:Long,var count:Long)
//自定义UDAF函数 -- 强类型
class MyAvgNew extends Aggregator[User06,AgeBuffer,Double]{
//对缓存数据进行初始化
override def zero: AgeBuffer = {
AgeBuffer(0,0)
}
//对当前分区内数据进行聚合
override def reduce(b: AgeBuffer, a: User06): AgeBuffer = {
b.sum = b.sum + a.age
b.count = b.count + 1
b
}
//分区间的合并
override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
b1.sum = b1.sum + b2.sum
b1.count = b1.count + b2.count
b1
}
//返回计算结果
override def finish(buff: AgeBuffer): Double = {
buff.sum.toDouble/buff.count
}
//DataSet默认的编、解码器,用于序列化,固定写法
//用户自定义ref类型就是produce
//系统自带类型根据数据类型进行选择
override def bufferEncoder: Encoder[AgeBuffer] = {
Encoders.product
}
override def outputEncoder: Encoder[Double] = {
Encoders.scalaDouble
}
}
输入一行,返回多行(hive);
SparkSQL中没有UDTF,spark中用flatMap即可实现该功能