Hive 是 Shark 的前身,Shark 是 SparkSQL 的前身,SparkSQL 产生的根本原因是其完全脱离了 Hive 的限制。
- Spark on Hive: Hive 只作为储存角色,Spark 负责 sql 解析优化,执行。
- Hive on Spark:Hive 即作为存储又负责 sql 的解析优化,Spark 负责执行。
两者数据源均为Hive表,底层人物均为Spark人物,关键区别在于一个是Hive去解析,一个是Spark去解析。

Dataset 也是一个分布式数据容器。与 RDD 类似,然而 Dataset 更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息(元数据),即schema。同时,与 Hive 类似,Dataset 也支持嵌套数据类型(struct、array 和 map)。从 API 易用性的角度上 看, Dataset API 提供的是一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低。Dataset 的底层封装的是 RDD,当 RDD 的泛型是 Row 类型的时候,我们也可以称它为 DataFrame。即 Dataset
SparkSQL 的数据源可以是 JSON 类型的字符串,JDBC,Parquent,Hive,HDFS 等。

首先拿到 sql 后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过 SparkPlanner 的策略转化成一批物理计划,随后经过消费模型转换成一个个的 Spark 任务执行。


ds.rdd()/ds.javaRdd()。package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class CreateDSFromJosonFile {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("jsonfile")
.master("local")
.getOrCreate();
/**
* Dataset的底层是一个一个的RDD
* 注意:
* 当 Dataset里的泛型是Row时,我们又可以称之为dataframe
*
* 以下两种方式都可以读取json格式的文件
*
*/
Dataset<Row> ds = sparkSession.read().format("json").load("data/json");
// Dataset ds = sparkSession.read().json("data/json");
ds.show();
/**
* 显示DataSet中的内容,默认显示前20行。如果现实多行要指定多少行show(行数)
* 注意:当有多个列时,显示的列先后顺序是按列的ascii码先后显示。
*/
ds.show(100);
/**
*DataSet转换成RDD
*/
// JavaRDD javaRDD = ds.javaRDD();
/**
* 树形的形式显示schema信息
*/
ds.printSchema();
/**
* dataset自带的API 操作dataset
*/
//select name from table
ds.select("name").show();
//select name ,age+10 as addage from table
ds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show();
//select name ,age from table where age>19
ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show();
//select age,count(*) from table group by age
ds.groupBy(ds.col("age")).count().show();
/**
* 将dataset注册成临时的一张表,这张表相当于临时注册到内存中,逻是辑上的表,不会雾化到磁盘
*/
ds.createOrReplaceTempView("jtable");
//ds.registerTempTable("jtable");
Dataset<Row> sql = sparkSession.sql("select age,count(*) as gg from jtable group by age");
sql.show();
//
// Dataset sql2 = sparkSession.sql("select name,age from jtable");
// sql2.show();
sparkSession.stop();
}
}
+----+--------+
| age| name|
+----+--------+
| 18|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 20|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 28|zhangsan|
|null| lisi|
| 18| wangwu|
+----+--------+
+----+--------+
| age| name|
+----+--------+
| 18|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 20|zhangsan|
|null| lisi|
| 18| wangwu|
| 28| laoliu|
| 28|zhangsan|
|null| lisi|
| 18| wangwu|
+----+--------+
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+--------+
| name|
+--------+
|zhangsan|
| lisi|
| wangwu|
| laoliu|
|zhangsan|
| lisi|
| wangwu|
| laoliu|
|zhangsan|
| lisi|
| wangwu|
+--------+
+--------+------+
| name|addage|
+--------+------+
|zhangsan| 28|
| lisi| null|
| wangwu| 28|
| laoliu| 38|
|zhangsan| 30|
| lisi| null|
| wangwu| 28|
| laoliu| 38|
|zhangsan| 38|
| lisi| null|
| wangwu| 28|
+--------+------+
+--------+---+
| name|age|
+--------+---+
| laoliu| 28|
|zhangsan| 20|
| laoliu| 28|
|zhangsan| 28|
+--------+---+
+----+---+
| age| gg|
+----+---+
|null| 3|
| 28| 3|
| 18| 4|
| 20| 1|
+----+---+
package com.shsxt.java_Test.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import java.util.Arrays;
/**
* 读取json格式的RDD创建DF
* @author root
*
*/
public class CreateDSFromJsonRDD {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("jsonrdd")
.master("local")
.getOrCreate();
/**
* 注意:
* 1.由于是java版,故通过javaSparkcontext.parallelize来创建json格式的JavaRDD
* 所以我们通过sparkcontext来创建javaSparkcontext
* 2.如果是scala版本,直接通过sparkcontext.parallelize来创建,就无需创建javaSparkcontext
*/
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> nameRDD = jsc.parallelize(Arrays.asList(
"{'name':'zhangsan','age':\"18\"}",
"{\"name\":\"lisi\",\"age\":\"19\"}",
"{\"name\":\"wangwu\",\"age\":\"20\"}"
));
JavaRDD<String> scoreRDD = jsc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"score\":\"100\"}",
"{\"name\":\"lisi\",\"score\":\"200\"}",
"{\"name\":\"wangwu\",\"score\":\"300\"}"
));
Dataset<Row> nameds = sparkSession.read().json(nameRDD);
Dataset<Row> scoreds = sparkSession.read().json(scoreRDD);
//注册成临时表使用
nameds.createOrReplaceTempView("nameTable");
scoreds.createOrReplaceTempView("scoreTable");
Dataset<Row> result =
sparkSession.sql("select nameTable.name,nameTable.age,scoreTable.score "
+ "from nameTable join scoreTable "
+ "on nameTable.name = scoreTable.name");
result.show();
sc.stop();
}
}
+--------+---+-----+
| name|age|score|
+--------