Spark SQL supports operating on a wide variety of data sources through the DataFrame interface. A DataFrame can be transformed with the usual operators, or registered as a temporary view and queried with SQL. This article surveys Spark's data sources and the options that control how they are read, and shows how to write data back out to files or to a traditional relational database.
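As a quick illustration of the SQL path, a DataFrame can be registered as a temporary view and then queried; a minimal sketch, assuming the sensorDf built in the examples below (the view name sensor is arbitrary):
// Register the DataFrame as a temporary view so it can be queried with SQL
sensorDf.createOrReplaceTempView("sensor")
// Equivalent to filtering and selecting with DataFrame operators
spark.sql("SELECT id,temperature FROM sensor WHERE temperature > 13.0").show()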
// Several of the JSON options covered individually below, combined in one read:
val sensorDf:DataFrame = spark.read
  .format("json")
  .option("prefersDecimal",true)           // infer floating-point values as DecimalType
  .option("multiLine",true)                // allow records to span multiple lines
  .option("allowComments",true)            // tolerate Java/C++-style comments
  .option("allowUnquotedFieldNames",true)  // tolerate unquoted field names
  .load("data/sensor.json")
sensorDf.show()
sensorDf.printSchema()
// primitivesAsString: infer every primitive value as a string
val sensorDf:DataFrame = spark.read
  .format("json")
  .option("primitivesAsString",true)
  .load("data/sensor.json")
sensorDf.printSchema()
Output:
root
 |-- id: string (nullable = true)
 |-- temperature: string (nullable = true)
 |-- timestamp: string (nullable = true)
File contents:
{"id": "sensor1","timestamp": 1639968630, "temperature": 12.1}
{"id": "sensor2", "timestamp": 1639968620,"temperature": 13.1}
// prefersDecimal: during inference, floating-point values become DecimalType instead of DoubleType
val sensorDf:DataFrame = spark.read
  .format("json")
  .option("prefersDecimal",true)
  .load("data/sensor.json")
sensorDf.printSchema()
Output:
root
 |-- id: string (nullable = true)
 |-- temperature: decimal(3,1) (nullable = true)
 |-- timestamp: long (nullable = true)
File contents:
[
{"id": "sensor1","timestamp": 1639968630, "temperature": 12.1},
{"id": "sensor2", "timestamp": 1639968620,"temperature": 13.1}
]
// multiLine: parse records that span multiple lines, such as this pretty-printed JSON array
val sensorDf:DataFrame = spark.read
  .format("json")
  .option("multiLine",true)
  .load("data/sensor.json")
sensorDf.show()
Output:
+-------+-----------+----------+
|     id|temperature| timestamp|
+-------+-----------+----------+
|sensor1|       12.1|1639968630|
|sensor2|       13.1|1639968620|
+-------+-----------+----------+
File contents:
{"id": "sensor1","timestamp": 1639968630, "temperature": 12.1}
//this is a JSON comment
{"id": "sensor2", "timestamp": 1639968620,"temperature": 13.1}
// allowComments: ignore Java/C++-style comments in the JSON input
val sensorDf:DataFrame = spark.read
  .format("json")
  .option("allowComments",true)
  .load("data/sensor.json")
sensorDf.show()
Output:
+-------+-----------+----------+
|     id|temperature| timestamp|
+-------+-----------+----------+
|sensor1|       12.1|1639968630|
|sensor2|       13.1|1639968620|
+-------+-----------+----------+
If allowComments is left at its default of false, the comment line makes the input malformed and the read fails with an error.
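How malformed records are surfaced is controlled by the reader's mode option (PERMISSIVE, DROPMALFORMED or FAILFAST); a sketch of the strict variant, assuming the same file:
// FAILFAST aborts the query on the first malformed record; the default
// PERMISSIVE mode instead nulls out the fields and keeps going
val strictDf = spark.read
  .format("json")
  .option("allowComments",false)
  .option("mode","FAILFAST")
  .load("data/sensor.json")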
File contents:
{"id": "sensor1","timestamp": 1639968630, "temperature": 12.1}
{id: "sensor2", timestamp: 1639968620,"temperature": 13.1}
// allowUnquotedFieldNames: tolerate field names without double quotes
val sensorDf:DataFrame = spark.read
  .format("json")
  .option("allowUnquotedFieldNames",true)
  .load("data/sensor.json")
sensorDf.show()
Output:
+-------+-----------+----------+
|     id|temperature| timestamp|
+-------+-----------+----------+
|sensor1|       12.1|1639968630|
|sensor2|       13.1|1639968620|
+-------+-----------+----------+
If allowUnquotedFieldNames is left at its default of false, the unquoted field names make the records malformed and the read fails with an error.
File contents:
{"id": "sensor1","timestamp": 1639968630, "temperature": 12.1}
{'id': "sensor2", 'timestamp': 1639968620,"temperature": 13.1}
// allowSingleQuotes: accept single quotes around field names and values
val sensorDf:DataFrame = spark.read
  .format("json")
  .option("allowSingleQuotes",true)
  .load("data/sensor.json")
sensorDf.show()
Output:
+-------+-----------+----------+
|     id|temperature| timestamp|
+-------+-----------+----------+
|sensor1|       12.1|1639968630|
|sensor2|       13.1|1639968620|
+-------+-----------+----------+
Unlike the previous two options, allowSingleQuotes defaults to true; if it is explicitly set to false, the single-quoted field names make the records malformed and the read fails with an error.
File contents:
{"id": "sensor1","timestamp": 1639968630, "temperature": null}
{'id': "sensor2", 'timestamp': 1639968620,"temperature": null}
// dropFieldIfAllNull: during schema inference, drop columns whose value is null in every record
val sensorDf:DataFrame = spark.read
  .format("json")
  .option("dropFieldIfAllNull",true)
  .load("data/sensor.json")
sensorDf.printSchema()
Output:
root
 |-- id: string (nullable = true)
 |-- timestamp: long (nullable = true)
Every value of temperature is null, so the field is dropped from the inferred schema.
File contents:
{"id": null,"timestamp": null, "temperature": null,"enterdate": "20220701"}
{"id": "sensor6","timestamp": 1639968530,"temperature": null,"enterdate": "20220701"}
// dateFormat: the pattern used to parse date strings such as 20220701 into the declared date column
val sensorDf:DataFrame = spark.read
  .format("json")
  .option("dateFormat","yyyyMMdd")
  .schema("enterdate date,id string,temperature double,timestamp long")
  .load("data/sensor.json")
sensorDf.show()
Output:
+----------+-------+-----------+----------+
| enterdate|     id|temperature| timestamp|
+----------+-------+-----------+----------+
|2022-07-01|   null|       null|      null|
|2022-07-01|sensor6|       null|1639968530|
+----------+-------+-----------+----------+
File contents:
sensor_1;1639968630;13.4
sensor_1;1639968631;42.8
// sep: set the CSV field delimiter (the default is ",")
val sensorDf:DataFrame = spark.read
  .format("csv")
  .schema("id string,timestamp long,temperature double")
  .option("sep",";")
  .load("data/sensor.csv")
sensorDf.show()
Output:
+--------+----------+-----------+
|      id| timestamp|temperature|
+--------+----------+-----------+
|sensor_1|1639968630|       13.4|
|sensor_1|1639968631|       42.8|
+--------+----------+-----------+
File contents:
sensor_1;1639968630;13.4
sensor_1;1639968631;42.8
#this is a comment line; please ignore it
sensor_1;1639968630;13.4
sensor_1;1639968631;42.8
// comment: skip lines that start with the given character
val sensorDf:DataFrame = spark.read
  .format("csv")
  .schema("id string,timestamp long,temperature double")
  .option("sep",";")
  .option("comment","#")
  .load("data/sensor.csv")
sensorDf.show()
Output:
+--------+----------+-----------+
|      id| timestamp|temperature|
+--------+----------+-----------+
|sensor_1|1639968630|       13.4|
|sensor_1|1639968631|       42.8|
|sensor_1|1639968630|       13.4|
|sensor_1|1639968631|       42.8|
+--------+----------+-----------+
File contents:
id;timestamp;temperature
sensor_1;1639968630;13.4
sensor_1;1639968631;42.8
// header: treat the first line as column names instead of data
val sensorDf:DataFrame = spark.read
  .format("csv")
  .option("sep",";")
  .option("header",true)
  .load("data/sensor.csv")
sensorDf.show()
sensorDf.printSchema()
Output:
+--------+----------+-----------+
|      id| timestamp|temperature|
+--------+----------+-----------+
|sensor_1|1639968630|       13.4|
|sensor_1|1639968631|       42.8|
+--------+----------+-----------+
root
 |-- id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- temperature: string (nullable = true)
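Note that without an explicit schema every CSV column comes back as string. If letting Spark guess the types is acceptable, the reader also offers inferSchema, at the cost of an extra pass over the data; a sketch:
// inferSchema: scan the data once and infer numeric types where possible
val inferredDf = spark.read
  .format("csv")
  .option("sep",";")
  .option("header",true)
  .option("inferSchema",true)
  .load("data/sensor.csv")
inferredDf.printSchema()   // timestamp and temperature now come back as numeric types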
File contents:
id;timestamp;temperature
sensor_1;1639968630;13.4
sensor_1;1639968631;42.8
// lineSep: override the record separator used when parsing, e.g. Windows line endings
val sensorDf:DataFrame = spark.read
  .format("csv")
  .option("sep",";")       // still needed: the fields are ';'-separated
  .option("lineSep","\r\n")
  .option("header",true)
  .load("data/sensor.csv")
sensorDf.show()
sensorDf.printSchema()
Output:
+--------+----------+-----------+
|      id| timestamp|temperature|
+--------+----------+-----------+
|sensor_1|1639968630|       13.4|
|sensor_1|1639968631|       42.8|
+--------+----------+-----------+
root
 |-- id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- temperature: string (nullable = true)
// ORC: a self-describing columnar format; the schema is read from the files themselves
val sensorDf:DataFrame = spark.read
  .format("orc")
  .load("data/save/sensor/orc")
sensorDf.show()
sensorDf.printSchema()

// Parquet: Spark's default format, also self-describing
val sensorDf:DataFrame = spark.read
  .format("parquet")
  .load("data/save/sensor/parquet")
sensorDf.show()
sensorDf.printSchema()
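The directories read above were produced by earlier writes. Writing is symmetrical to reading; a minimal sketch, assuming the sensorDf from the examples above:
// mode controls what happens when the target directory already exists
sensorDf.write
  .format("orc")
  .mode("overwrite")
  .save("data/save/sensor/orc")
sensorDf.write
  .format("parquet")
  .mode("overwrite")
  .save("data/save/sensor/parquet")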
File contents:
sensor_1;1639968630;13.4;sensor_1;1639968631;42.8
// text source: each record becomes one row in a single string column named "value";
// lineSep makes ';' the record delimiter instead of the newline
val sensorDf:DataFrame = spark.read
  .option("lineSep",";")
  .text("data/sensor.csv")
sensorDf.show()
sensorDf.printSchema()
Output:
+----------+
|     value|
+----------+
|  sensor_1|
|1639968630|
|      13.4|
|  sensor_1|
|1639968631|
|      42.8|
+----------+
root
 |-- value: string (nullable = true)
// jdbc source: read the result of a SQL query from MySQL
val df = spark.read.format("jdbc")
  .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
  .option("query","SELECT ID,TIMESTAMP FROM sensor")
  .option("driver","com.mysql.cj.jdbc.Driver")
  .option("queryTimeout",30)   // seconds to wait before a statement times out
  .load()
df.show()
Spark ships with built-in JDBC dialects for the common relational databases, including MySQL, PostgreSQL, Oracle, SQL Server, DB2, Derby and Teradata.
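For example, pointing the same API at PostgreSQL only changes the url and the driver class; a sketch, assuming a local database named news with a sensor table:
val pgDf = spark.read
  .format("jdbc")
  .option("url","jdbc:postgresql://127.0.0.1:5432/news?user=postgres&password=postgres")
  .option("driver","org.postgresql.Driver")
  .option("dbtable","sensor")
  .load()
The examples below stay with MySQL and walk through the main read options.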
// The minimal jdbc read needs the url plus either dbtable or query;
// with only a url, load() fails. dbtable reads a whole table:
val df = spark.read
  .format("jdbc")
  .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
  .option("dbtable","sensor")
  .load()

// query instead pushes an arbitrary SQL statement down to the database:
val df = spark.read
  .format("jdbc")
  .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
  .option("query","SELECT ID,TIMESTAMP FROM sensor")
  .load()
// driver: pin the JDBC driver class explicitly instead of relying on
// DriverManager to resolve it from the url
val df = spark.read
  .format("jdbc")
  .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
  .option("driver","com.mysql.cj.jdbc.Driver")
  .option("dbtable","sensor")
  .load()
// customSchema: override the Spark types that the JDBC columns map to
val df = spark.read.format("jdbc")
  .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
  .option("query","SELECT ID,TIMESTAMP FROM sensor")
  .option("customSchema","ID STRING,TIMESTAMP double")
  .load()
df.show()
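Writing to a relational database, as promised in the introduction, mirrors the read side; a minimal sketch, assuming the same MySQL instance and a hypothetical target table sensor_copy:
// SaveMode controls behaviour when the table already exists
df.write
  .format("jdbc")
  .option("url","jdbc:mysql://127.0.0.1:3307/news?user=root&password=root")
  .option("driver","com.mysql.cj.jdbc.Driver")
  .option("dbtable","sensor_copy")
  .mode("append")
  .save()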
Official documentation:
https://spark.apache.org/docs/latest/sql-data-sources-json.html