• SparkSQL系列-8、分布式SQL引擎和Catalyst 优化器


    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    传送门:大数据系列文章目录

    官方网址http://spark.apache.org/http://spark.apache.org/sql/
    在这里插入图片描述

    回顾一下,如何使用Hive进行数据分析的,提供哪些方式交互分析???

    方式一: 交互式命令行(CLI)

    • bin/hive,编写SQL语句及DDL语句

    方式二: 启动服务HiveServer2(Hive ThriftServer2)

    • 将Hive当做一个服务启动(类似MySQL数据库,启动一个服务),端口为10000
    • 1)交互式命令行, bin/beeline, CDH 版本HIVE建议使用此种方式, CLI方式过时
    • 2)JDBC/ODBC方式,类似MySQL中JDBC/ODBC方式

    SparkSQL模块从Hive框架衍生发展而来,所以Hive提供的所有功能(数据分析交互式方式)
    都支持,文档: http://spark.apache.org/docs/2.4.5/sql-distributed-sql-engine.html。

    Spark SQL CLI

    SparkSQL提供spark-sql命令,类似Hive中bin/hive命令,专门编写SQL分析,启动命令如下:

    SPARK_HOME=/export/server/spark
    ${SPARK_HOME}/bin/spark-sql --master local[2] --conf spark.sql.shuffle.partitions=4
    
    • 1
    • 2

    编写SQL执行,截图如下:
    在这里插入图片描述
    此种方式, 目前企业使用较少, 主要使用下面所述ThriftServer服务,通过Beeline连接执行SQL。

    ThriftServer JDBC/ODBC Server

    Spark Thrift Server将Spark Applicaiton当做一个服务运行,提供Beeline客户端和JDBC方式访
    问,与Hive中HiveServer2服务一样的。此种方式必须掌握:在企业中使用PySpark和SQL分析数据,
    尤其针对数据分析行业。
    在这里插入图片描述
    Spark Thrift JDBC/ODBC Server 依赖于HiveServer2服务(依赖JAR包) ,所有要想使用此功
    能, 在编译Spark源码时,支持Hive Thrift。

    在这里插入图片描述
    注意:启动Spark Thrift JDBC/ODBC Server时,不需要HiveServer2服务。

    在$SPARK_HOME目录下的sbin目录,有相关的服务启动命令:

    SPARK_HOME=/export/server/spark
    $SPARK_HOME/sbin/start-thriftserver.sh \
    --hiveconf hive.server2.thrift.port=10000 \
    --hiveconf hive.server2.thrift.bind.host=node1 \
    --master local[2]
    
    • 1
    • 2
    • 3
    • 4
    • 5

    beeline 客户端

    SparkSQL类似Hive提供beeline客户端命令行连接ThriftServer,启动命令如下:

    /export/server/spark/bin/beeline
    Beeline version 1.2.1.spark2 by Apache Hive
    beeline> !connect jdbc:hive2://node1:10000
    Connecting to jdbc:hive2://node1:10000
    Enter username for jdbc:hive2://node1:10000: root
    Enter password for jdbc:hive2://node1:10000: ****
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在实际大数据分析项目中,使用SparkSQL时,往往启动一个ThriftServer服务,分配较多资源
    (Executor数目和内存、 CPU),不同的用户启动beeline客户端连接,编写SQL语句分析数据。
    在这里插入图片描述

    JDBC/ODBC 客户端

    SparkSQL中提供类似JDBC/ODBC方式,连接Spark ThriftServer服务,执行SQL语句,首先添
    加Maven依赖库:

    <dependency>
    	<groupId>org.apache.sparkgroupId>
    	<artifactId>spark-hive-thriftserver_2.11artifactId>
    	<version>2.4.5version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    参考文档: https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC

    测试数据:

    
    -- 创建表
    -- 雇员表EMP
    create table IF NOT EXISTS db_hive.emp(
    empno int,
    ename string,
    job string,
    mgr int,
    hiredate string,
    sal double,
    comm double,
    deptno int
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
    
    -- 部门表DEPT
    create table IF NOT EXISTS db_hive.dept(
    deptno int,
    dname string,
    loc string
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
    
    -- 加载数据至表中
    load data local inpath '/opt/datas/emp.txt' overwrite into table emp ;
    load data local inpath '/opt/datas/dept.txt' overwrite into table dept ;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    dept.txt

    10	ACCOUNTING	NEW YORK
    20	RESEARCH	DALLAS
    30	SALES	CHICAGO
    40	OPERATIONS	BOSTON
    
    • 1
    • 2
    • 3
    • 4

    emp.txt

    7369	SMITH	CLERK	7902	1980-12-17	800.00		20
    7499	ALLEN	SALESMAN	7698	1981-2-20	1600.00	300.00	30
    7521	WARD	SALESMAN	7698	1981-2-22	1250.00	500.00	30
    7566	JONES	MANAGER	7839	1981-4-2	2975.00		20
    7654	MARTIN	SALESMAN	7698	1981-9-28	1250.00	1400.00	30
    7698	BLAKE	MANAGER	7839	1981-5-1	2850.00		30
    7782	CLARK	MANAGER	7839	1981-6-9	2450.00		10
    7788	SCOTT	ANALYST	7566	1987-4-19	3000.00		20
    7839	KING	PRESIDENT		1981-11-17	5000.00		10
    7844	TURNER	SALESMAN	7698	1981-9-8	1500.00	0.00	30
    7876	ADAMS	CLERK	7788	1987-5-23	1100.00		20
    7900	JAMES	CLERK	7698	1981-12-3	950.00		30
    7902	FORD	ANALYST	7566	1981-12-3	3000.00		20
    7934	MILLER	CLERK	7782	1982-1-23	1300.00		10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    范例演示: 采用JDBC方式读取Hive中db_hive.emp表的数据。

    import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
    
    /**
     * SparkSQL 启动ThriftServer服务,通过JDBC方式访问数据分析查询
     * i). 通过Java JDBC的方式,来访问Thrift JDBC/ODBC server,调用Spark SQL,并直接查询Hive中的数据
     * ii). 通过Java JDBC的方式,必须通过HTTP传输协议发送thrift RPC消息, Thrift JDBC/ODBC server必须通过上面命
     * 令启动HTTP模式
     */
    object SparkThriftJDBC {
      def main(args: Array[String]): Unit = {
        // 定义相关实例对象,未进行初始化
        var conn: Connection = null
        var pstmt: PreparedStatement = null
        var rs: ResultSet = null
        try {
          // TODO: a. 加载驱动类
          Class.forName("org.apache.hive.jdbc.HiveDriver")
          // TODO: b. 获取连接Connection
          conn = DriverManager.getConnection(
            "jdbc:hive2://node1:10000/db_hive",
            "root",
            "123456"
          )
          // TODO: c. 构建查询语句
          val sqlStr: String =
            """
              |select e.ename, e.sal, d.dname from emp e join dept d on e.deptno = d.deptno
    """.stripMargin
          pstmt = conn.prepareStatement(sqlStr)
          // TODO: d. 执行查询,获取结果
          rs = pstmt.executeQuery()
          // 打印查询结果
          while (rs.next()) {
            println(s"empno = ${rs.getInt(1)}, ename = ${rs.getString(2)}, sal = ${
              rs.getDouble(3
              )
            }, dname = ${rs.getString(4)}")
          }
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (null != rs) rs.close()
          if (null != pstmt) pstmt.close()
          if (null != conn) conn.close()
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    Catalyst 优化器

    在SparkSQL系列-4、数据处理分析【案例:电影评分数据分析】中,运行应用程序代码, 通过WEB UI界面监控可以看
    出, 无论使用DSL还是SQL, 构建Job的DAG图一样的, 性能是一样的,原因在于SparkSQL中引擎:Catalyst:将SQL和DSL转换为相同逻辑计划
    在这里插入图片描述
    Spark SQL是Spark最新,技术最复杂的组件之一。它为SQL查询和新的DataFrame API
    提供支持。 Spark SQL的核心是Catalyst优化器,它以一种新颖的方式利用高级编程语言功
    能(例如Scala的模式匹配和quasiquotes)来构建可扩展的查询优化器。

    在这里插入图片描述
    SparkSQL的Catalyst优化器是整个SparkSQL pipeline的中间核心部分,其执行策略主要两方向:

    • 基于规则优化/Rule Based Optimizer/RBO;
    • 基于代价优化/Cost Based Optimizer/CBO;

    在这里插入图片描述
    从上图可见,无论是直接使用SQL语句还是使用 ataFrame,都会经过一些列步骤转换成DAG对RDD的操作。

    Catalyst工作流程: SQL语句首先通过Parser模块被解析为语法树,此棵树称为Unresolved
    Logical Plan; Unresolved Logical Plan通过Analyzer模块借助于数据元数据解析为Logical Plan;
    此时再通过各种基于规则的Optimizer进行深入优化,得到Optimized Logical Plan; 优化后的逻辑
    执行计划依然是逻辑的,需要将逻辑计划转化为Physical Plan。
    在这里插入图片描述
    核心三个点:

    • 第一点、 Parser,第三方类库ANTLR实现。将sql字符串切分成Token,根据语义规则解析
      成一颗AST语法树;
    • 第二点、 Analyzer, Unresolved Logical Plan,进行数据类型绑定和函数绑定;
    • 第三点、 Optimizer,规则优化就是模式匹配满足特定规则的节点等价转换为另一颗语
      法树;
      在这里插入图片描述
  • 相关阅读:
    Dubbo-服务暴露
    [JS入门到进阶] 前端开发不能写undefined?这是误区!
    java数据类型与变量的安全性
    金仓数据库 KingbaseES 插件DBMS_UTILITY
    抗洪救灾,共克时艰,城联优品捐赠10万元爱心物资驰援英德
    openbmc开发38:webui开发—增加head栏语言切换
    【算法训练营】 - ⑩ 并查集与图
    HTML进阶
    代码随想录算法训练营第23期day21| 235. 二叉搜索树的最近公共祖先 、701.二叉搜索树中的插入操作、450.删除二叉搜索树中的节点
    C. Decreasing String -思维 + 单调栈
  • 原文地址:https://blog.csdn.net/l848168/article/details/126465188