• Spark Learning Notes 09: Spark SQL Data Sources: Files and JDBC


    Overview

    Spark SQL can operate on a wide variety of data sources through the DataFrame interface. A DataFrame can be transformed with the usual operators, or registered as a temporary view and queried with SQL, as sketched below. This post summarizes Spark's data sources and how to write data out to files or to a traditional relational database.
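
    A quick sketch of the SQL path (a minimal example, assuming a SparkSession named spark and the sensor.json file used throughout this post; the view name and query are illustrative):

    val sensorDf:DataFrame = spark.read
      .format("json")
      .load("data/sensor.json")
    sensorDf.createOrReplaceTempView("sensor")
    spark.sql("SELECT id, temperature FROM sensor WHERE temperature > 13.0").show()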

    Loading data from JSON files

    Example

    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("prefersDecimal",true)
      .option("multiLine",true)
      .option("allowComments",true)
      .option("allowUnquotedFieldNames",true)
      .load("data/sensor.json")
    sensorDf.show()
    sensorDf.printSchema()
    

    Option reference

    timeZone

    • Description: sets the time zone used to parse dates and timestamps in the JSON file. Two formats are supported:
    1. Region-based zone IDs in the form 'area/city', e.g. 'America/Los_Angeles'
    2. Zone offsets in the form '(+|-)HH:mm', e.g. '-08:00' or '+01:00'
    • Default: the value of spark.sql.session.timeZone
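
    • Usage (a minimal sketch; the zone value is illustrative, and the option only affects columns parsed as dates or timestamps):

    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("timeZone","America/Los_Angeles")
      .load("data/sensor.json")
    sensorDf.printSchema()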

    primitivesAsString

    • Description: parse every primitive value in the JSON file as a String
    • Default: false
    • Usage:
    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("primitivesAsString",true)
      .load("data/sensor.json")
    sensorDf.printSchema()
    

    Output:

    root
     |-- id: string (nullable = true)
     |-- temperature: string (nullable = true)
     |-- timestamp: string (nullable = true)
    

    prefersDecimal

    • Description: parse floating-point values as decimal rather than double
    • Default: false
    • Usage:

    File contents:

    {"id": "sensor1","timestamp": 1639968630, "temperature": 12.1}
    {"id": "sensor2", "timestamp": 1639968620,"temperature": 13.1}
    
    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("prefersDecimal",true)
      .load("data/sensor.json")
    sensorDf.printSchema()
    

    Output:

    root
     |-- id: string (nullable = true)
     |-- temperature: decimal(3,1) (nullable = true)
     |-- timestamp: long (nullable = true)
    

    multiLine

    • Description: whether a JSON record may span multiple lines (use it for files shaped like the one below)
    • Default: false
    • Usage:

    File contents:

    [
        {"id": "sensor1","timestamp": 1639968630, "temperature": 12.1},
        {"id": "sensor2", "timestamp": 1639968620,"temperature": 13.1}
    ]
    
    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("multiLine",true)
      .load("data/sensor.json")
    sensorDf.show()
    

    Output:

    +-------+-----------+----------+
    |     id|temperature| timestamp|
    +-------+-----------+----------+
    |sensor1|       12.1|1639968630|
    |sensor2|       13.1|1639968620|
    +-------+-----------+----------+
    

    allowComments

    • Description: whether Java/C++-style comments are allowed in the JSON file
    • Default: false
    • Usage:

    File contents:

    {"id": "sensor1","timestamp": 1639968630, "temperature": 12.1}
    //这是一段json注释
    {"id": "sensor2", "timestamp": 1639968620,"temperature": 13.1}
    
    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("allowComments",true)
      .load("data/sensor.json")
    sensorDf.show()
    

    Output:

    +-------+-----------+----------+
    |     id|temperature| timestamp|
    +-------+-----------+----------+
    |sensor1|       12.1|1639968630|
    |sensor2|       13.1|1639968620|
    +-------+-----------+----------+
    

    If set to false, parsing fails with an error.

    allowUnquotedFieldNames

    • Description: whether unquoted field names are allowed
    • Default: false; JSON keys must be quoted by default
    • Usage:

    File contents:

    {"id": "sensor1","timestamp": 1639968630, "temperature": 12.1}
    {id: "sensor2", timestamp: 1639968620,"temperature": 13.1}
    
    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("allowUnquotedFieldNames",true)
      .load("data/sensor.json")
    sensorDf.show()
    

    Output:

    +-------+-----------+----------+
    |     id|temperature| timestamp|
    +-------+-----------+----------+
    |sensor1|       12.1|1639968630|
    |sensor2|       13.1|1639968620|
    +-------+-----------+----------+
    

    If set to false, parsing fails with an error.

    allowSingleQuotes

    • Description: whether single quotes are allowed around keys and values
    • Default: true; single-quoted keys are accepted by default
    • Usage:

    File contents:

    {"id": "sensor1","timestamp": 1639968630, "temperature": 12.1}
    {'id': "sensor2", 'timestamp': 1639968620,"temperature": 13.1}
    
    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("allowSingleQuotes",true)
      .load("data/sensor.json")
    sensorDf.show()
    

    Output:

    +-------+-----------+----------+
    |     id|temperature| timestamp|
    +-------+-----------+----------+
    |sensor1|       12.1|1639968630|
    |sensor2|       13.1|1639968620|
    +-------+-----------+----------+
    

    If set to false, parsing fails with an error.

    dropFieldIfAllNull

    • Description: during schema inference, drop a column whose values are all null (it will not appear in the DataFrame)
    • Default: false
    • Usage:

    File contents:

    {"id": "sensor1","timestamp": 1639968630, "temperature": null}
    {'id': "sensor2", 'timestamp': 1639968620,"temperature": null}
    
    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("dropFieldIfAllNull",true)
      .load("data/sensor.json")
    sensorDf.printSchema()
    

    Output:

    root
     |-- id: string (nullable = true)
     |-- timestamp: long (nullable = true)
    

    Every temperature value is null, so the column is dropped from the schema.

    dateFormat

    • Description: the pattern used when parsing date columns (it takes effect when the schema declares a date type)
    • Default: yyyy-MM-dd
    • Usage:

    File contents:

    {"id": null,"timestamp": null, "temperature": null,"enterdate": "20220701"}
    {"id": "sensor6","timestamp": 1639968530,"temperature": null,"enterdate": "20220701"}
    
    val sensorDf:DataFrame = spark.read
      .format("json")
      .option("dateFormat","yyyyMMdd")
      .schema("enterdate date,id string,temperature double,timestamp long")
      .load("data/sensor.json")
    sensorDf.show()
    sensorDf.printSchema()
    

    Output:

    +----------+-------+-----------+----------+
    | enterdate|     id|temperature| timestamp|
    +----------+-------+-----------+----------+
    |2022-07-01|   null|       null|      null|
    |2022-07-01|sensor6|       null|1639968530|
    +----------+-------+-----------+----------+
    

    Loading data from CSV files

    Example

    val sensorDf:DataFrame = spark.read
      .format("csv")
      .schema("id string,timestamp long,temperature double")
      .option("sep",";")
      .load("data/sensor.csv")
    sensorDf.show()
    sensorDf.printSchema()
    

    Option reference

    sep

    • Description: the column delimiter of the CSV file
    • Default: ,
    • Usage:

    File contents:

    sensor_1;1639968630;13.4
    sensor_1;1639968631;42.8
    
    val sensorDf:DataFrame = spark.read
      .format("csv")
      .schema("id string,timestamp long,temperature double")
      .option("sep",";")
      .load("data/sensor.csv")
    sensorDf.show()
    

    Output:

    +--------+----------+-----------+
    |      id| timestamp|temperature|
    +--------+----------+-----------+
    |sensor_1|1639968630|       13.4|
    |sensor_1|1639968631|       42.8|
    +--------+----------+-----------+
    

    comment

    • Description: the character that starts a comment; lines beginning with it are skipped
    • Default: disabled (no comment character is set)
    • Usage:

    File contents:

    sensor_1;1639968630;13.4
    sensor_1;1639968631;42.8
    #this is a comment line; ignore it
    sensor_1;1639968630;13.4
    sensor_1;1639968631;42.8
    
    val sensorDf:DataFrame = spark.read
      .format("csv")
      .schema("id string,timestamp long,temperature double")
      .option("sep",";")
      .option("comment","#")
      .load("data/sensor.csv")
    sensorDf.show()
    sensorDf.printSchema()
    

    Output:

    +--------+----------+-----------+
    |      id| timestamp|temperature|
    +--------+----------+-----------+
    |sensor_1|1639968630|       13.4|
    |sensor_1|1639968631|       42.8|
    |sensor_1|1639968630|       13.4|
    |sensor_1|1639968631|       42.8|
    +--------+----------+-----------+
    

    header

    • Description: whether the first line supplies the DataFrame's column names; note that without an explicit schema (and with inferSchema unset), every column is still typed as string, as the schema output below shows
    • Default: false
    • Usage:

    File contents:

    id;timestamp;temperature
    sensor_1;1639968630;13.4
    sensor_1;1639968631;42.8
    
    val sensorDf:DataFrame = spark.read
      .format("csv")
      .option("sep",";")
      .option("header",true)
      .load("data/sensor.csv")
    sensorDf.show()
    sensorDf.printSchema()
    

    Output:

    +--------+----------+-----------+
    |      id| timestamp|temperature|
    +--------+----------+-----------+
    |sensor_1|1639968630|       13.4|
    |sensor_1|1639968631|       42.8|
    +--------+----------+-----------+
    
    root
     |-- id: string (nullable = true)
     |-- timestamp: string (nullable = true)
     |-- temperature: string (nullable = true)
    

    lineSep

    • Description: the line separator
    • Default: on read, any of '\r\n', '\n', or '\r'; on write, '\n'
    • Usage:

    File contents:

    id;timestamp;temperature
    sensor_1;1639968630;13.4
    sensor_1;1639968631;42.8
    
    val sensorDf:DataFrame = spark.read
      .format("csv")
      .option("lineSep","\r\n")
      .option("header",true)
      .load("data/sensor.csv")
    sensorDf.show()
    sensorDf.printSchema()
    

    Output:

    +--------+----------+-----------+
    |      id| timestamp|temperature|
    +--------+----------+-----------+
    |sensor_1|1639968630|       13.4|
    |sensor_1|1639968631|       42.8|
    +--------+----------+-----------+
    
    root
     |-- id: string (nullable = true)
     |-- timestamp: string (nullable = true)
     |-- temperature: string (nullable = true)
    

    Loading data from ORC files

    Example

    val sensorDf:DataFrame = spark.read
      .format("orc")
      .load("data/save/sensor/orc")
    sensorDf.show()
    sensorDf.printSchema()
    
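    The path data/save/sensor/orc assumes the DataFrame was written there beforehand; a minimal write sketch (the save mode and path are illustrative):

    sensorDf.write
      .format("orc")
      .mode("overwrite")
      .save("data/save/sensor/orc")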

    Loading data from Parquet files

    Example

    val sensorDf:DataFrame = spark.read
      .format("parquet")
      .load("data/save/sensor/parquet")
    sensorDf.show()
    sensorDf.printSchema()
    

    Loading data from text files

    Example

    val sensorDf:DataFrame = spark.read
      .option("lineSep",";")
      .text("data/sensor.csv")
    sensorDf.show()
    sensorDf.printSchema()
    

    Option reference

    lineSep

    • Description: the line separator for the text source
    • Default: on read, any of '\r\n', '\n', or '\r'; on write, '\n' (the ';' in this example is set explicitly)
    • Usage:

    File contents:

    sensor_1;1639968630;13.4;sensor_1;1639968631;42.8
    
    val sensorDf:DataFrame = spark.read
      .option("lineSep",";")
      .text("data/sensor.csv")
    sensorDf.show()
    sensorDf.printSchema()
    

    Output:

    +----------+
    |     value|
    +----------+
    |  sensor_1|
    |1639968630|
    |      13.4|
    |  sensor_1|
    |1639968631|
    |      42.8|
    +----------+
    
    root
     |-- value: string (nullable = true)
    

    Loading data from a relational database via JDBC

    Example

    val df = spark.read.format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
      .option("query","SELECT ID,TIMESTAMP FROM sensor")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("queryTimeout",30)
      .load()
    df.show()
    

    Dialects for the following relational databases are built in:

    • DB2
    • MariaDB
    • MS SQL Server
    • MySQL
    • Oracle
    • PostgreSQL

    Option reference

    url

    • Description: the JDBC connection URL. If the URL does not embed the user name and password, add "user" and "password" options. (A dbtable or query option is also required for the read to run; one is included below.)
    • Usage:
    val df = spark.read
      .format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
      .option("dbtable","sensor")
      .load()
    

    query

    • Description: the query used to fetch the data; mutually exclusive with dbtable (only one may be set)
    • Usage:
    val df = spark.read
      .format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
      .option("query","SELECT ID,TIMESTAMP FROM sensor")
      .load()
    

    dbtable

    • Description: the table to fetch data from; mutually exclusive with query (only one may be set)
    • Usage:
    val df = spark.read
      .format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
      .option("dbtable","sensor")
      .load()
    

    driver

    • Description: the JDBC driver class to use; if omitted, a default driver registered for the URL is used
    • Usage:
    val df = spark.read
      .format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("dbtable","sensor")
      .load()
    

    customSchema

    • Description: a custom schema to apply to the query result
    • Usage:
    val df = spark.read.format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
      .option("query","SELECT ID,TIMESTAMP FROM sensor")
      .option("customSchema","ID STRING,TIMESTAMP double")
      .load()
    df.show()
    
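    The overview also mentions landing data into a relational database; a minimal JDBC write sketch (the target table, save mode, and credentials are illustrative):

    sensorDf.write
      .format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("dbtable","sensor_copy")
      .mode("append")
      .save()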

    Official documentation:
    https://spark.apache.org/docs/latest/sql-data-sources-json.html

  • Original post: https://blog.csdn.net/wangzhongyudie/article/details/125997525