🔎大家好,我是Sonhhxg_柒,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流🔎
📝个人主页-Sonhhxg_柒的博客_CSDN博客 📃
🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝
📣系列专栏 - 机器学习【ML】 自然语言处理【NLP】 深度学习【DL】
🖍foreword
✔说明⇢本人讲解主要包括Python、机器学习(ML)、深度学习(DL)、自然语言处理(NLP)等内容。
如果你对这个系列感兴趣的话,可以关注订阅哟👋
文章目录
使用 Pandas UDF 加速和分发 PySpark UDF
在上一章中,我们探讨了与 Spark 中内置数据源的交互。我们还仔细研究了 DataFrame API 及其与 Spark SQL 的互操作性。在本章中,我们将关注 Spark SQL 如何与外部组件交互。具体来说,我们将讨论 Spark SQL 如何让您:
为 Apache Hive 和 Apache Spark 使用用户定义的函数。
连接外部数据源,例如 JDBC 和 SQL 数据库、PostgreSQL、MySQL、Tableau、Azure Cosmos DB 和 MS SQL Server。
使用简单和复杂的类型、高阶函数和常见的关系运算符。
我们还将介绍一些使用 Spark SQL 查询 Spark 的不同选项,例如 Spark SQL shell、Beeline 和 Tableau。
Spark SQL 是 Apache Spark 的一个基础组件,它将关系处理与 Spark 的函数式编程 API 集成在一起。它的起源是在之前对鲨鱼的研究中。Shark 最初是在 Apache Spark 1之上的 Hive 代码库上构建的,并成为 Hadoop 系统上第一个交互式 SQL 查询引擎之一。它证明了两全其美是可能的;与企业数据仓库一样快,可扩展性以及 Hive/MapReduce。
Spark SQL 让 Spark 程序员可以利用更快的性能和关系编程(例如,声明式查询和优化的存储)以及调用复杂的分析库(例如,机器学习)的优势。如前一章所述,从 Apache Spark 2.x 开始,SparkSparkSession
提供了一个统一的入口点来操作数据。
虽然 Apache Spark 有大量的内置函数,但 Spark 的灵活性允许数据工程师和数据科学家也可以定义自己的函数。这些被称为用户定义函数(UDF)。
创建您自己的 PySpark 或 Scala UDF 的好处是您(和其他人)将能够在 Spark SQL 本身中使用它们。例如,数据科学家可以将 ML 模型包装在 UDF 中,以便数据分析师可以在 Spark SQL 中查询其预测,而不必了解模型的内部结构。
下面是创建 Spark SQL UDF 的简化示例。请注意,UDF 在每个会话中运行,它们不会保留在底层元存储中:
- // In Scala
- // Create cubed function
- val cubed = (s: Long) => {
- s * s * s
- }
-
- // Register UDF
- spark.udf.register("cubed", cubed)
-
- // Create temporary view
- spark.range(1, 9).createOrReplaceTempView("udf_test")
- # In Python
- from pyspark.sql.types import LongType
-
- # Create cubed function
- def cubed(s):
- return s * s * s
-
- # Register UDF
- spark.udf.register("cubed", cubed, LongType())
-
- # Generate temporary view
- spark.range(1, 9).createOrReplaceTempView("udf_test")
您现在可以使用 Spark SQL 执行以下任一cubed()
功能:
- // In Scala/Python
- // Query the cubed UDF
- spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()
-
- +---+--------+
- | id|id_cubed|
- +---+--------+
- | 1| 1|
- | 2| 8|
- | 3| 27|
- | 4| 64|
- | 5| 125|
- | 6| 216|
- | 7| 343|
- | 8| 512|
- +---+--------+
Spark SQL(包括 SQL、DataFrame API 和 Dataset API)不保证子表达式的求值顺序。例如,以下查询不保证子句在s is NOT NULL
子句之前执行strlen(s) > 1
:
spark.sql("SELECT s FROM test1 WHERE s IS NOT NULL AND strlen(s) > 1")
因此,要执行正确的null
检查,建议您执行以下操作:
使 UDF 本身具有null
感知能力并null
在 UDF 内部进行检查。
使用IF
orCASE WHEN
表达式进行null
检查并在条件分支中调用 UDF。
以前使用 PySpark UDF 的一个普遍问题是它们的性能比 Scala UDF 慢。这是因为 PySpark UDF 需要在 JVM 和 Python 之间移动数据,这非常昂贵。为了解决这个问题,Pandas UDF(也称为矢量化 UDF)作为 Apache Spark 2.3 的一部分被引入。Pandas UDF 使用 Apache Arrow 传输数据,使用 Pandas 处理数据。pandas_udf
您可以使用关键字作为装饰器来定义 Pandas UDF ,或者包装函数本身。一旦数据采用Apache Arrow 格式,不再需要序列化/腌制数据,因为它已经是 Python 进程可以使用的格式。您不是在逐行操作单个输入,而是在 Pandas Series 或 DataFrame 上操作(即矢量化执行)。
从带有 Python 3.6 及更高版本的 Apache Spark 3.0 开始,Pandas UDF 分为两个 API 类别:Pandas UDF 和 Pandas Function API。
Pandas UDF
在 Apache Spark 3.0 中,Pandas UDF 从 Pandas UDF 中的 Python 类型提示(例如pandas.Series
、pandas.DataFrame
、Tuple
和Iterator
. 以前,您需要手动定义和指定每个 Pandas UDF 类型。目前,Pandas UDF 中 Python 类型提示支持的案例有:Series to Series、Iterator of Series to Iterator of Series、Iterator of Multiple Series to Iterator of Series、Series to Scalar(单值)。
Pandas 函数 API
Pandas 函数 API 允许您将本地 Python 函数直接应用于 PySpark DataFrame,其中输入和输出都是 Pandas 实例。对于 Spark 3.0,支持的 Pandas 函数 API 是 grouped map、map、co-grouped map。
有关更多信息,请参阅第 12 章中的“使用 Python 类型提示重新设计的 Pandas UDF”。
以下是 Spark 3.0 的标量 Pandas UDF 示例:2# In Python
# Import pandas
import
pandas
as
- # In Python
- # Import pandas
- import pandas as pd
-
- # Import various pyspark SQL functions including pandas_udf
- from pyspark.sql.functions import col, pandas_udf
- from pyspark.sql.types import LongType
-
- # Declare the cubed function
- def cubed(a: pd.Series) -> pd.Series:
- return a * a * a
-
- # Create the pandas UDF for the cubed function
- cubed_udf = pandas_udf(cubed, returnType=LongType())
前面的代码片段声明了一个调用的函数,该函数cubed()
执行立方操作。这是一个常规的 Pandas 函数,带有额外的cubed_udf = pandas_udf()
调用来创建我们的 Pandas UDF。
让我们从一个简单的 Pandas Series(定义为x
)开始,然后应用局部函数cubed()
进行立方计算:
- # Create a Pandas Series
- x = pd.Series([1, 2, 3])
-
- # The function for a pandas_udf executed with local Pandas data
- print(cubed(x))
输出如下:
0 1 1 8 2 27 dtype: int64
现在让我们切换到 Spark DataFrame。我们可以将此函数作为 Spark 矢量化 UDF 执行,如下所示:
- # Create a Spark DataFrame, 'spark' is an existing SparkSession
- df = spark.range(1, 4)
-
- # Execute function as a Spark vectorized UDF
- df.select("id", cubed_udf(col("id"))).show()
这是输出:
- +---+---------+
- | id|cubed(id)|
- +---+---------+
- | 1| 1|
- | 2| 8|
- | 3| 27|
- +---+---------+
与本地函数相反,使用矢量化 UDF 将导致 Spark 作业的执行;前面的本地函数是只在 Spark 驱动程序上执行的 Pandas 函数。在查看此pandas_udf
功能的某个阶段的 Spark UI 时,这一点变得更加明显(图 5-1)。
笔记
如需更深入地了解 Pandas UDF,请参阅pandas 用户定义函数文档.
图 5-1。用于在 Spark DataFrame 上执行 Pandas UDF 的 Spark UI 阶段
与许多 Spark 作业一样,该作业首先parallelize()
将本地数据(Arrow 二进制批处理)发送到执行程序,并调用mapPartitions()
将 Arrow 二进制批处理转换为 Spark 的内部数据格式,然后将其分发给 Spark 工作人员。有许多WholeStageCodegen
步骤代表了性能的基本提升(感谢 Project Tungsten 的全阶段代码生成,显着提高了 CPU 效率和性能)。但它是ArrowEvalPython
标识(在这种情况下)正在执行 Pandas UDF 的步骤。
查询 Apache Spark 的机制有多种,包括 Spark SQL shell、Beeline CLI 实用程序以及 Tableau 和 Power BI 等报告工具。
在本节中,我们包括 Tableau 的说明;对于 Power BI,请参阅文档。
执行 Spark SQL 查询的便捷工具是spark-sql
CLI。虽然此实用程序在本地模式下与 Hive 元存储服务通信,但它不与Thrift JDBC/ODBC 服务器(又名Spark Thrift 服务器或STS)通信。STS 允许 JDBC/ODBC 客户端通过 Apache Spark 上的 JDBC 和 ODBC 协议执行 SQL 查询。
要启动 Spark SQL CLI,请在文件夹中执行以下命令$SPARK_HOME
:
./bin/spark-sql
启动 shell 后,您可以使用它以交互方式执行 Spark SQL 查询。让我们看几个例子。
spark-sql> CREATE TABLE people (name STRING,age int);
您的输出应该与此类似,注意 Spark SQL 表的创建people
及其文件位置 ( /user/hive/warehouse/people
):
- 20/01/11 22:42:16 WARN HiveMetaStore: Location: file:/user/hive/warehouse/people
- specified for non-external table:people
- Time taken: 0.63 seconds
您可以通过执行类似于以下内容的语句将数据插入到 Spark SQL 表中:
INSERT INTO people SELECT name, age FROM ...
由于您不依赖于从预先存在的表或文件中加载数据,因此您可以使用INSERT...VALUES
语句将数据插入到表中。这三个语句将三个人(他们的姓名和年龄,如果知道的话)插入到people
表中:
- spark-sql> INSERT INTO people VALUES ("Michael", NULL);
- Time taken: 1.696 seconds
- spark-sql> INSERT INTO people VALUES ("Andy", 30);
- Time taken: 0.744 seconds
- spark-sql> INSERT INTO people VALUES ("Samantha", 19);
- Time taken: 0.637 seconds
- spark-sql>
现在您的表中有数据,您可以针对它运行 Spark SQL 查询。让我们首先查看我们的元存储中存在哪些表:
- spark-sql> SHOW TABLES;
- default people false
- Time taken: 0.016 seconds, Fetched 1 row(s)
接下来,让我们看看我们表中有多少人年龄小于 20 岁:
- spark-sql> SELECT * FROM people WHERE age < 20;
- Samantha 19
- Time taken: 0.593 seconds, Fetched 1 row(s)
同样,让我们看看没有指定年龄的人是谁:
- spark-sql> SELECT name FROM people WHERE age IS NULL;
- Michael
- Time taken: 0.272 seconds, Fetched 1 row(s)
如果您使用过 Apache Hive,您可能熟悉命令行工具Beeline,这是一种针对 HiveServer2 运行 HiveQL 查询的常用实用程序。Beeline 是一个基于SQLLine CLI的 JDBC 客户端。您可以使用相同的实用程序对 Spark Thrift 服务器执行 Spark SQL 查询。请注意,当前实现的 Thrift JDBC/ODBC 服务器对应于 Hive 1.2.1 中的 HiveServer2。您可以使用 Spark 或 Hive 1.2.1 附带的以下 Beeline 脚本来测试 JDBC 服务器。
要使用 Beeline 测试 Thrift JDBC/ODBC 服务器,请执行以下命令:
./bin/beeline
然后配置 Beeline 连接到本地 Thrift 服务器:
!connect jdbc:hive2://localhost:10000
笔记
默认情况下,直线处于非安全模式。因此,用户名是您的登录名(例如,user@learningspark.org
),密码为空。
从这里,您可以运行类似于使用 Beeline 运行 Hive 查询的 Spark SQL 查询。以下是一些示例查询及其输出:
- 0: jdbc:hive2://localhost:10000> SHOW tables;
-
- +-----------+------------+--------------+
- | database | tableName | isTemporary |
- +-----------+------------+--------------+
- | default | people | false |
- +-----------+------------+--------------+
- 1 row selected (0.417 seconds)
-
- 0: jdbc:hive2://localhost:10000> SELECT * FROM people;
-
- +-----------+-------+
- | name | age |
- +-----------+-------+
- | Samantha | 19 |
- | Andy | 30 |
- | Michael | NULL |
- +-----------+-------+
- 3 rows selected (1.512 seconds)
-
- 0: jdbc:hive2://localhost:10000>
与通过 Beeline 或 Spark SQL CLI 运行查询类似,您可以通过 Thrift JDBC/ODBC 服务器将您最喜欢的 BI 工具连接到 Spark SQL。在本节中,我们将向您展示如何将 Tableau Desktop(版本 2019.2)连接到您的本地 Apache Spark 实例。
笔记
您需要已安装Tableau 的 Spark ODBC驱动程序版本 1.2.0 或更高版本。如果您已安装(或升级到)Tableau 2018.1 或更高版本,则应已预安装此驱动程序。
要启动 Spark Thrift JDBC/ODBC 服务器,请从文件夹中执行以下命令$SPARK_HOME
:
./sbin/start-thriftserver.sh
笔记
如果您尚未启动 Spark 驱动程序和工作程序,请在之前执行以下命令
start-thriftserver.sh
:
./sbin/start-all.sh
如果您是第一次启动 Tableau,您会看到一个“连接”对话框,允许您连接到大量数据源。默认情况下,左侧的“To a Server”菜单中不会包含 Spark SQL 选项(见图 5-2)。
图 5-2。Tableau 连接对话框
要访问 Spark SQL 选项,请单击该列表底部的更多...,然后从主面板中显示的列表中选择 Spark SQL,如图 5-3所示。
图 5-3。选择更多... > Spark SQL 以连接到 Spark SQL
这将弹出 Spark SQL 对话框(图 5-4)。当您连接到本地 Apache Spark 实例时,您可以使用具有以下参数的非安全用户名身份验证模式:
服务器:本地主机
端口:10000(默认)
类型:SparkThriftServer(默认)
身份验证:用户名
用户名:您的登录名,例如 user@learningspark.org
需要 SSL:未选中
图 5-4。Spark SQL 对话框
成功连接到 Spark SQL 数据源后,您将看到类似于图 5-5的 Data Source Connections 视图。
图 5-5。Tableau 数据源连接视图,连接到本地 Spark 实例
从左侧的 Select Schema 下拉菜单中,选择“default”。然后输入要查询的表名(见图5-6)。请注意,您可以单击放大镜图标来获取可用表格的完整列表。
图 5-6。选择要查询的架构和表
笔记
有关使用 Tableau 连接到 Spark SQL 数据库的详细信息,请参阅 Tableau 的Spark SQL 文档和 Databricks Tableau 文档。
输入people
表格名称,然后将表格从左侧拖放到主对话框中(在标有“将表格拖到此处”的空间中)。您应该看到类似于图 5-7的内容。
图 5-7。连接到本地 Spark 实例中的人员表
单击立即更新,Tableau 将在后台查询您的 Spark SQL 数据源(图 5-8)。
您现在可以对 Spark 数据源、联接表等执行查询,就像对任何其他 Tableau 数据源一样。
图 5-8。查询本地 Spark 数据源的 Tableau 工作表表视图
在本节中,我们将重点介绍如何使用 Spark SQL 连接外部数据源,从 JDBC 和 SQL 数据库开始。
Spark SQL 包含一个数据源 API,可以使用JDBC从其他数据库读取数据。它简化了对这些数据源的查询,因为它将结果作为 DataFrame 返回,从而提供了 Spark SQL 的所有优点(包括性能和与其他数据源连接的能力)。
首先,您需要为您的 JDBC 数据源指定 JDBC 驱动程序,并且它需要位于 Spark 类路径中。从$SPARK_HOME
文件夹中,您将发出如下命令:
./bin/spark-shell --driver-class-path $database.jar --jars $database.jar
使用数据源 API,可以将远程数据库中的表加载为 DataFrame 或 Spark SQL 临时视图。用户可以在数据源选项中指定 JDBC 连接属性。表 5-1包含 Spark 支持的一些更常见的连接属性(不区分大小写)。
属性名称 | 描述 |
---|---|
user , password | 这些通常作为连接属性提供,用于登录数据源。 |
url | JDBC 连接 URL,例如jdbc:postgresql://localhost/test?user=fred&password=secret . |
dbtable | 要读取或写入的 JDBC 表。您不能同时指定dbtable 和query 选项。 |
query | 用于从 Apache Spark 读取数据的查询,例如SELECT column1, column2, ..., columnN FROM [table|subquery] . 您不能同时指定query 和dbtable 选项。 |
driver | 用于连接到指定 URL 的 JDBC 驱动程序的类名。 |
有关连接属性的完整列表,请参阅Spark SQL 文档。
在 Spark SQL 和 JDBC 外部源之间传输大量数据时,对数据源进行分区非常重要。您的所有数据都通过一个驱动程序连接,这可能会使您的提取性能饱和并显着减慢,并可能使源系统的资源饱和。虽然这些 JDBC 属性是可选的,但对于任何大规模操作,强烈建议使用表 5-2中显示的属性.
属性名称 | 描述 |
---|---|
numPartitions | 表读写中可用于并行的最大分区数。这也决定了并发 JDBC 连接的最大数量。 |
partitionColumn | 读取外部源时,partitionColumn 是用于确定分区的列;注意,partitionColumn 必须是数字、日期或时间戳列。 |
lowerBound | 设置分区步幅的最小值partitionColumn 。 |
upperBound | 设置分区步幅的最大值partitionColumn 。 |
让我们看一个示例来帮助您了解这些属性是如何工作的。假设我们使用以下设置:
numPartitions
: 10
lowerBound
:0
upperBound
:10000
那么步幅等于1000,就会创建10个分区。这相当于执行这 10 个查询(每个分区一个):
SELECT * FROM table WHERE partitionColumn BETWEEN 0 and 1000
SELECT * FROM table WHERE partitionColumn BETWEEN 1000 and 2000
...
SELECT * FROM table WHERE partitionColumn BETWEEN 9000 and 10000
虽然并非包罗万象,但以下是使用这些属性时要牢记的一些提示:
一个好的起点numPartitions
是使用 Spark 工作人员数量的倍数。例如,如果您有四个 Spark 工作节点,那么可能从 4 或 8 个分区开始。但同样重要的是要注意您的源系统处理读取请求的能力。对于有处理窗口的系统,可以最大化对源系统的并发请求数;对于缺少处理窗口的系统(例如,OLTP 系统不断处理数据),您应该减少并发请求的数量以防止源系统饱和。
最初,根据最小和最大实际值计算lowerBound
和。例如,如果您选择,但所有值都介于和之间,则 10 个查询中只有 2 个(每个分区一个)将完成所有工作。在这种情况下,更好的配置是.upperBound
partitionColumn
{numPartitions:10, lowerBound: 0, upperBound: 10000}
2000
4000
{numPartitions:10, lowerBound: 0, upperBound: 4000}
选择一个partitionColumn
可以均匀分布的,避免数据倾斜。例如,如果您的大多数人partitionColumn
具有值2500
,则{numPartitions:10, lowerBound: 0, upperBound: 10000}
大部分工作将由请求介于2000
和之间的值的任务执行3000
。相反,选择一个不同的partitionColumn
,或者如果可能的话生成一个新的(可能是多列的哈希)来更均匀地分布你的分区.
要连接到 PostgreSQL 数据库,请从Maven构建或下载 JDBC jar并将其添加到您的类路径中。然后启动 Spark shell(spark-shell
或pyspark
),指定该 jar:
bin/spark-shell --jars postgresql-42.2.6.jar
以下示例展示了如何在 Scala 中使用 Spark SQL 数据源 API 和 JDBC 从 PostgreSQL 数据库中加载和保存:
- // In Scala
- // Read Option 1: Loading data from a JDBC source using load method
- val jdbcDF1 = spark
- .read
- .format("jdbc")
- .option("url", "jdbc:postgresql:[DBSERVER]")
- .option("dbtable", "[SCHEMA].[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .load()
-
- // Read Option 2: Loading data from a JDBC source using jdbc method
- // Create connection properties
- import java.util.Properties
- val cxnProp = new Properties()
- cxnProp.put("user", "[USERNAME]")
- cxnProp.put("password", "[PASSWORD]")
-
- // Load data using the connection properties
- val jdbcDF2 = spark
- .read
- .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)
-
- // Write Option 1: Saving data to a JDBC source using save method
- jdbcDF1
- .write
- .format("jdbc")
- .option("url", "jdbc:postgresql:[DBSERVER]")
- .option("dbtable", "[SCHEMA].[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .save()
-
- // Write Option 2: Saving data to a JDBC source using jdbc method
- jdbcDF2.write
- .jdbc(s"jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)
以下是在 PySpark 中的操作方法:
- # In Python
- # Read Option 1: Loading data from a JDBC source using load method
- jdbcDF1 = (spark
- .read
- .format("jdbc")
- .option("url", "jdbc:postgresql://[DBSERVER]")
- .option("dbtable", "[SCHEMA].[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .load())
-
- # Read Option 2: Loading data from a JDBC source using jdbc method
- jdbcDF2 = (spark
- .read
- .jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",
- properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))
-
- # Write Option 1: Saving data to a JDBC source using save method
- (jdbcDF1
- .write
- .format("jdbc")
- .option("url", "jdbc:postgresql://[DBSERVER]")
- .option("dbtable", "[SCHEMA].[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .save())
-
- # Write Option 2: Saving data to a JDBC source using jdbc method
- (jdbcDF2
- .write
- .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]",
- properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))
要连接到 MySQL 数据库,请从Maven或MySQL(后者更容易!)构建或下载 JDBC jar,并将其添加到您的类路径中。然后启动 Spark shell(spark-shell
或pyspark
),指定该 jar:
bin/spark-shell --jars mysql-connector-java_8.0.16-bin.jar
以下示例展示了如何使用 Spark SQL 数据源 API 和 Scala 中的 JDBC 从 MySQL 数据库中加载数据并将其保存到数据库中:
- // In Scala
- // Loading data from a JDBC source using load
- val jdbcDF = spark
- .read
- .format("jdbc")
- .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
- .option("driver", "com.mysql.jdbc.Driver")
- .option("dbtable", "[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .load()
-
- // Saving data to a JDBC source using save
- jdbcDF
- .write
- .format("jdbc")
- .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
- .option("driver", "com.mysql.jdbc.Driver")
- .option("dbtable", "[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .save()
下面是如何在 Python 中做到这一点:
- # In Python
- # Loading data from a JDBC source using load
- jdbcDF = (spark
- .read
- .format("jdbc")
- .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
- .option("driver", "com.mysql.jdbc.Driver")
- .option("dbtable", "[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .load())
-
- # Saving data to a JDBC source using save
- (jdbcDF
- .write
- .format("jdbc")
- .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
- .option("driver", "com.mysql.jdbc.Driver")
- .option("dbtable", "[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .save())
若要连接到 Azure Cosmos DB 数据库,请从Maven或GitHub构建或下载 JDBC jar并将其添加到您的类路径中。然后启动一个 Scala 或 PySpark shell,指定这个 jar(注意这个例子使用的是 Spark 2.4):
bin/spark-shell --jars azure-cosmosdb-spark_2.4.0_2.11-1.3.5-uber.jar
您还可以选择使用其 Maven 坐标从Spark 包--packages
中拉出连接器:
- export PKG="com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"
- bin/spark-shell --packages $PKG
以下示例说明如何使用 Scala 和 PySpark 中的 Spark SQL 数据源 API 和 JDBC 从 Azure Cosmos DB 数据库加载数据并将其保存到。请注意,通常使用query_custom
配置来利用 Cosmos DB 中的各种索引:
- // In Scala
- // Import necessary libraries
- import com.microsoft.azure.cosmosdb.spark.schema._
- import com.microsoft.azure.cosmosdb.spark._
- import com.microsoft.azure.cosmosdb.spark.config.Config
-
- // Loading data from Azure Cosmos DB
- // Configure connection to your collection
- val query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"
- val readConfig = Config(Map(
- "Endpoint" -> "https://[ACCOUNT].documents.azure.com:443/",
- "Masterkey" -> "[MASTER KEY]",
- "Database" -> "[DATABASE]",
- "PreferredRegions" -> "Central US;East US2;",
- "Collection" -> "[COLLECTION]",
- "SamplingRatio" -> "1.0",
- "query_custom" -> query
- ))
-
- // Connect via azure-cosmosdb-spark to create Spark DataFrame
- val df = spark.read.cosmosDB(readConfig)
- df.count
-
- // Saving data to Azure Cosmos DB
- // Configure connection to the sink collection
- val writeConfig = Config(Map(
- "Endpoint" -> "https://[ACCOUNT].documents.azure.com:443/",
- "Masterkey" -> "[MASTER KEY]",
- "Database" -> "[DATABASE]",
- "PreferredRegions" -> "Central US;East US2;",
- "Collection" -> "[COLLECTION]",
- "WritingBatchSize" -> "100"
- ))
-
- // Upsert the DataFrame to Azure Cosmos DB
- import org.apache.spark.sql.SaveMode
- df.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
- # In Python
- # Loading data from Azure Cosmos DB
- # Read configuration
- query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"
- readConfig = {
- "Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",
- "Masterkey" : "[MASTER KEY]",
- "Database" : "[DATABASE]",
- "preferredRegions" : "Central US;East US2",
- "Collection" : "[COLLECTION]",
- "SamplingRatio" : "1.0",
- "schema_samplesize" : "1000",
- "query_pagesize" : "2147483647",
- "query_custom" : query
- }
-
- # Connect via azure-cosmosdb-spark to create Spark DataFrame
- df = (spark
- .read
- .format("com.microsoft.azure.cosmosdb.spark")
- .options(**readConfig)
- .load())
-
- # Count the number of flights
- df.count()
-
- # Saving data to Azure Cosmos DB
- # Write configuration
- writeConfig = {
- "Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",
- "Masterkey" : "[MASTER KEY]",
- "Database" : "[DATABASE]",
- "Collection" : "[COLLECTION]",
- "Upsert" : "true"
- }
-
- # Upsert the DataFrame to Azure Cosmos DB
- (df.write
- .format("com.microsoft.azure.cosmosdb.spark")
- .options(**writeConfig)
- .save())
有关详细信息,请参阅Azure Cosmos DB 文档。
要连接到 MS SQL Server 数据库,请下载 JDBC jar并将其添加到您的类路径中。然后启动一个 Scala 或 PySpark shell,指定这个 jar:
bin/spark-shell --jars mssql-jdbc-7.2.2.jre8.jar
以下示例展示了如何使用 Scala 和 PySpark 中的 Spark SQL 数据源 API 和 JDBC 从 MS SQL Server 数据库加载数据并将其保存到:
- // In Scala
- // Loading data from a JDBC source
- // Configure jdbcUrl
- val jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"
-
- // Create a Properties() object to hold the parameters.
- // Note, you can create the JDBC URL without passing in the
- // user/password parameters directly.
- val cxnProp = new Properties()
- cxnProp.put("user", "[USERNAME]")
- cxnProp.put("password", "[PASSWORD]")
- cxnProp.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
-
- // Load data using the connection properties
- val jdbcDF = spark.read.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)
-
- // Saving data to a JDBC source
- jdbcDF.write.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)
- # In Python
- # Configure jdbcUrl
- jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"
-
- # Loading data from a JDBC source
- jdbcDF = (spark
- .read
- .format("jdbc")
- .option("url", jdbcUrl)
- .option("dbtable", "[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .load())
-
- # Saving data to a JDBC source
- (jdbcDF
- .write
- .format("jdbc")
- .option("url", jdbcUrl)
- .option("dbtable", "[TABLENAME]")
- .option("user", "[USERNAME]")
- .option("password", "[PASSWORD]")
- .save())
因为复杂数据类型是简单数据类型的合并,所以很容易直接操作它们。处理复杂数据类型有两种典型的解决方案:
将嵌套结构分解成单独的行,应用一些函数,然后重新创建嵌套结构
构建用户定义的函数
这些方法的好处是允许您以表格格式思考问题。它们通常涉及(但不限于)使用效用函数,例如get_json_object()
、from_json()
、to_json()
、explode()
和selectExpr()
。
让我们仔细看看这两个选项。
在这个嵌套的 SQL 语句中,我们首先为 中的每个元素 ( )explode(values)
创建一个新行 (带有) :id
value
values
- -- In SQL
- SELECT id, collect_list(value + 1) AS values
- FROM (SELECT id, EXPLODE(values) AS value
- FROM table) x
- GROUP BY id
虽然collect_list()
返回具有重复的对象列表,但该GROUP BY
语句需要随机操作,这意味着重新收集的数组的顺序不一定与原始数组的顺序相同。由于values
可以是任意数量的维度(一个非常宽和/或非常长的数组)并且我们正在做一个GROUP BY
,这种方法可能非常昂贵。
要执行相同的任务(添加1
到 中的每个元素values
),我们还可以创建一个map()
用于遍历每个元素 ( value
) 并执行添加操作的 UDF:
- // In Scala
- def addOne(values: Seq[Int]): Seq[Int] = {
- values.map(value => value + 1)
- }
- val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])
然后我们可以在 Spark SQL 中使用这个 UDF,如下所示:
spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show()
虽然这比使用更好,explode()
而且collect_list()
不会有任何排序问题,但序列化和反序列化过程本身可能很昂贵。然而,同样重要的是要注意,这collect_list()
可能会导致执行程序遇到大型数据集的内存不足问题,而使用 UDF 可以缓解这些问题。
除了使用这些可能很昂贵的技术外,您还可以使用一些内置函数来处理 Apache Spark 2.4 及更高版本中包含的复杂数据类型。一些比较常见的列在表 5-3(数组类型)和表 5-4(映射类型)中。
功能说明 | 询问 | 输出 |
---|---|---|
array_distinct(array 删除数组中的重复项 | SELECT array_distinct(array(1, 2, 3, null, 3)); | [1,2,3,null] |
array_intersect(array 返回没有重复的两个数组的交集 | SELECT array_intersect(array(1, 2, 3), array(1, 3, 5)); | [1,3] |
array_union(array 返回没有重复的两个数组的并集 | SELECT array_union(array(1, 2, 3), array(1, 3, 5)); | [1,2,3,5] |
array_except(array array1 返回 in但不在 in中的 元素array2 ,不重复 | SELECT array_except(array(1, 2, 3), array(1, 3, 5)); | [2] |
array_join(array 使用分隔符连接数组的元素 | SELECT array_join(array('hello', 'world'), ' '); | hello world |
array_max(array 返回数组中的最大值; null 元素被跳过 | SELECT array_max(array(1, 20, null, 3)); | 20 |
array_min(array 返回数组中的最小值; null 元素被跳过 | SELECT array_min(array(1, 20, null, 3)); | 1 |
array_position(array 将给定数组的第一个元素的(从 1 开始的)索引返回为 Long | SELECT array_position(array(3, 2, 1), 1); | 3 |
array_remove(array 从给定数组中删除所有等于给定元素的元素 | SELECT array_remove(array(1, 2, 3, null, 3), 3); | [1,2,null] |
arrays_overlap(array true 如果array1 包含至少一个null 也存在于中的非元素,则 返回array2 | SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)); | true |
array_sort(array 按升序对输入数组进行排序,空元素放置在数组的末尾 | SELECT array_sort(array('b', 'd', null, 'c', 'a')); | ["a","b","c","d",null] |
concat(array 连接字符串、二进制文件、数组等。 | SELECT concat(array(1, 2, 3), array(4, 5), array(6)); | [1,2,3,4,5,6] |
flatten(array 将数组数组展平为单个数组 | SELECT flatten(array(array(1, 2), array(3, 4))); | [1,2,3,4] |
array_repeat(T, Int): array 以指定次数返回包含指定元素的数组 | SELECT array_repeat('123', 3); | ["123","123","123"] |
reverse(array 返回一个反转的字符串或一个元素顺序相反的数组 | SELECT reverse(array(2, 1, 4, 3)); | [3,4,1,2] |
sequence(T, T[, T]): array 通过增量步骤生成从开始到停止(包括)的元素数组 | SELECT sequence(1, 5); SELECT sequence(5, 1); SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month); | [1,2,3,4,5] [5,4,3,2,1] ["2018-01-01", "2018-02-01", "2018-03-01"] |
shuffle(array 返回给定数组的随机排列 | SELECT shuffle(array(1, 20, null, 3)); | [null,3,20,1] |
slice(array 返回从给定索引开始的给定数组的子集(如果索引为负数,则从末尾开始计数),具有指定长度 | SELECT slice(array(1, 2, 3, 4), -2, 2); | [3,4] |
array_zip(array 返回结构的合并数组 | SELECT arrays_zip(array(1, 2), array(2, 3), array(3, 4)); | [{"0":1,"1":2,"2":3},{"0":2,"1":3,"2":4}] |
element_at(array 返回给定数组在给定(从 1 开始)索引处的元素 | SELECT element_at(array(1, 2, 3), 2); | 2 |
cardinality(array 的别名 size ;返回给定数组或映射的大小 | SELECT cardinality(array('b', 'd', 'c', 'a')); | 4 |
功能说明 | 询问 | 输出 |
---|---|---|
map_form_arrays(array 从给定的键/值数组对创建一个映射;键中的元素不应该是 null | SELECT map_from_arrays(array(1.0, 3.0), array('2', '4')); | {"1.0":"2", "3.0":"4"} |
map_from_entries(array 返回从给定数组创建的地图 | SELECT map_from_entries(array(struct(1, 'a'), struct(2, 'b'))); | {"1":"a", "2":"b"} |
map_concat(map 返回输入映射的并集 | SELECT map_concat(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); | {"1":"a", "2":"c","3":"d"} |
element_at(map 返回给定键的值,或者 null 如果键不包含在映射中 | SELECT element_at(map(1, 'a', 2, 'b'), 2); | b |
cardinality(array 的别名 size ;返回给定数组或映射的大小 | SELECT cardinality(map(1, 'a', 2, 'b')); | 2 |
除了前面提到的内置函数之外,还有以匿名 lambda 函数作为参数的高阶函数。高阶函数的示例如下:
- -- In SQL
- transform(values, value -> lambda expression)
该transform()
函数将一个数组 ( values
) 和匿名函数 (lambda
表达式) 作为输入。该函数通过对每个元素应用匿名函数透明地创建一个新数组,然后将结果分配给输出数组(类似于 UDF 方法,但更有效)。
让我们创建一个示例数据集,以便我们可以运行一些示例:
- # In Python
- from pyspark.sql.types import *
- schema = StructType([StructField("celsius", ArrayType(IntegerType()))])
-
- t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
- t_c = spark.createDataFrame(t_list, schema)
- t_c.createOrReplaceTempView("tC")
-
- # Show the DataFrame
- t_c.show()
- // In Scala
- // Create DataFrame with two rows of two arrays (tempc1, tempc2)
- val t1 = Array(35, 36, 32, 30, 40, 42, 38)
- val t2 = Array(31, 32, 34, 55, 56)
- val tC = Seq(t1, t2).toDF("celsius")
- tC.createOrReplaceTempView("tC")
-
- // Show the DataFrame
- tC.show()
这是输出:
- +--------------------+
- | celsius|
- +--------------------+
- |[35, 36, 32, 30, ...|
- |[31, 32, 34, 55, 56]|
- +--------------------+
使用前面的 DataFrame,您可以运行以下高阶函数查询。
transform(array<T>, function<T, U>): array<U>
该transform()
函数通过对输入数组的每个元素应用一个函数来生成一个数组(类似于map()
函数):
- // In Scala/Python
- // Calculate Fahrenheit from Celsius for an array of temperatures
- spark.sql("""
- SELECT celsius,
- transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
- FROM tC
- """).show()
-
- +--------------------+--------------------+
- | celsius| fahrenheit|
- +--------------------+--------------------+
- |[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
- |[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
- +--------------------+--------------------+
filter(array<T>, function<T, Boolean>): array<T>
该filter()
函数生成一个数组,该数组仅由布尔函数为的输入数组的元素组成true
:
- // In Scala/Python
- // Filter temperatures > 38C for array of temperatures
- spark.sql("""
- SELECT celsius,
- filter(celsius, t -> t > 38) as high
- FROM tC
- """).show()
-
- +--------------------+--------+
- | celsius| high|
- +--------------------+--------+
- |[35, 36, 32, 30, ...|[40, 42]|
- |[31, 32, 34, 55, 56]|[55, 56]|
- +--------------------+--------+
exists(array<T>, function<T, V, Boolean>): Boolean
如果布尔函数适用于输入数组中的任何元素,则该exists()
函数返回:true
- // In Scala/Python
- // Is there a temperature of 38C in the array of temperatures
- spark.sql("""
- SELECT celsius,
- exists(celsius, t -> t = 38) as threshold
- FROM tC
- """).show()
-
- +--------------------+---------+
- | celsius|threshold|
- +--------------------+---------+
- |[35, 36, 32, 30, ...| true|
- |[31, 32, 34, 55, 56]| false|
- +--------------------+---------+
reduce(array<T>, B, function<B, T, B>, function<B, R>)
该函数通过将元素合并到缓冲区中来reduce()
将数组的元素减少为单个值,并在最终缓冲区上应用整理B
function
function
:
- // In Scala/Python
- // Calculate average temperature and convert to F
- spark.sql("""
- SELECT celsius,
- reduce(
- celsius,
- 0,
- (t, acc) -> t + acc,
- acc -> (acc div size(celsius) * 9 div 5) + 32
- ) as avgFahrenheit
- FROM tC
- """).show()
-
- +--------------------+-------------+
- | celsius|avgFahrenheit|
- +--------------------+-------------+
- |[35, 36, 32, 30, ...| 96|
- |[31, 32, 34, 55, 56]| 105|
- +--------------------+-------------+
Spark SQL 的部分强大功能来自它支持的广泛的 DataFrame 操作(也称为无类型数据集操作)。操作列表非常广泛,包括:
聚合函数
集合函数
日期时间函数
数学函数
杂项功能
非聚合函数
排序功能
字符串函数
UDF 函数
窗口函数
有关完整列表,请参阅Spark SQL 文档。
在本章中,我们将重点关注以下常见的关系操作:
Unions and joins
Windowing
Modifications
要执行这些 DataFrame 操作,我们将首先准备一些数据。在下面的代码片段中,我们:
导入两个文件并创建两个 DataFrame,一个用于机场 ( airports
) 信息,一个用于美国航班延误 ( departureDelays
)。
创建一个较小的表,foo
我们可以专注于我们的演示示例;它仅包含从西雅图 (SEA) 到旧金山 (SFO) 目的地的三个航班的一小段时间范围内的信息。
让我们开始吧:
- // In Scala
- import org.apache.spark.sql.functions._
-
- // Set file paths
- val delaysPath =
- "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
- val airportsPath =
- "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
-
- // Obtain airports data set
- val airports = spark.read
- .option("header", "true")
- .option("inferschema", "true")
- .option("delimiter", "\t")
- .csv(airportsPath)
- airports.createOrReplaceTempView("airports")
-
- // Obtain departure Delays data set
- val delays = spark.read
- .option("header","true")
- .csv(delaysPath)
- .withColumn("delay", expr("CAST(delay as INT) as delay"))
- .withColumn("distance", expr("CAST(distance as INT) as distance"))
- delays.createOrReplaceTempView("departureDelays")
-
- // Create temporary small table
- val foo = delays.filter(
- expr("""origin == 'SEA' AND destination == 'SFO' AND
- date like '01010%' AND delay > 0"""))
- foo.createOrReplaceTempView("foo")
- # In Python
- # Set file paths
- from pyspark.sql.functions import expr
- tripdelaysFilePath =
- "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
- airportsFilePath =
- "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
-
- # Obtain airports data set
- airports = (spark.read
- .format("csv")
- .options(header="true", inferSchema="true", sep="\t")
- .load(airportsFilePath))
-
- airports.createOrReplaceTempView("airports")
-
- # Obtain departure delays data set
- departureDelays = (spark.read
- .format("csv")
- .options(header="true")
- .load(tripdelaysFilePath))
-
- departureDelays = (departureDelays
- .withColumn("delay", expr("CAST(delay as INT) as delay"))
- .withColumn("distance", expr("CAST(distance as INT) as distance")))
-
- departureDelays.createOrReplaceTempView("departureDelays")
-
- # Create temporary small table
- foo = (departureDelays
- .filter(expr("""origin == 'SEA' and destination == 'SFO' and
- date like '01010%' and delay > 0""")))
- foo.createOrReplaceTempView("foo")
departureDelays
DataFrame 包含超过 130 万次航班的数据,而DataFramefoo
仅包含三行,其中包含特定时间范围内从 SEA 到 SFO 的航班信息,如下面的输出所示:
- // Scala/Python
- spark.sql("SELECT * FROM airports LIMIT 10").show()
-
- +-----------+-----+-------+----+
- | City|State|Country|IATA|
- +-----------+-----+-------+----+
- | Abbotsford| BC| Canada| YXX|
- | Aberdeen| SD| USA| ABR|
- | Abilene| TX| USA| ABI|
- | Akron| OH| USA| CAK|
- | Alamosa| CO| USA| ALS|
- | Albany| GA| USA| ABY|
- | Albany| NY| USA| ALB|
- |Albuquerque| NM| USA| ABQ|
- | Alexandria| LA| USA| AEX|
- | Allentown| PA| USA| ABE|
- +-----------+-----+-------+----+
-
- spark.sql("SELECT * FROM departureDelays LIMIT 10").show()
-
- +--------+-----+--------+------+-----------+
- | date|delay|distance|origin|destination|
- +--------+-----+--------+------+-----------+
- |01011245| 6| 602| ABE| ATL|
- |01020600| -8| 369| ABE| DTW|
- |01021245| -2| 602| ABE| ATL|
- |01020605| -4| 602| ABE| ATL|
- |01031245| -4| 602| ABE| ATL|
- |01030605| 0| 602| ABE| ATL|
- |01041243| 10| 602| ABE| ATL|
- |01040605| 28| 602| ABE| ATL|
- |01051245| 88| 602| ABE| ATL|
- |01050605| 9| 602| ABE| ATL|
- +--------+-----+--------+------+-----------+
-
- spark.sql("SELECT * FROM foo").show()
-
- +--------+-----+--------+------+-----------+
- | date|delay|distance|origin|destination|
- +--------+-----+--------+------+-----------+
- |01010710| 31| 590| SEA| SFO|
- |01010955| 104| 590| SEA| SFO|
- |01010730| 5| 590| SEA| SFO|
- +--------+-----+--------+------+-----------+
在接下来的部分中,我们将使用这些数据执行联合、连接和开窗示例。
Apache Spark 中的一个常见模式是将具有相同架构的两个不同 DataFrame 联合在一起。这可以使用以下union()
方法来实现:
- // Scala
- // Union two tables
- val bar = delays.union(foo)
- bar.createOrReplaceTempView("bar")
- bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
- AND date LIKE '01010%' AND delay > 0""")).show()
- # In Python
- # Union two tables
- bar = departureDelays.union(foo)
- bar.createOrReplaceTempView("bar")
-
- # Show the union (filtering for SEA and SFO in a specific time range)
- bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
- AND date LIKE '01010%' AND delay > 0""")).show()
bar
DataFrame 是foo
with的并集delays
。在 DataFrame 中使用相同的过滤标准bar
,我们看到了重复的foo
数据,正如预期的那样:
- -- In SQL
- spark.sql("""
- SELECT *
- FROM bar
- WHERE origin = 'SEA'
- AND destination = 'SFO'
- AND date LIKE '01010%'
- AND delay > 0
- """).show()
-
- +--------+-----+--------+------+-----------+
- | date|delay|distance|origin|destination|
- +--------+-----+--------+------+-----------+
- |01010710| 31| 590| SEA| SFO|
- |01010955| 104| 590| SEA| SFO|
- |01010730| 5| 590| SEA| SFO|
- |01010710| 31| 590| SEA| SFO|
- |01010955| 104| 590| SEA| SFO|
- |01010730| 5| 590| SEA| SFO|
- +--------+-----+--------+------+-----------+
一个常见的 DataFrame 操作是将两个 DataFrame(或表)连接在一起。默认情况下,Spark SQL 连接是inner join
,选项为inner
, cross
, outer
, full
, full_outer
, left
, left_outer
, right
, right_outer
, left_semi
, 和left_anti
. 文档中提供了更多信息(这适用于 Scala 和 Python)。
以下代码示例执行和DataFrameinner
之间的默认连接:airports
foo
- // In Scala
- foo.join(
- airports.as('air),
- $"air.IATA" === $"origin"
- ).select("City", "State", "date", "delay", "distance", "destination").show()
- # In Python
- # Join departure delays data (foo) with airport info
- foo.join(
- airports,
- airports.IATA == foo.origin
- ).select("City", "State", "date", "delay", "distance", "destination").show()
- -- In SQL
- spark.sql("""
- SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
- FROM foo f
- JOIN airports a
- ON a.IATA = f.origin
- """).show()
上述代码允许您从 DataFrame 中查看日期、延迟、距离和目的地信息,并从foo
DataFrame 中加入城市和州信息airports
:
- +-------+-----+--------+-----+--------+-----------+
- | City|State| date|delay|distance|destination|
- +-------+-----+--------+-----+--------+-----------+
- |Seattle| WA|01010710| 31| 590| SFO|
- |Seattle| WA|01010955| 104| 590| SFO|
- |Seattle| WA|01010730| 5| 590| SFO|
- +-------+-----+--------+-----+--------+-----------+
窗口函数使用窗口中的行(输入行的范围)中的值来返回一组值,通常以另一行的形式。使用窗口函数,可以对一组行进行操作,同时仍为每个输入行返回一个值。在本节中,我们将展示如何使用dense_rank()
窗口函数;还有许多其他功能,如表 5-5 所示。
SQL | 数据帧 API | |
---|---|---|
排名功能 | rank() | rank() |
dense_rank() | denseRank() | |
percent_rank() | percentRank() | |
ntile() | ntile() | |
row_number() | rowNumber() | |
分析函数 | cume_dist() | cumeDist() |
first_value() | firstValue() | |
last_value() | lastValue() | |
lag() | lag() | |
lead() | lead() |
让我们先回顾一下从西雅图 (SEA)、旧金山 (SFO) 和纽约市 (JFK) 出发并前往一组特定目的地位置的航班所经历的TotalDelays
(由 计算),如以下查询中所述sum(Delay)
:
- -- In SQL
- DROP TABLE IF EXISTS departureDelaysWindow;
-
- CREATE TABLE departureDelaysWindow AS
- SELECT origin, destination, SUM(delay) AS TotalDelays
- FROM departureDelays
- WHERE origin IN ('SEA', 'SFO', 'JFK')
- AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
- GROUP BY origin, destination;
-
- SELECT * FROM departureDelaysWindow
-
- +------+-----------+-----------+
- |origin|destination|TotalDelays|
- +------+-----------+-----------+
- | JFK| ORD| 5608|
- | SEA| LAX| 9359|
- | JFK| SFO| 35619|
- | SFO| ORD| 27412|
- | JFK| DEN| 4315|
- | SFO| DEN| 18688|
- | SFO| SEA| 17080|
- | SEA| SFO| 22293|
- | JFK| ATL| 12141|
- | SFO| ATL| 5091|
- | SEA| DEN| 13645|
- | SEA| ATL| 4535|
- | SEA| ORD| 10041|
- | JFK| SEA| 7856|
- | JFK| LAX| 35755|
- | SFO| JFK| 24100|
- | SFO| LAX| 40798|
- | SEA| JFK| 4667|
- +------+-----------+-----------+
如果您想为每个始发机场找出延误最多的三个目的地,该怎么办?您可以通过为每个来源运行三个不同的查询然后将结果合并在一起来实现这一点,如下所示:
- -- In SQL
- SELECT origin, destination, sum(TotalDelays) as sumTotalDelays
- FROM departureDelaysWindow
- WHERE origin = 'SEA'
- GROUP BY origin, destination
- ORDER BY sumTotalDelays DESC
- LIMIT 3
其中[ORIGIN]
是 、 和 的三个不同的JFK
原点SEA
值SFO
。
但更好的方法是使用窗口函数dense_rank()
来执行以下计算:
- -- In SQL
- spark.sql("""
- SELECT origin, destination, TotalDelays, rank
- FROM (
- SELECT origin, destination, TotalDelays, dense_rank()
- OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
- FROM departureDelaysWindow
- ) t
- WHERE rank <= 3
- """).show()
-
- +------+-----------+-----------+----+
- |origin|destination|TotalDelays|rank|
- +------+-----------+-----------+----+
- | SEA| SFO| 22293| 1|
- | SEA| DEN| 13645| 2|
- | SEA| ORD| 10041| 3|
- | SFO| LAX| 40798| 1|
- | SFO| ORD| 27412| 2|
- | SFO| JFK| 24100| 3|
- | JFK| LAX| 35755| 1|
- | JFK| SFO| 35619| 2|
- | JFK| ATL| 12141| 3|
- +------+-----------+-----------+----+
通过使用dense_rank()
窗口函数,我们可以快速确定三个始发城市延误最严重的目的地是:
西雅图 (SEA):旧金山 (SFO)、丹佛 (DEN) 和芝加哥 (ORD)
旧金山 (SFO):洛杉矶 (LAX)、芝加哥 (ORD) 和纽约 (JFK)
纽约 (JFK):洛杉矶 (LAX)、旧金山 (SFO) 和亚特兰大 (ATL)
需要注意的是,每个窗口分组都需要适合单个执行程序,并且在执行期间将组合成单个分区。因此,您需要确保您的查询不是无界的(即限制您的窗口大小).
另一个常见的操作是对DataFrame进行修改。虽然 DataFrame 本身是不可变的,但您可以通过创建新的、不同的 DataFrame、具有不同列的操作来修改它们。(回想一下前面的章节,底层的 RDD 是不可变的——即它们不能被改变——以确保 Spark 操作存在数据沿袭。)让我们从之前的小型 DataFrame 示例开始:
- // In Scala/Python
- foo.show()
-
- --------+-----+--------+------+-----------+
- | date|delay|distance|origin|destination|
- +--------+-----+--------+------+-----------+
- |01010710| 31| 590| SEA| SFO|
- |01010955| 104| 590| SEA| SFO|
- |01010730| 5| 590| SEA| SFO|
- +--------+-----+--------+------+-----------+
要向foo
DataFrame 添加新列,请使用以下withColumn()
方法:
- // In Scala
- import org.apache.spark.sql.functions.expr
- val foo2 = foo.withColumn(
- "status",
- expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
- )
- # In Python
- from pyspark.sql.functions import expr
- foo2 = (foo.withColumn(
- "status",
- expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
- ))
新创建foo2
的 DataFrame 具有原始foo
DataFrame 的内容加上语句定义的附加status
列CASE
:
- // In Scala/Python
- foo2.show()
-
- +--------+-----+--------+------+-----------+-------+
- | date|delay|distance|origin|destination| status|
- +--------+-----+--------+------+-----------+-------+
- |01010710| 31| 590| SEA| SFO|Delayed|
- |01010955| 104| 590| SEA| SFO|Delayed|
- |01010730| 5| 590| SEA| SFO|On-time|
- +--------+-----+--------+------+-----------+-------+
要删除一列,请使用该drop()
方法。例如,让我们删除该delay
列,因为我们现在有一个status
列,在上一节中添加:
- // In Scala
- val foo3 = foo2.drop("delay")
- foo3.show()
- # In Python
- foo3 = foo2.drop("delay")
- foo3.show()
-
- +--------+--------+------+-----------+-------+
- | date|distance|origin|destination| status|
- +--------+--------+------+-----------+-------+
- |01010710| 590| SEA| SFO|Delayed|
- |01010955| 590| SEA| SFO|Delayed|
- |01010730| 590| SEA| SFO|On-time|
- +--------+--------+------+-----------+-------+
您可以使用以下方法重命名列withColumnRenamed()
:
- // In Scala
- val foo4 = foo3.withColumnRenamed("status", "flight_status")
- foo4.show()
- # In Python
- foo4 = foo3.withColumnRenamed("status", "flight_status")
- foo4.show()
-
- +--------+--------+------+-----------+-------------+
- | date|distance|origin|destination|flight_status|
- +--------+--------+------+-----------+-------------+
- |01010710| 590| SEA| SFO| Delayed|
- |01010955| 590| SEA| SFO| Delayed|
- |01010730| 590| SEA| SFO| On-time|
- +--------+--------+------+-----------+-------------+
在处理您的数据时,有时您需要将列换成行——即,透视您的数据。让我们抓取一些数据来演示这个概念:
- -- In SQL
- SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
- FROM departureDelays
- WHERE origin = 'SEA'
-
- +-----------+-----+-----+
- |destination|month|delay|
- +-----------+-----+-----+
- | ORD| 1| 92|
- | JFK| 1| -7|
- | DFW| 1| -5|
- | MIA| 1| -3|
- | DFW| 1| -3|
- | DFW| 1| 1|
- | ORD| 1| -10|
- | DFW| 1| -6|
- | DFW| 1| -2|
- | ORD| 1| -3|
- +-----------+-----+-----+
- only showing top 10 rowsspan>
透视允许您将名称放在month
列中(而不是1
和2
您可以分别显示Jan
和Feb
)以及按目的地和月份对延迟执行聚合计算(在本例中为平均值和最大值):
- -- In SQL
- SELECT * FROM (
- SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
- FROM departureDelays WHERE origin = 'SEA'
- )
- PIVOT (
- CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
- FOR month IN (1 JAN, 2 FEB)
- )
- ORDER BY destination
-
- +-----------+------------+------------+------------+------------+
- |destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
- +-----------+------------+------------+------------+------------+
- | ABQ| 19.86| 316| 11.42| 69|
- | ANC| 4.44| 149| 7.90| 141|
- | ATL| 11.98| 397| 7.73| 145|
- | AUS| 3.48| 50| -0.21| 18|
- | BOS| 7.84| 110| 14.58| 152|
- | BUR| -2.03| 56| -1.89| 78|
- | CLE| 16.00| 27| null| null|
- | CLT| 2.53| 41| 12.96| 228|
- | COS| 5.32| 82| 12.18| 203|
- | CVG| -0.50| 4| null| null|
- | DCA| -1.15| 50| 0.07| 34|
- | DEN| 13.13| 425| 12.95| 625|
- | DFW| 7.95| 247| 12.57| 356|
- | DTW| 9.18| 107| 3.47| 77|
- | EWR| 9.63| 236| 5.20| 212|
- | FAI| 1.84| 160| 4.21| 60|
- | FAT| 1.36| 119| 5.22| 232|
- | FLL| 2.94| 54| 3.50| 40|
- | GEG| 2.28| 63| 2.87| 60|
- | HDN| -0.44| 27| -6.50| 0|
- +-----------+------------+------------+------------+------------+
- only showing top 20 rows
本章探讨了 Spark SQL 如何与外部组件交互。我们讨论了创建用户定义的函数,包括 Pandas UDF,并提供了一些用于执行 Spark SQL 查询的选项(包括 Spark SQL shell、Beeline 和 Tableau)。然后,我们提供了如何使用 Spark SQL 连接各种外部数据源的示例,例如 SQL 数据库、PostgreSQL、MySQL、Tableau、Azure Cosmos DB、MS SQL Server 等。
我们探索了 Spark 针对复杂数据类型的内置函数,并给出了一些使用高阶函数的示例。最后,我们讨论了一些常见的关系运算符,并展示了如何执行选择的 DataFrame 操作。