介绍
特点
SparkSQL出现的原因

理解
特征
RDD和DataFrame的区别
优点
映射关系的转换方式
DataFrame的创建方式
/**
* 10,ACCOUNTING,NEWYORK
* 20,RESEARCH,DALLAS
* 30,SALES,CHICAGO
* 40,OPERATIONS,BOSTON
*/
object HelloSparkCaseClass {
def main(args: Array[String]): Unit = {
//创建对象
val sparkSession: SparkSession =SparkSession.builder().master("local").appName("Hello02SparkSql").getOrCreate()
//日志级别
val sparkContext: SparkContext = sparkSession.sparkContext
sparkContext.setLogLevel("ERROR")
//读取数据
val lines: RDD[String] =sparkContext.textFile("src/main/resources/dept.sql")
val depts: RDD[Dept] = lines.map(_.split(",")).map(ele => newDept(ele(0).toInt, ele(1), ele(2)))
//开始隐式转换
import sparkSession.implicits._
val dataFrame: DataFrame = depts.toDF()
//打印信息-------DSL风格
dataFrame.show()
dataFrame.printSchema()
println(dataFrame.count())
dataFrame.columns.foreach(println)
dataFrame.select("deptno", "dname").show()
dataFrame.select("deptno", "dname").filter("deptno = 20").show()
dataFrame.groupBy("dname").count().show()
//打印信息-------SQL风格
dataFrame.createOrReplaceTempView("t_dept")
sparkSession.sql("select * from t_dept").show()
}
}
case class Dept(deptno: Int, dname: String, loc: String)
/**
* 10,ACCOUNTING,NEWYORK
* 20,RESEARCH,DALLAS
* 30,SALES,CHICAGO
* 40,OPERATIONS,BOSTON
*/
object HelloSparkStruct {
def main(args: Array[String]): Unit = {
//创建对象
val sparkSession: SparkSession =SparkSession.builder().master("local").appName("Hello02SparkSql").getOrCreate()
//日志级别
val sparkContext: SparkContext = sparkSession.sparkContext
sparkContext.setLogLevel("ERROR")
//读取数据
val lines: RDD[String] =sparkContext.textFile("src/main/resources/dept.sql")
val depts = lines.map(_.split(",")).map(ele => Row(ele(0).toInt,ele(1), ele(2)))
//创建DataFrame
val dataFrame: DataFrame = sparkSession.createDataFrame(depts,structType)
//打印数据
dataFrame.show()
}
//创建表格的类型
val structType: StructType = StructType(
List(
StructField("deptno", DataTypes.IntegerType),
StructField("dname", DataTypes.StringType),
StructField("loc", DataTypes.StringType)
)
)
}

理解
DataSet和DataFrame的关系
DataSet和RDD的关系
DataSet的创建实现
import org.apache.spark.sql.{Dataset, SparkSession}
/**
* {"deptno":10,"dname":"ACCOUNTING","loc":"NEWYORK"}
* {"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}
* {"deptno":30,"dname":"SALES","loc":"CHICAGO"}
* {"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}
*/
object HelloDataSetJson {
def main(args: Array[String]): Unit = {
//创建SQL环境
val sparkSession =SparkSession.builder().master("local").appName("Hello02DataFrameAvg").getOrCreate()
import sparkSession.implicits._
val dataSet: Dataset[DeptJson] =sparkSession.read.json("src/main/data/dept.json").as[DeptJson]
//打印数据
dataSet.show()
}
}
在spark2 版本中 dataFram源码已被移除,但是约定 DataFrame-DataSet[Row]
show
show() 以表格的形式在输出中展示DataFrame中的数据,类似于 select * from spark_sql_test 的功能。
show 只显示前20条记录。
show(numRows: Int) 显示 numRows 条
show(truncate: Boolean) 是否最多只显示20个字符,默认为 true 。
show(numRows: Int, truncate: Boolean) 综合前面的显示记录条数,以及对过长字符串的显示格式。
collect
collectAsList
describe(cols: String*)
first, head, take, takeAsList
groupBy :根据字段进行 group by 操作
cube 和 rollup :group by的扩展
GroupedData对象
agg
示例
jdbcDF.agg("id" -> "max", "c4" -> "sum")
作用
使用
dataFrame.write.format("json").mode(SaveMode.Overwrite).save()
数据处理的流程:数据源—>DataSet—>DSL操作数据
Spark任务转化流程
首先拿到 sql 后解析一批未被解决的逻辑计划,
再经过分析得到分析后的逻辑计划,
再经过一批优化规则转换成一批最佳优化的逻辑计划,
再经过 SparkPlanner 的策略转化成一批物理计划,
随后经过消费模型转换成一个个的 Spark 任务执行
SparkSQL执行流程

谓词下推是Catalyst优化器的两个重要优化方案之一
理解
作用
图解
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dU2YUN0h-1658574052102)(C:/Users/18446/AppData/Roaming/Typora/typora-user-images/image-20220723183614389.png)]