• SPark学习笔记:10 SparkSQL 数据源之Spark on Hive


    概述

    Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL 编译时可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF(用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译 Spark SQL 时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。

    SparkSQL可以连接到一个装好的Hive集群,也可以使用内置的Hive。如果使用内置的Hive,Spark SQL会在当前的工作目录下创建出自己的Hive元数据仓库,叫做metastore_db.Spark-shell默认是支持Hive的,代码中默认是不支持的,需要手动开启。

    内嵌的Hive

    使用外部的Hive

    spark-shell使用外部的Hive

    • 1、安装好hive集群后,启动hive的metasore service。
      Tips: hive-site.xml文件中需要配置好hive.metastore.uris 的属性值,否则不能在外部进行远程连接hive。
    [root@k8s-node3 software]# cd apache-hive-3.1.3-bin/
    [root@k8s-node3 apache-hive-3.1.3-bin]# ls
    bin                      conf      hcatalog  lib      NOTICE             scripts
    binary-package-licenses  examples  jdbc      LICENSE  RELEASE_NOTES.txt
    [root@k8s-node3 apache-hive-3.1.3-bin]# hive --service metastore & 
    [1] 26582
    [root@k8s-node3 apache-hive-3.1.3-bin]# 2022-07-26 11:02:03: Starting Hive Metastore Server
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/home/software/apache-hive-3.1.3-bin/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/home/software/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 2、将hive-site.xml文件拷贝到spark的$SPARK_HOME/conf/ 目录下

    • 3、将hadoop的hdfs-site.xml文件、core-site.xml文件拷贝到spark的$SPARK_HOME/conf/ 目录下

    • 4、将连接mysql驱动库的jar包(由于本人使用的是mysql8.0系列,所以是mysql-connector-java-8.0.11.jar)拷贝到$SPARK_HOME/jars/ 目录下

    • 5、启动spark-shell

    [root@k8s-node3 bin]# ./spark-shell 
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/07/26 11:48:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://k8s-node3:4040
    Spark context available as 'sc' (master = local[*], app id = local-1658850502645).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
          /_/
             
    Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_333)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.sql("show databases;").show
    +---------+
    |namespace|
    +---------+
    |  default|
    +---------+
    
    
    scala> spark.sql("show tables;").show
    +---------+---------+-----------+                                               
    |namespace|tableName|isTemporary|
    +---------+---------+-----------+
    |  default|  student|      false|
    |  default| student2|      false|
    |  default| student3|      false|
    |  default| student4|      false|
    +---------+---------+-----------+
    
    scala> 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    至此,在spark-shell中使用外部Hive的操作完成,可以打印出我外部Hive系统中表和schema信息。

    Idea中连接外部的Hive

    上面我们已经知道了如何在spark-shell中使用外部hive作为数据源了,那么在idea的代码中我们又将怎么操作呢。

    • 1、添加maven依赖
    <dependency>
        <groupId>org.apache.hivegroupId>
        <artifactId>hive-execartifactId>
        <version>3.1.3version>
    dependency>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-hive_2.12artifactId>
        <version>3.3.0version>
    dependency>
    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>8.0.23version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 2、将hive-site.xml、hdfs-site.xml、core-site.xml文件拷贝到项目的resource目录中。
    • 3、外部hive同样需要开启hive metastore服务。
    • 4、开始编码
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    object HiveDs {
    
      def main(args: Array[String]): Unit = {
    
        val conf:SparkConf = new SparkConf()
        conf.setAppName("hive-sql")
        conf.setMaster("local")
    
        val sparkSession = SparkSession.builder().config(conf)
          .enableHiveSupport()
          .getOrCreate()
        sparkSession.sql("show tables").show()
        val sql:String=
          """
            |CREATE TABLE SENSOR(
            |ID STRING NOT NULL,
            |TIMESTAMP BIGINT NOT NULL,
            |TEMPERATURE DECIMAL(5,2)
            |)
            |""".stripMargin
    
        sparkSession.sql(sql)
    
        sparkSession.close()
      }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    配置 Spark beeline

    Spark Thrift Server 是 Spark 社区基于 HiveServer2 实现的一个 Thrift 服务。旨在无缝兼容
    HiveServer2。因为 Spark Thrift Server 的接口和协议都和 HiveServer2 完全一致,因此我们部
    署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 访问 Spark Thrift Server 执行相关
    语句。Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依旧可以和 Hive Metastore
    进行交互,获取到 hive 的元数据

    步骤如下:

    • 1、同spark-shell中访问外部hive一样,需要将那几个xml文件、mysql的jar包按照上面的步骤拷贝到指定目录
    • 2、启动ThriftServer
    [root@k8s-node3 spark-3.3.0-bin-hadoop3]# cd sbin/
    [root@k8s-node3 sbin]# ls
    decommission-slave.sh   start-all.sh                    start-slaves.sh         stop-master.sh                 stop-worker.sh
    decommission-worker.sh  start-history-server.sh         start-thriftserver.sh   stop-mesos-dispatcher.sh       stop-workers.sh
    slaves.sh               start-master.sh                 start-worker.sh         stop-mesos-shuffle-service.sh  workers.sh
    spark-config.sh         start-mesos-dispatcher.sh       start-workers.sh        stop-slave.sh
    spark-daemon.sh         start-mesos-shuffle-service.sh  stop-all.sh             stop-slaves.sh
    spark-daemons.sh        start-slave.sh                  stop-history-server.sh  stop-thriftserver.sh
    [root@k8s-node3 sbin]# ./start-thriftserver.sh 
    starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /home/software/spark-3.3.0-bin-hadoop3/logs/spark-root-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-k8s-node3.out
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 3、使用beeline连接hive
    [root@k8s-node3 bin]# ./beeline -u jdbc:hive2://k8s-node3:10000 -n root
    Connecting to jdbc:hive2://k8s-node3:10000
    Connected to: Spark SQL (version 3.3.0)
    Driver: Hive JDBC (version 2.3.9)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
    Beeline version 2.3.9 by Apache Hive
    0: jdbc:hive2://k8s-node3:10000> show databases;
    +------------+
    | namespace  |
    +------------+
    | default    |
    +------------+
    1 row selected (1.335 seconds)
    0: jdbc:hive2://k8s-node3:10000> show tables;
    +------------+------------+--------------+
    | namespace  | tableName  | isTemporary  |
    +------------+------------+--------------+
    | default    | student    | false        |
    | default    | student2   | false        |
    | default    | student3   | false        |
    | default    | student4   | false        |
    +------------+------------+--------------+
    4 rows selected (0.397 seconds)
    0: jdbc:hive2://k8s-node3:10000> 
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    可能遇到的问题

    • Unable to instantiate SparkSession with Hive support because Hive classes are not found.
    Exception in thread "main" java.lang.IllegalArgumentException: Unable to instantiate SparkSession with Hive support because Hive classes are not found.
    	at org.apache.spark.sql.SparkSession$Builder.enableHiveSupport(SparkSession.scala:891)
    	at com.hjt.yxh.hw.sparksql.HiveDs$.main(HiveDs.scala:14)
    	at com.hjt.yxh.hw.sparksql.HiveDs.main(HiveDs.scala)
    
    • 1
    • 2
    • 3
    • 4

    这个是因为缺少spark-hive的jar包,在pom.xml中添加

    <dependency>
        <groupId>org.apache.hivegroupId>
        <artifactId>hive-execartifactId>
        <version>3.1.3version>
    dependency>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-hive_2.12artifactId>
        <version>3.3.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    聊聊基于Alink库的特征工程方法
    卷积神经网络 - 从全连接层到卷积
    第十章 JavaScript操作BOM对象
    Rust多线程编程
    将cpu版本的pytorch换成gpu版本
    【Java】实现文件夹复制
    Spring MVC相关异常类
    通过S3协议实现通用的文件存储服务中间件
    [Linux] 网络套接字编程之实现简单的TCP网络程序(下)
    ​Vue + Element UI前端篇(二):Vue + Element 案例 ​
  • 原文地址:https://blog.csdn.net/wangzhongyudie/article/details/126005882