版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
传送门:大数据系列文章目录
官方网址:http://spark.apache.org/、 http://spark.apache.org/sql/
回顾一下,如何使用Hive进行数据分析的,提供哪些方式交互分析???
方式一: 交互式命令行(CLI)
方式二: 启动服务HiveServer2(Hive ThriftServer2)
SparkSQL模块从Hive框架衍生发展而来,所以Hive提供的所有功能(数据分析交互式方式)
都支持,文档: http://spark.apache.org/docs/2.4.5/sql-distributed-sql-engine.html。
SparkSQL提供spark-sql命令,类似Hive中bin/hive命令,专门编写SQL分析,启动命令如下:
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-sql --master local[2] --conf spark.sql.shuffle.partitions=4
编写SQL执行,截图如下:
此种方式, 目前企业使用较少, 主要使用下面所述ThriftServer服务,通过Beeline连接执行SQL。
Spark Thrift Server将Spark Applicaiton当做一个服务运行,提供Beeline客户端和JDBC方式访
问,与Hive中HiveServer2服务一样的。此种方式必须掌握:在企业中使用PySpark和SQL分析数据,
尤其针对数据分析行业。
Spark Thrift JDBC/ODBC Server 依赖于HiveServer2服务(依赖JAR包) ,所有要想使用此功
能, 在编译Spark源码时,支持Hive Thrift。
注意:启动Spark Thrift JDBC/ODBC Server时,不需要HiveServer2服务。
在$SPARK_HOME目录下的sbin目录,有相关的服务启动命令:
SPARK_HOME=/export/server/spark
$SPARK_HOME/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000 \
--hiveconf hive.server2.thrift.bind.host=node1 \
--master local[2]
SparkSQL类似Hive提供beeline客户端命令行连接ThriftServer,启动命令如下:
/export/server/spark/bin/beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://node1:10000
Connecting to jdbc:hive2://node1:10000
Enter username for jdbc:hive2://node1:10000: root
Enter password for jdbc:hive2://node1:10000: ****
在实际大数据分析项目中,使用SparkSQL时,往往启动一个ThriftServer服务,分配较多资源
(Executor数目和内存、 CPU),不同的用户启动beeline客户端连接,编写SQL语句分析数据。
SparkSQL中提供类似JDBC/ODBC方式,连接Spark ThriftServer服务,执行SQL语句,首先添
加Maven依赖库:
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-hive-thriftserver_2.11artifactId>
<version>2.4.5version>
dependency>
参考文档: https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC
测试数据:
-- 创建表
-- 雇员表EMP
create table IF NOT EXISTS db_hive.emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
-- 部门表DEPT
create table IF NOT EXISTS db_hive.dept(
deptno int,
dname string,
loc string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
-- 加载数据至表中
load data local inpath '/opt/datas/emp.txt' overwrite into table emp ;
load data local inpath '/opt/datas/dept.txt' overwrite into table dept ;
dept.txt
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
emp.txt
7369 SMITH CLERK 7902 1980-12-17 800.00 20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-4-2 2975.00 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30
7782 CLARK MANAGER 7839 1981-6-9 2450.00 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-5-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-3 950.00 30
7902 FORD ANALYST 7566 1981-12-3 3000.00 20
7934 MILLER CLERK 7782 1982-1-23 1300.00 10
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
/**
* SparkSQL 启动ThriftServer服务,通过JDBC方式访问数据分析查询
* i). 通过Java JDBC的方式,来访问Thrift JDBC/ODBC server,调用Spark SQL,并直接查询Hive中的数据
* ii). 通过Java JDBC的方式,必须通过HTTP传输协议发送thrift RPC消息, Thrift JDBC/ODBC server必须通过上面命
* 令启动HTTP模式
*/
object SparkThriftJDBC {
def main(args: Array[String]): Unit = {
// 定义相关实例对象,未进行初始化
var conn: Connection = null
var pstmt: PreparedStatement = null
var rs: ResultSet = null
try {
// TODO: a. 加载驱动类
Class.forName("org.apache.hive.jdbc.HiveDriver")
// TODO: b. 获取连接Connection
conn = DriverManager.getConnection(
"jdbc:hive2://node1:10000/db_hive",
"root",
"123456"
)
// TODO: c. 构建查询语句
val sqlStr: String =
"""
|select e.ename, e.sal, d.dname from emp e join dept d on e.deptno = d.deptno
""".stripMargin
pstmt = conn.prepareStatement(sqlStr)
// TODO: d. 执行查询,获取结果
rs = pstmt.executeQuery()
// 打印查询结果
while (rs.next()) {
println(s"empno = ${rs.getInt(1)}, ename = ${rs.getString(2)}, sal = ${
rs.getDouble(3
)
}, dname = ${rs.getString(4)}")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (null != rs) rs.close()
if (null != pstmt) pstmt.close()
if (null != conn) conn.close()
}
}
}
在SparkSQL系列-4、数据处理分析【案例:电影评分数据分析】中,运行应用程序代码, 通过WEB UI界面监控可以看
出, 无论使用DSL还是SQL, 构建Job的DAG图一样的, 性能是一样的,原因在于SparkSQL中引擎:Catalyst:将SQL和DSL转换为相同逻辑计划。
Spark SQL是Spark最新,技术最复杂的组件之一。它为SQL查询和新的DataFrame API
提供支持。 Spark SQL的核心是Catalyst优化器,它以一种新颖的方式利用高级编程语言功
能(例如Scala的模式匹配和quasiquotes)来构建可扩展的查询优化器。
SparkSQL的Catalyst优化器是整个SparkSQL pipeline的中间核心部分,其执行策略主要两方向:
从上图可见,无论是直接使用SQL语句还是使用 ataFrame,都会经过一些列步骤转换成DAG对RDD的操作。
Catalyst工作流程: SQL语句首先通过Parser模块被解析为语法树,此棵树称为Unresolved
Logical Plan; Unresolved Logical Plan通过Analyzer模块借助于数据元数据解析为Logical Plan;
此时再通过各种基于规则的Optimizer进行深入优化,得到Optimized Logical Plan; 优化后的逻辑
执行计划依然是逻辑的,需要将逻辑计划转化为Physical Plan。
核心三个点: