• SparkSQL - 介绍及使用 Scala、Java、Python 三种语言演示


    一、SparkSQL

    前面的文章中使用 RDD 进行数据的处理,优点是非常的灵活,但需要了解各个算子的场景,需要有一定的学习成本,而 SQL 语言是一个大家十分熟悉的语言,如果可以通过编写 SQL 而操作RDD,学习的成本便会大大降低,在大数据领域 SQL 已经是数一个非常重要的范式,在 Hadoop 生态圈中,我们可以通过 Hive 进而转换成 MapReduces 进行数据分析,在后起之秀的 Flink 中也有 FlinkSQL 来简化数据的操作。

    SparkSQL 可以理解成是将 SQL 解析成:RDD + 优化 再执行。

    SparkSQL 对比 Hive
    SparkSQLHive
    计算方式基于 RDD 在内存计算转化为 MapReduces 需要磁盘IO读写
    计算引擎SparkMR、Spark、Tez
    性能
    元数据无自身的元数据,可以与Hive metastore连接Hive metastore
    缓存表支持不支持
    视图支持支持
    ACID不支持支持(hive 0.14)
    分区支持支持
    分桶支持支持
    SparkSQL 的适用场景
    数据类型说明
    结构化数据有固定的 Schema ,例如:关系型数据库的表
    半结构化数据没有固定的 Schema,但是有结构,数据一般是自描述的,例如:JSON 数据
    理解 DataFrame 和 DataSet

    SparkSQL的数据抽象是 DataFrameDataSet ,底层都是RDD

    DataFrame 可以理解为是一个分布式表,包括:RDD - 泛型 + Schema约束(指定了字段名和类型) + SQL操作 + 优化

    DataSetDataFrame 的基础上增加了泛型的概念。

    例如:有文本数据,读取为 RDD 后,可以拥有如下数据:

    1小明110@qq.com
    2小张120@qq.com
    3小王130@qq.com

    如果转化为 DataFrame ,那就就可以拥有下面数据:

    ID:bigint姓名:String邮箱:String
    1小明110@qq.com
    2小张120@qq.com
    3小王130@qq.com

    如果转化为 DataSet ,那就就可以拥有下面数据:

    ID:bigint姓名:String邮箱:String泛型
    1小明110@qq.comuser
    2小张120@qq.comuser
    3小王130@qq.comuser

    DataSetDataFrame还是有挺大区别的,DataFrame开发都是写SQL,但是DataSet可以使用类似RDDAPI。也可以理解成DataSet就是存了个数据类型的RDD

    二、通过 RDD 使用SparkSQL

    如果是使用 ScalaJava 语言开发,需要引入 SparkSQL 的依赖:

    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-sql_2.12artifactId>
        <version>3.0.1version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    假如现在有如下文本文件,分别对应含义为:ID、名称、年龄、邮箱

    1 小明 20 110.@qq.com
    2 小红 29 120.@qq.com
    3 李四 25 130.@qq.com
    4 张三 30 140.@qq.com
    5 王五 35 150.@qq.com
    6 赵六 40 160.@qq.com
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    下面还是使用前面文章的方式读取文本为 RDD ,不过不同的是,我们将 RDD 转为 DataFrame 使用 SQL 的方式处理:

    • Scala:
    object SQLRddScala {
    
      case class User(id: Int, name: String, age: Int, email: String)
    
      def main(args: Array[String]): Unit = {
        //声明 SparkSession
        val spark: SparkSession = SparkSession
          .builder()
          .appName("sparksql")
          .master("local[*]")
          .getOrCreate()
        //通过 SparkSession 获取 SparkContext
        val sc = spark.sparkContext
        //读取文件为 RDD
        val text = sc.textFile("D://test/input1/")
        //根据空格拆分字段
        val rdd = text.map(_.split(" ")).map(s => User(s(0).toInt, s(1), s(2).toInt, s(3)))
        //转化为 DataFrame,并指定 Schema
        val dataFrame = spark.createDataFrame(rdd)
        //打印 Schema
        dataFrame.printSchema()
        //查看数据
        dataFrame.show()
        //DSL 风格查询
        dataFrame.select("id","name").filter("age >= 30").show()
        //SQL 风格
        //注册表
        dataFrame.createOrReplaceTempView("user")
        //执行 SQL 语言
        spark.sql("select * from user where age >= 30").show()
        //关闭资源
        spark.stop()
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • Java:
    public class SQLRddJava {
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class User {
            private Integer id;
            private String name;
            private Integer age;
            private String email;
        }
    
        public static void main(String[] args) {
            // 声明 SparkSession
            SparkSession spark = SparkSession
                    .builder()
                    .appName("sparksql")
                    .master("local[*]")
                    .getOrCreate();
            //  通过 SparkSession 获取 SparkContext
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
            // 读取文件为 RDD
            JavaRDD<String> text = sc.textFile("D://test/input1/");
            //根据空格拆分字段
            JavaRDD<User> rdd = text.map(s -> s.split(" ")).map(s -> new User(Integer.parseInt(s[0]), s[1], Integer.parseInt(s[2]), s[3]));
            //转化为 DataFrame,并指定 Schema
            Dataset<Row> dataFrame = spark.createDataFrame(rdd, User.class);
            //打印 Schema
            dataFrame.printSchema();
            // 查看数据
            dataFrame.show();
            //DSL 风格查询
            dataFrame.select("id","name").filter("age >= 30").show();
            // SQL 风格
            dataFrame.createOrReplaceTempView("user");
            // 注册表
            spark.sql("select * from user where age >= 30").show();
            // 执行 SQL 语言
            spark.stop();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • Python:
    from pyspark.sql import SparkSession
    import findspark
    
    if __name__ == '__main__':
        findspark.init()
        # 声明 SparkSession
        spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
        # 通过 SparkSession 获取 SparkContext
        sc = spark.sparkContext
        # 读取文件为 RDD
        text = sc.textFile("D:/test/input1/")
        # 根据空格拆分字段
        rdd = text.map(lambda s: s.split(" "))
        # 转化为 DataFrame,并指定 Schema
        dataFrame = spark.createDataFrame(rdd, ["id", "name", "age", "email"])
        # 打印 Schema
        dataFrame.printSchema()
        # 查看数据
        dataFrame.show()
        # DSL 风格查询
        dataFrame.select(["id","name"]).filter("age >= 30").show()
        # SQL 风格
        # 注册表
        dataFrame.createOrReplaceTempView("user")
        # 执行 SQL 语言
        spark.sql("select * from user where age >= 30").show()
    
        #关闭资源
        spark.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    打印的 Schema 信息:
    在这里插入图片描述
    全部数据内容:

    在这里插入图片描述
    DSL 查询结果:

    在这里插入图片描述
    SQL 查询结果:

    在这里插入图片描述

    三、多数据源交互

    SparkSession 中可以通过: spark.read.格式(路径) 的方式, 获取 SparkSQL 中的外部数据源访问框架 DataFrameReaderDataFrameReader 有两种访问方式,一种是使用 load 方法加载,使用 format 指定加载格式, 还有一种是使用封装方法, 类似 csv, json, jdbc 等,这两种方式本质上一样,都是 load 的封装。

    注意:如果使用 load 方法加载数据, 但是没有指定 format 的话, 默认是按照 Parquet 文件格式读取。

    对于写数据SparkSQL 中增加了一个新的数据写入框架 DataFrameWriter ,同样也有两种使用方式,一种是使用 format 配合 save,还有一种是使用封装方法,例如 csv, json, saveAsTable 等,参数如下:

    组件说明
    source写入目标, 文件格式等, 通过 format 方法设定
    mode写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter 向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode 方法设定
    extraOptions外部参数, 例如 JDBC 的 URL, 通过 options, option 设定
    partitioningColumns类似 Hive 的分区, 保存表的时候使用, 这个地方的分区不是 RDD 的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy 设定
    bucketColumnNames类似 Hive 的分桶, 保存表的时候使用, 通过 bucketBy 设定
    sortColumnNames用于排序的列, 通过 sortBy 设定

    其中一个很重要的参数叫做 mode,表示指定的写入模式,可以传入Scala 对象表示或字符串表示,有如下几种方式:

    Scala 对象表示字符串表示说明
    SaveMode.ErrorIfExists“error”将 DataFrame 保存到 source 时, 如果目标已经存在, 则报错
    SaveMode.Append“append”将 DataFrame 保存到 source 时, 如果目标已经存在, 则添加到文件或者 Table 中
    SaveMode.Overwrite“overwrite”将 DataFrame 保存到 source 时, 如果目标已经存在, 则使用 DataFrame 中的数据完全覆盖目标
    SaveMode.Ignore“ignore”将 DataFrame 保存到 source 时, 如果目标已经存在, 则不会保存 DataFrame 数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS

    注意:如果没有指定 format, 默认的 formatParquet

    1. 读写 CSV 格式

    准备 CSV 文件:

    在这里插入图片描述

    • Scala:
    object SQLCSV {
      def main(args: Array[String]): Unit = {
        //声明 SparkSession
        val spark: SparkSession = SparkSession
          .builder()
          .appName("sparksql")
          .master("local[*]")
          .getOrCreate()
    
        //读取 CSV
        val csv = spark
          .read
          .schema("id int, name string, age int, email string")
          .option("header", "true") //第一行为标题
          .csv("D:/test/input1/test.csv")
        csv.printSchema()
        csv.show()
        // SQL 操作
        csv.createOrReplaceTempView("csv")
        spark.sql("select * from csv where age >= 30").show()
        //如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
        val csv1 = spark
          .read
          .schema("id int, name string, age int, email string")
          .option("delimiter", " ")
          .csv("D:/test/input1/test.txt")
        csv1.printSchema()
        csv1.show()
        //写出CSV文件
        csv1.write.mode(SaveMode.Overwrite).json("D:/test/output")
        //写出查询结果
        spark.sql("select * from csv where age <= 30")
          .write.mode(SaveMode.Overwrite).csv("D:/test/output1")
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • Java:
    public class SQLCSVJava {
        public static void main(String[] args) {
            SparkSession spark = SparkSession
                    .builder()
                    .appName("sparksql")
                    .master("local[*]")
                    .getOrCreate();
            //读取 CSV
            Dataset<Row> csv = spark.read()
                    .schema("id int, name string, age int, email string")
                    .option("header", "true") //第一行为标题
                    .csv("D:/test/input1/test.csv");
            csv.printSchema();
            csv.show();
            // SQL 操作
            csv.createOrReplaceTempView("csv");
            spark.sql("select * from csv where age >= 30").show();
            //如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
            Dataset<Row> csv1 = spark.read()
                    .schema("id int, name string, age int, email string")
                    .option("delimiter", " ") //第一行为标题
                    .csv("D:/test/input1/test.txt");
            csv1.printSchema();
            csv1.show();
            //写出CSV文件
            csv1.write().mode(SaveMode.Overwrite).json("D:/test/output");
            //写出查询结果
            spark.sql("select * from csv where age <= 30")
                    .write().mode(SaveMode.Overwrite).csv("D:/test/output1");
    
            spark.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • Python:
    from pyspark.sql import SparkSession,DataFrameWriter
    import findspark
    
    if __name__ == '__main__':
        findspark.init()
        # 声明 SparkSession
        spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
        # 读取 CSV
        csv = spark.read \
          .schema("id int, name string, age int, email string") \
          .option("header", "true") \
          .csv("D:/test/input1/test.csv")
        csv.printSchema()
        csv.show()
        #  SQL 操作
        csv.createOrReplaceTempView("csv")
        spark.sql("select * from csv where age >= 30").show()
        # 如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
        csv1 = spark.read \
          .schema("id int, name string, age int, email string") \
          .option("delimiter", " ") \
          .csv("D:/test/input1/test.txt")
        csv1.printSchema()
        csv1.show()
        # 写出CSV文件
        csv1.write.mode("overwrite").json("D:/test/output")
        # 写出查询结果
        spark.sql("select * from csv where age <= 30").write.mode("overwrite").csv("D:/test/output1")
        #关闭资源
        spark.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    存储的 csv :
    在这里插入图片描述

    在这里插入图片描述

    2. 读写Parquet 格式文件

    先将上面 csv 文件转为 Parquet 文件:

    //读取 CSV
    Dataset<Row> csv = spark.read()
            .schema("id int, name string, age int, email string")
            .option("header", "true") //第一行为标题
            .csv("D:/test/input1/test.csv");
    // 转化为 Parquet 文件
    csv.write().mode(SaveMode.Overwrite).parquet("D:/test/output3");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述
    将该文件名修改为 test.parquet 方便下面测试:

    读取 Parquet 格式文件

    • Scala:
    object SQLParquet {
      def main(args: Array[String]): Unit = {
        //声明 SparkSession
        val spark: SparkSession = SparkSession
          .builder()
          .appName("sparksql")
          .master("local[*]")
          .getOrCreate()
    
        //读取 parquet
        val parquet = spark.read.parquet("D:/test/output3/test.parquet")
        parquet.printSchema()
        parquet.show()
        // SQL 操作
        parquet.createOrReplaceTempView("parquet")
        spark.sql("select * from parquet where age >= 30").show()
        //写入 Parquet 的时候指定分区
        parquet.write.mode(SaveMode.Overwrite).partitionBy("age").csv("D:/test/output5")
    
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • Java
    public class SQLParquetJava {
        public static void main(String[] args) {
            SparkSession spark = SparkSession
                    .builder()
                    .appName("sparksql")
                    .master("local[*]")
                    .getOrCreate();
            //读取 parquet 
            Dataset<Row> parquet = spark.read().parquet("D:/test/output3/test.parquet");
            parquet.printSchema();
            parquet.show();
            // SQL 操作
            parquet.createOrReplaceTempView("parquet");
            spark.sql("select * from parquet where age >= 30").show();
            //写入 Parquet 的时候指定分区
            parquet.write().mode(SaveMode.Overwrite).partitionBy("age").parquet("D:/test/output5");
    
            spark.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • Python:
    from pyspark.sql import SparkSession,DataFrameWriter
    import findspark
    
    if __name__ == '__main__':
        findspark.init()
        # 声明 SparkSession
        spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
    
        #  读取 parquet
        parquet = spark.read.parquet("D:/test/output3/test.parquet")
        parquet.printSchema()
        parquet.show()
        #  SQL操作
        parquet.createOrReplaceTempView("parquet")
        spark.sql("select * from parquet where age >= 30").show()
        # 写入Parquet的时候指定分区
        parquet.write.mode("overwrite").partitionBy("age").csv("D:/test/output5")
       
        #关闭资源
        spark.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    SQL 查询结果:

    在这里插入图片描述

    输出目录:
    在这里插入图片描述

    3. 读写 JSON 格式文件

    将上面CSV数据转化为 JSON

    //读取 CSV
    Dataset<Row> csv = spark.read()
            .schema("id int, name string, age int, email string")
            .option("header", "true") //第一行为标题
            .csv("D:/test/input1/test.csv");
    // 转化为 JSON 文件
    csv.write().mode(SaveMode.Overwrite).json("D:/test/output6");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述
    在这里插入图片描述
    读写 JSON 格式文件

    • Scala:
    object SQLJson {
      def main(args: Array[String]): Unit = {
        //声明 SparkSession
        val spark: SparkSession = SparkSession
          .builder()
          .appName("sparksql")
          .master("local[*]")
          .getOrCreate()
    
        //读取 JSON
        val json = spark.read.json("D:/test/output6/test.json")
        json.printSchema()
        json.show()
        // SQL 操作
        json.createOrReplaceTempView("parquet")
        spark.sql("select * from parquet where age >= 30").show()
        //写入 JSON
        json.filter("age < 30 ").write.json("D:/test/output7")
    
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • Java:
    public class SQLJsonJava {
        public static void main(String[] args) {
            SparkSession spark = SparkSession
                    .builder()
                    .appName("sparksql")
                    .master("local[*]")
                    .getOrCreate();
    
            //读取 JSON
            Dataset<Row> json = spark.read().json("D:/test/output6/test.json");
            json.printSchema();
            json.show();
            // SQL 操作
            json.createOrReplaceTempView("parquet");
            spark.sql("select * from parquet where age >= 30").show();
            //写入 JSON
            json.filter("age < 30 ").write().json("D:/test/output7");
    
            spark.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • Python:
    from pyspark.sql import SparkSession,DataFrameWriter
    import findspark
    
    if __name__ == '__main__':
        findspark.init()
        # 声明 SparkSession
        spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
    
        # 读取JSON
        json = spark.read.json("D:/test/output6/test.json")
        json.printSchema()
        json.show()
        # SQL操作
        json.createOrReplaceTempView("parquet")
        spark.sql("select * from parquet where age >= 30").show()
        # 写入JSON
        json.filter("age < 30 ").write.json("D:/test/output7")
       
        #关闭资源
        spark.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    4. 读写 MySQL 格式文件

    ScalaScala 项目需要引入 MySQL 的依赖:

    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>8.0.22version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    创建表:

    CREATE TABLE `user` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      `email` varchar(255) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    写入测试数据:

    INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (1, '小明', 20, '110.@qq.com');
    INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (2, '小红', 29, '120.@qq.com');
    INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (3, '李四', 25, '130.@qq.com');
    INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (4, '张三', 30, '140.@qq.com');
    INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (5, '王五', 35, '150.@qq.com');
    INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (6, '赵六', 40, '160.@qq.com');
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    读写 MySQL 格式文件

    • Scala:
    object SQLMySql {
      def main(args: Array[String]): Unit = {
        //声明 SparkSession
        val spark: SparkSession = SparkSession
          .builder()
          .appName("sparksql")
          .master("local[*]")
          .getOrCreate()
    
        //读取 mysql
        val prop = new Properties
        prop.setProperty("user", "root")
        prop.setProperty("password", "root")
        val user = spark.read.jdbc(
          "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
          "user",
          prop)
        user.printSchema()
        user.show()
        // SQL 操作
        user.createOrReplaceTempView("user")
        spark.sql("select * from user where age >= 30").show()
        //写入表信息,没有表自动创建
        user.filter("age < 30 ")
          .write.mode(SaveMode.Overwrite).jdbc(
          "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
          "user2",
          prop)
    
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • Java:
    public class SQLMySqlJava {
        public static void main(String[] args) {
            SparkSession spark = SparkSession
                    .builder()
                    .appName("sparksql")
                    .master("local[*]")
                    .getOrCreate();
    
            //读取 mysql
            Properties prop = new Properties();
            prop.setProperty("user","root");
            prop.setProperty("password","root");
            Dataset<Row> user = spark.read().jdbc(
                    "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
                    "user",
                    prop);
            user.printSchema();
            user.show();
            // SQL 操作
            user.createOrReplaceTempView("user");
            spark.sql("select * from user where age >= 30").show();
            //写入表信息,没有表自动创建
            user.filter("age < 30 ").write().mode(SaveMode.Overwrite).jdbc(
                    "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
                    "user2",
                    prop
            );
    
            spark.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • Python
      使用 Python 读写 MySql 需要将 MySql的驱动放到 java 安装目录的 jre\lib\ext目录下:

    在这里插入图片描述

    from pyspark.sql import SparkSession, SQLContext
    import findspark
    
    if __name__ == '__main__':
        findspark.init()
        # 声明 SparkSession
        spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
    
        properties = {'user': 'root', 'password': 'root'}
        url = "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
        user = spark.read.jdbc(url=url, table="user", properties=properties)
        user.printSchema()
        user.show()
        #  SQL操作
        user.createOrReplaceTempView("user")
        spark.sql("select * from user where age >= 30").show()
        # 写入表信息,没有表自动创建
        user.filter("age < 30 ").write.mode("overwrite").jdbc(url=url, table="user2", properties=properties)
    
        # 关闭资源
        spark.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    SQL 查询结果:
    在这里插入图片描述
    MySQL 表信息:
    在这里插入图片描述

    四、SparkOnHive

    Hive 中可以将运算引擎改为 Spark ,也就是 HiveONSpark 不过这种方式严重依赖 Hive ,已经淘汰,而 SparkOnHvie 是在 SparkSQL 诞生之后提出的,仅仅使用 Hive 的元数据(库、表、字段、位置等),剩下的全部由 Spark 进行语法解析、物理执行计划、SQL优化等。

    由于远程模式下 Hive 的元数据是由 metastore 服务控制,因此确保metastore 服务正常启动,如果对此不了解,可以参考下面文章:

    https://xiaobichao.blog.csdn.net/article/details/127717080

    在这里插入图片描述

    注意spark3.0.1整合hive要求hive版本>=2.3.7

    ScalaJava 项目需要引入 spark-hive 的依赖:

    
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-hive_2.12artifactId>
        <version>3.0.1version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • Scala:
    object SparkOnHive {
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
        val spark = SparkSession.builder
          .appName("sparksql")
          .master("local[*]")
          // 实际开发中可以根据集群规模调整大小,默认200
          .config("spark.sql.shuffle.partitions", "8")
          // 指定 Hive 数据库在 HDFS 上的位置
          .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
          // hive metastore 的地址
          .config("hive.metastore.uris", "thrift://node1:9083")
          // 开启对hive语法的支持
          .enableHiveSupport
          .getOrCreate
    
        // 查询全部数据库
        spark.sql("show databases").show()
        // 使用 bxc 库
        spark.sql("use bxc").show()
        // 查询全部表
        spark.sql("show tables").show()
        // 创建表
        spark.sql("create table if not exists `user2`(" +
          "    id int comment 'ID'," +
          "    name string comment '名称'," +
          "    age int comment '年龄'," +
          "    email string comment '邮箱'" +
          ") comment '用户表'" +
          "row format delimited " +
          "fields terminated by ',' " +
          "lines terminated by '\n' ").show()
        spark.sql("show tables").show()
    
        //查询数据
        spark.sql("select * from `user`").show()
    
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • Java
    public class SparkOnHiveJava {
    
        public static void main(String[] args) {
            System.setProperty("HADOOP_USER_NAME","root");
            SparkSession spark = SparkSession
                    .builder()
                    .appName("sparksql")
                    .master("local[*]")
                    // 实际开发中可以根据集群规模调整大小,默认200
                    .config("spark.sql.shuffle.partitions", "8")
                    // 指定 Hive 数据库在 HDFS 上的位置
                    .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
                    // hive metastore 的地址
                    .config("hive.metastore.uris", "thrift://node1:9083")
                    // 开启对hive语法的支持
                    .enableHiveSupport()
                    .getOrCreate();
    
            // 查询全部数据库
            spark.sql("show databases").show();
            // 使用 bxc 库
            spark.sql("use bxc").show();
            // 查询全部表
            spark.sql("show tables").show();
            // 创建表
            spark.sql(
                    "create table if not exists `user2`(" +
                    "    id int comment 'ID'," +
                    "    name string comment '名称'," +
                    "    age int comment '年龄'," +
                    "    email string comment '邮箱'" +
                    ") comment '用户表'" +
                    "row format delimited " +
                    "fields terminated by ',' " +
                    "lines terminated by '\n' ").show();
            // 查询全部表
            spark.sql("show tables").show();
            
            //查询数据
            spark.sql("select * from `user`").show();
    
            spark.stop();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • Python:
    from pyspark.sql import SparkSession, SQLContext
    import findspark
    
    if __name__ == '__main__':
        findspark.init()
        # 声明 SparkSession
        spark = SparkSession.builder\
            .appName('sparksql')\
            .master("local[*]") \
            .config("spark.sql.shuffle.partitions", "8") \
            .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse") \
            .config("hive.metastore.uris", "thrift://node1:9083") \
            .enableHiveSupport() \
            .getOrCreate()
    
        #  查询全部数据库
        spark.sql("show databases").show()
        #  使用bxc库
        spark.sql("use bxc").show()
        #  查询全部表
        spark.sql("show tables").show()
        #  创建表
        spark.sql("create table if not exists `user2`(" +
                  "    id int comment 'ID'," +
                  "    name string comment '名称'," +
                  "    age int comment '年龄'," +
                  "    email string comment '邮箱'" +
                  ") comment '用户表'" +
                  "row format delimited " +
                  "fields terminated by ',' " +
                  "lines terminated by '\n' ").show()
        spark.sql("show tables").show()
    
        #  查询数据
        spark.sql("select * from `user`").show()
    
        spark.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    查看所有库:
    在这里插入图片描述
    查看全部表:
    在这里插入图片描述
    查询表信息:

    在这里插入图片描述

    五、SparkOnES

    创建测试索引,向 ES 发送 PUT 请求:

    PUT /user
    {
    	"settings": {},
    	"mappings": {
    		"properties": {
    			"name": {
    				"type": "text",
    				"index": true,
                    "analyzer": "pinyin"
    			},
    			"sex": {
    				"type": "text",
    				"index": true,
                    "analyzer": "ik_smart"
    			},
    			"age": {
    				"type": "long",
    				"index": false
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    写入几条测试数据:

    POST /user/_doc
    
    • 1
    {"name":"张三","age":18,"sex":"男"}
    {"name":"李四","age":20,"sex":"男"}
    {"name":"王五","age":30,"sex":"女"}
    {"name":"赵六","age":40,"sex":"男"}
    {"name":"小王","age":60,"sex":"女"}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    如果是 ScalaJava 项目需要引入 ES_Spark 的依赖包:

    <dependency>
        <groupId>org.elasticsearchgroupId>
        <artifactId>elasticsearch-spark-30_2.12artifactId>
        <version>7.17.5version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • Scala:
    object SparkOnES {
    
      def main(args: Array[String]): Unit = {
    
        val spark: SparkSession = SparkSession
          .builder()
          .appName("SparkOnES")
          .master("local[*]")
          .getOrCreate()
    
        val options = Map(
          "es.nodes" -> "192.168.40.176",
          "es.port" -> "9200",
          "pushdown" -> "true",
          "es.nodes.wan.only" -> "true",
          "es.update.retry.on.conflict" -> "3",
          "es.mapping.date.rich" -> "false",
          "es.index.auto.create" -> "true",
          "es.input.max.docs.per.partition" -> "5000000")
    
        val esDataSet = spark.read
          .format("org.elasticsearch.spark.sql")
          .options(options)
          .load("user")
        esDataSet.printSchema()
        esDataSet.limit(20).show()
        esDataSet.createOrReplaceTempView("user")
        spark.sql("select * from user where age > 30 ").show()
    
        // 写入数据
        esDataSet.filter("age > 30")
          .write.options(options)
          .format("org.elasticsearch.spark.sql")
          .options(options).mode(SaveMode.Overwrite)
          .save("user2")
    
    
        spark.stop()
    
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • Java:
    public class SparkOnESJava {
        public static void main(String[] args) {
    
            SparkSession spark = SparkSession
                    .builder()
                    .appName("sparksql")
                    .master("local[*]")
                    .getOrCreate();
    
            Map<String,String> options = new HashMap<>();
            options.put("es.nodes","192.168.40.176");
            options.put("es.port","9200");
            options.put("pushdown","true");
            options.put("es.nodes.wan.only","true");
            options.put("es.update.retry.on.conflict","3");
            options.put("es.mapping.date.rich","false");
            options.put("es.index.auto.create","true");
            options.put("es.input.max.docs.per.partition","5000000");
    
            Dataset<Row> esDataSet = spark.read()
                    .format("org.elasticsearch.spark.sql")
                    .options(options)
                    .load("user");
            esDataSet.printSchema();
            esDataSet.limit(20).show();
            esDataSet.createOrReplaceTempView("user");
            spark.sql("select * from user where age > 30 ").show();
    
            // 写入数据
            esDataSet.filter("age > 30")
                    .write().options(options)
                    .format("org.elasticsearch.spark.sql")
                    .options(options).mode(SaveMode.Overwrite)
                    .save("user2");
    
            spark.stop();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • python

    python 需要依赖于 elasticsearch-sparkjar 包,可以放在 Spark 的依赖中,或使用 spark-submit 时带上 –jars 指定该 jar包位置。

    from pyspark.sql import SparkSession
    import findspark
    
    if __name__ == '__main__':
        findspark.init()
        # 声明 SparkSession
        spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
    
        options = {
            "es.nodes": "192.168.40.176",
            "es.port": "9200",
            "pushdown": "true",
            "es.nodes.wan.only": "true",
            "es.update.retry.on.conflict": "3",
            "es.mapping.date.rich": "false",
            "es.index.auto.create": "true",
            "es.input.max.docs.per.partition": "5000000"
        }
    
        esDataSet = spark.read \
            .format("org.elasticsearch.spark.sql") \
            .options(**options) \
            .load("user")
        esDataSet.printSchema()
        esDataSet.limit(20).show()
        esDataSet.createOrReplaceTempView("news")
        spark.sql("select * from news where remark like '%贵州省%'").show()
    
        # 写入数据
        esDataSet.filter("age > 30").write \
            .format("org.elasticsearch.spark.sql") \
            .options(**options).mode("overwrite").save("user2")
    
        spark.stop()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    Schema 信息:
    在这里插入图片描述
    索引数据:

    在这里插入图片描述
    筛选后数据:

    在这里插入图片描述
    ES 中写入数据:

    在这里插入图片描述

    更多参数,可以参考官方的介绍:

    https://www.elastic.co/guide/en/elasticsearch/hadoop/8.5/spark.html

  • 相关阅读:
    等保备案主体是谁?在当地网安进行备案是吗?
    输入和输出处理
    力扣打卡:142. 环形链表 II | 快慢指针 | 双指针
    这几个Python实战项目,让大家了解到它的神奇
    手记系列之二 ----- 关于IDEA的一些使用方法经验
    应用分类算法,预测泰坦尼克号乘客幸存结果
    食品经营许可证没过期也要换?详细解读来了~
    web前端网页设计期末课程大作业:旅游网页主题网站设计——紫色的旅游开发景点网站静态模板(4页)HTML+CSS+JavaScript
    Python之列表
    看我在项目里怎么用设计模式,这么学设计模式也太简单了!
  • 原文地址:https://blog.csdn.net/qq_43692950/article/details/128151602