• 【Apache Spark 】第 5 章Spark SQL 和 DataFrames:与外部数据源交互


     🔎大家好,我是Sonhhxg_柒,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流🔎

    📝个人主页-Sonhhxg_柒的博客_CSDN博客 📃

    🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝​

    📣系列专栏 - 机器学习【ML】 自然语言处理【NLP】  深度学习【DL】

     🖍foreword

    ✔说明⇢本人讲解主要包括Python、机器学习(ML)、深度学习(DL)、自然语言处理(NLP)等内容。

    如果你对这个系列感兴趣的话,可以关注订阅哟👋

    文章目录

    Spark SQL 和 Apache Hive

    用户定义的函数

    Spark SQL UDF

    Spark SQL 中的求值顺序和空值检查

    使用 Pandas UDF 加速和分发 PySpark UDF

    使用 Spark SQL Shell、Beeline 和 Tableau 进行查询

    使用 Spark SQL Shell

    创建表

    向表中插入数据

    运行 Spark SQL 查询

    Working with Beeline

    启动 Thrift 服务器

    通过 Beeline 连接到 Thrift 服务器

    使用 Beeline 执行 Spark SQL 查询

    停止 Thrift 服务器

    使用 Tableau

    启动 Thrift 服务器

    启动 Tableau

    停止 Thrift 服务器

    外部数据源

    JDBC 和 SQL 数据库

    分区的重要性

    PostgreSQL

    MySQL

    Azure Cosmos 数据库

    微软 SQL 服务器

    其他外部来源

    DataFrames 和 Spark SQL 中的高阶函数

    选项 1:爆炸和收集

    选项 2:用户定义的函数

    复杂数据类型的内置函数

    高阶函数

    transform()

    filter()

    exists()

    reduce()

    通用数据帧和 Spark SQL 操作

    Unions

    Joins

    Windowing

    Modifications

    Adding new columns

    Dropping columns

    Renaming columns

    Pivoting(旋转)

    概括


    在上一章中,我们探讨了与 Spark 中内置数据源的交互。我们还仔细研究了 DataFrame API 及其与 Spark SQL 的互操作性。在本章中,我们将关注 Spark SQL 如何与外部组件交互。具体来说,我们将讨论 Spark SQL 如何让您:

    • 为 Apache Hive 和 Apache Spark 使用用户定义的函数。

    • 连接外部数据源,例如 JDBC 和 SQL 数据库、PostgreSQL、MySQL、Tableau、Azure Cosmos DB 和 MS SQL Server。

    • 使用简单和复杂的类型、高阶函数和常见的关系运算符。

    我们还将介绍一些使用 Spark SQL 查询 Spark 的不同选项,例如 Spark SQL shell、Beeline 和 Tableau。

    Spark SQL 和 Apache Hive

    Spark SQL 是 Apache Spark 的一个基础组件,它将关系处理与 Spark 的函数式编程 API 集成在一起。它的起源是在之前对鲨鱼的研究中Shark 最初是在 Apache Spark 1之上的 Hive 代码库上构建的,并成为 Hadoop 系统上第一个交互式 SQL 查询引擎之一。它证明了两全其美是可能的;与企业数据仓库一样快,可扩展性以及 Hive/MapReduce。

    Spark SQL 让 Spark 程序员可以利用更快的性能和关系编程(例如,声明式查询和优化的存储)以及调用复杂的分析库(例如,机器学习)的优势。如前一章所述,从 Apache Spark 2.x 开始,SparkSparkSession提供了一个统一的入口点来操作数据。

    用户定义的函数

    虽然 Apache Spark 有大量的内置函数,但 Spark 的灵活性允许数据工程师和数据科学家也可以定义自己的函数。这些被称为用户定义函数(UDF)。

    Spark SQL UDF

    创建您自己的 PySpark 或 Scala UDF 的好处是您(和其他人)将能够在 Spark SQL 本身中使用它们。例如,数据科学家可以将 ML 模型包装在 UDF 中,以便数据分析师可以在 Spark SQL 中查询其预测,而不必了解模型的内部结构。

    下面是创建 Spark SQL UDF 的简化示例。请注意,UDF 在每个会话中运行,它们不会保留在底层元存储中:

    1. // In Scala
    2. // Create cubed function
    3. val cubed = (s: Long) => {
    4. s * s * s
    5. }
    6. // Register UDF
    7. spark.udf.register("cubed", cubed)
    8. // Create temporary view
    9. spark.range(1, 9).createOrReplaceTempView("udf_test")
    1. # In Python
    2. from pyspark.sql.types import LongType
    3. # Create cubed function
    4. def cubed(s):
    5. return s * s * s
    6. # Register UDF
    7. spark.udf.register("cubed", cubed, LongType())
    8. # Generate temporary view
    9. spark.range(1, 9).createOrReplaceTempView("udf_test")

    您现在可以使用 Spark SQL 执行以下任一cubed()功能:

    1. // In Scala/Python
    2. // Query the cubed UDF
    3. spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()
    4. +---+--------+
    5. | id|id_cubed|
    6. +---+--------+
    7. | 1| 1|
    8. | 2| 8|
    9. | 3| 27|
    10. | 4| 64|
    11. | 5| 125|
    12. | 6| 216|
    13. | 7| 343|
    14. | 8| 512|
    15. +---+--------+

    Spark SQL 中的求值顺序和空值检查

    Spark SQL(包括 SQL、DataFrame API 和 Dataset API)不保证子表达式的求值顺序。例如,以下查询不保证子句在s is NOT NULL子句之前执行strlen(s) > 1

    spark.sql("SELECT s FROM test1 WHERE s IS NOT NULL AND strlen(s) > 1")

    因此,要执行正确的null检查,建议您执行以下操作:

    1. 使 UDF 本身具有null感知能力并null在 UDF 内部进行检查。

    2. 使用IForCASE WHEN表达式进行null检查并在条件分支中调用 UDF。

    使用 Pandas UDF 加速和分发 PySpark UDF

    以前使用 PySpark UDF 的一个普遍问题是它们的性能比 Scala UDF 慢。这是因为 PySpark UDF 需要在 JVM 和 Python 之间移动数据,这非常昂贵。为了解决这个问题,Pandas UDF(也称为矢量化 UDF)作为 Apache Spark 2.3 的一部分被引入。Pandas UDF 使用 Apache Arrow 传输数据,使用 Pandas 处理数据。pandas_udf您可以使用关键字作为装饰器来定义 Pandas UDF ,或者包装函数本身。一旦数据采用Apache Arrow 格式,不再需要序列化/腌制数据,因为它已经是 Python 进程可以使用的格式。您不是在逐行操作单个输入,而是在 Pandas Series 或 DataFrame 上操作(即矢量化执行)。

    从带有 Python 3.6 及更高版本的 Apache Spark 3.0 开始,Pandas UDF 分为两个 API 类别:Pandas UDF 和 Pandas Function API。

    Pandas UDF

    在 Apache Spark 3.0 中,Pandas UDF 从 Pandas UDF 中的 Python 类型提示(例如pandas.Seriespandas.DataFrameTupleIterator. 以前,您需要手动定义和指定每个 Pandas UDF 类型。目前,Pandas UDF 中 Python 类型提示支持的案例有:Series to Series、Iterator of Series to Iterator of Series、Iterator of Multiple Series to Iterator of Series、Series to Scalar(单值)。

    Pandas 函数 API

    Pandas 函数 API 允许您将本地 Python 函数直接应用于 PySpark DataFrame,其中输入和输出都是 Pandas 实例。对于 Spark 3.0,支持的 Pandas 函数 API 是 grouped map、map、co-grouped map。

    有关更多信息,请参阅第 12 章中的“使用 Python 类型提示重新设计的 Pandas UDF”

    以下是 Spark 3.0 的标量 Pandas UDF 示例:2# In Python # Import pandas import pandas as

    1. # In Python
    2. # Import pandas
    3. import pandas as pd
    4. # Import various pyspark SQL functions including pandas_udf
    5. from pyspark.sql.functions import col, pandas_udf
    6. from pyspark.sql.types import LongType
    7. # Declare the cubed function
    8. def cubed(a: pd.Series) -> pd.Series:
    9. return a * a * a
    10. # Create the pandas UDF for the cubed function
    11. cubed_udf = pandas_udf(cubed, returnType=LongType())

    前面的代码片段声明了一个调用的函数,该函数cubed()执行立方操作。这是一个常规的 Pandas 函数,带有额外的cubed_udf = pandas_udf()调用来创建我们的 Pandas UDF。

    让我们从一个简单的 Pandas Series(定义为x)开始,然后应用局部函数cubed()进行立方计算:

    1. # Create a Pandas Series
    2. x = pd.Series([1, 2, 3])
    3. # The function for a pandas_udf executed with local Pandas data
    4. print(cubed(x))

    输出如下:

    0     1
    1     8
    2    27
    dtype: int64

    现在让我们切换到 Spark DataFrame。我们可以将此函数作为 Spark 矢量化 UDF 执行,如下所示:

    1. # Create a Spark DataFrame, 'spark' is an existing SparkSession
    2. df = spark.range(1, 4)
    3. # Execute function as a Spark vectorized UDF
    4. df.select("id", cubed_udf(col("id"))).show()

    这是输出:

    1. +---+---------+
    2. | id|cubed(id)|
    3. +---+---------+
    4. | 1| 1|
    5. | 2| 8|
    6. | 3| 27|
    7. +---+---------+

    与本地函数相反,使用矢量化 UDF 将导致 Spark 作业的执行;前面的本地函数是只在 Spark 驱动程序上执行的 Pandas 函数。在查看此pandas_udf功能的某个阶段的 Spark UI 时,这一点变得更加明显(图 5-1)。

    笔记

    如需更深入地了解 Pandas UDF,请参阅pandas 用户定义函数文档.

    图 5-1。用于在 Spark DataFrame 上执行 Pandas UDF 的 Spark UI 阶段

    与许多 Spark 作业一样,该作业首先parallelize()将本地数据(Arrow 二进制批处理)发送到执行程序,并调用mapPartitions()将 Arrow 二进制批处理转换为 Spark 的内部数据格式,然后将其分发给 Spark 工作人员。有许多WholeStageCodegen步骤代表了性能的基本提升(感谢 Project Tungsten 的全阶段代码生成,显着提高了 CPU 效率和性能)。但它是ArrowEvalPython标识(在这种情况下)正在执行 Pandas UDF 的步骤。

    使用 Spark SQL Shell、Beeline 和 Tableau 进行查询

    查询 Apache Spark 的机制有多种,包括 Spark SQL shell、Beeline CLI 实用程序以及 Tableau 和 Power BI 等报告工具。

    在本节中,我们包括 Tableau 的说明;对于 Power BI,请参阅文档

    使用 Spark SQL Shell

    执行 Spark SQL 查询的便捷工具是spark-sqlCLI。虽然此实用程序在本地模式下与 Hive 元存储服务通信,但它不与Thrift JDBC/ODBC 服务器(又名Spark Thrift 服务器STS)通信。STS 允许 JDBC/ODBC 客户端通过 Apache Spark 上的 JDBC 和 ODBC 协议执行 SQL 查询。

    要启动 Spark SQL CLI,请在文件夹中执行以下命令$SPARK_HOME

    ./bin/spark-sql

    启动 shell 后,您可以使用它以交互方式执行 Spark SQL 查询。让我们看几个例子。

    创建表

    要创建新的永久 Spark SQL 表,请执行以下语句:

    spark-sql> CREATE TABLE people (name STRING,age int);

    您的输出应该与此类似,注意 Spark SQL 表的创建people及其文件位置 ( /user/hive/warehouse/people):

    1. 20/01/11 22:42:16 WARN HiveMetaStore: Location: file:/user/hive/warehouse/people
    2. specified for non-external table:people
    3. Time taken: 0.63 seconds

    向表中插入数据

    您可以通过执行类似于以下内容的语句将数据插入到 Spark SQL 表中:

    INSERT INTO people SELECT name, age FROM ...

    由于您不依赖于从预先存在的表或文件中加载数据,因此您可以使用INSERT...VALUES语句将数据插入到表中。这三个语句将三个人(他们的姓名和年龄,如果知道的话)插入到people表中:

    1. spark-sql> INSERT INTO people VALUES ("Michael", NULL);
    2. Time taken: 1.696 seconds
    3. spark-sql> INSERT INTO people VALUES ("Andy", 30);
    4. Time taken: 0.744 seconds
    5. spark-sql> INSERT INTO people VALUES ("Samantha", 19);
    6. Time taken: 0.637 seconds
    7. spark-sql>

    运行 Spark SQL 查询

    现在您的表中有数据,您可以针对它运行 Spark SQL 查询。让我们首先查看我们的元存储中存在哪些表:

    1. spark-sql> SHOW TABLES;
    2. default people false
    3. Time taken: 0.016 seconds, Fetched 1 row(s)

    接下来,让我们看看我们表中有多少人年龄小于 20 岁:

    1. spark-sql> SELECT * FROM people WHERE age < 20;
    2. Samantha 19
    3. Time taken: 0.593 seconds, Fetched 1 row(s)

    同样,让我们​​看看没有指定年龄的人是谁:

    1. spark-sql> SELECT name FROM people WHERE age IS NULL;
    2. Michael
    3. Time taken: 0.272 seconds, Fetched 1 row(s)

    Working with Beeline

    如果您使用过 Apache Hive,您可能熟悉命令行工具Beeline,这是一种针对 HiveServer2 运行 HiveQL 查询的常用实用程序。Beeline 是一个基于SQLLine CLI的 JDBC 客户端。您可以使用相同的实用程序对 Spark Thrift 服务器执行 Spark SQL 查询。请注意,当前实现的 Thrift JDBC/ODBC 服务器对应于 Hive 1.2.1 中的 HiveServer2。您可以使用 Spark 或 Hive 1.2.1 附带的以下 Beeline 脚本来测试 JDBC 服务器。

    启动 Thrift 服务器

    要启动 Spark Thrift JDBC/ODBC 服务器,请从文件夹中执行以下命令$SPARK_HOME

    ./sbin/start-thriftserver.sh

    笔记

    如果您尚未启动 Spark 驱动程序和工作程序,请在之前执行以下命令start-thriftserver.sh

    ./sbin/start-all.sh

    通过 Beeline 连接到 Thrift 服务器

    要使用 Beeline 测试 Thrift JDBC/ODBC 服务器,请执行以下命令:

    ./bin/beeline

    然后配置 Beeline 连接到本地 Thrift 服务器:

    !connect jdbc:hive2://localhost:10000

    笔记

    默认情况下,直线处于非安全模式。因此,用户名是您的登录名(例如,user@learningspark.org),密码为空。

    使用 Beeline 执行 Spark SQL 查询

    从这里,您可以运行类似于使用 Beeline 运行 Hive 查询的 Spark SQL 查询。以下是一些示例查询及其输出:

    1. 0: jdbc:hive2://localhost:10000> SHOW tables;
    2. +-----------+------------+--------------+
    3. | database | tableName | isTemporary |
    4. +-----------+------------+--------------+
    5. | default | people | false |
    6. +-----------+------------+--------------+
    7. 1 row selected (0.417 seconds)
    8. 0: jdbc:hive2://localhost:10000> SELECT * FROM people;
    9. +-----------+-------+
    10. | name | age |
    11. +-----------+-------+
    12. | Samantha | 19 |
    13. | Andy | 30 |
    14. | Michael | NULL |
    15. +-----------+-------+
    16. 3 rows selected (1.512 seconds)
    17. 0: jdbc:hive2://localhost:10000>

    停止 Thrift 服务器

    完成后,您可以使用以下命令停止 Thrift 服务器:

    ./sbin/stop-thriftserver.sh

    使用 Tableau

    与通过 Beeline 或 Spark SQL CLI 运行查询类似,您可以通过 Thrift JDBC/ODBC 服务器将您最喜欢的 BI 工具连接到 Spark SQL。在本节中,我们将向您展示如何将 Tableau Desktop(版本 2019.2)连接到您的本地 Apache Spark 实例。

    笔记

    您需要已安装Tableau 的 Spark ODBC驱动程序版本 1.2.0 或更高版本。如果您已安装(或升级到)Tableau 2018.1 或更高版本,则应已预安装此驱动程序。

    启动 Thrift 服务器

    要启动 Spark Thrift JDBC/ODBC 服务器,请从文件夹中执行以下命令$SPARK_HOME

    ./sbin/start-thriftserver.sh

    笔记

    如果您尚未启动 Spark 驱动程序和工作程序,请在之前执行以下命令start-thriftserver.sh

    ./sbin/start-all.sh

    启动 Tableau

    如果您是第一次启动 Tableau,您会看到一个“连接”对话框,允许您连接到大量数据源。默认情况下,左侧的“To a Server”菜单中不会包含 Spark SQL 选项(见图 5-2)。

    图 5-2。Tableau 连接对话框

    要访问 Spark SQL 选项,请单击该列表底部的更多...,然后从主面板中显示的列表中选择 Spark SQL,如图 5-3所示。

    图 5-3。选择更多... > Spark SQL 以连接到 Spark SQL

    这将弹出 Spark SQL 对话框(图 5-4)。当您连接到本地 Apache Spark 实例时,您可以使用具有以下参数的非安全用户名身份验证模式:

    • 服务器:本地主机

    • 端口:10000(默认)

    • 类型:SparkThriftServer(默认)

    • 身份验证:用户名

    • 用户名:您的登录名,例如 user@learningspark.org

    • 需要 SSL:未选中

    图 5-4。Spark SQL 对话框

    成功连接到 Spark SQL 数据源后,您将看到类似于图 5-5的 Data Source Connections 视图。

    图 5-5。Tableau 数据源连接视图,连接到本地 Spark 实例

    从左侧的 Select Schema 下拉菜单中,选择“default”。然后输入要查询的表名(见图5-6)。请注意,您可以单击放大镜图标来获取可用表格的完整列表。

    图 5-6。选择要查询的架构和表

    笔记

    有关使用 Tableau 连接到 Spark SQL 数据库的详细信息,请参阅 Tableau 的Spark SQL 文档和 Databricks Tableau 文档

    输入people表格名称,然后将表格从左侧拖放到主对话框中(在标有“将表格拖到此处”的空间中)。您应该看到类似于图 5-7的内容。

    图 5-7。连接到本地 Spark 实例中的人员表

    单击立即更新,Tableau 将在后台查询您的 Spark SQL 数据源(图 5-8)。

    您现在可以对 Spark 数据源、联接表等执行查询,就像对任何其他 Tableau 数据源一样。

    图 5-8。查询本地 Spark 数据源的 Tableau 工作表表视图

    停止 Thrift 服务器

    完成后,您可以使用以下命令停止 Thrift 服务器:

    ./sbin/stop-thriftserver.sh

    外部数据源

    在本节中,我们将重点介绍如何使用 Spark SQL 连接外部数据源,从 JDBC 和 SQL 数据库开始。

    JDBC 和 SQL 数据库

    Spark SQL 包含一个数据源 API,可以使用JDBC从其他数据库读取数据。它简化了对这些数据源的查询,因为它将结果作为 DataFrame 返回,从而提供了 Spark SQL 的所有优点(包括性能和与其他数据源连接的能力)。

    首先,您需要为您的 JDBC 数据源指定 JDBC 驱动程序,并且它需要位于 Spark 类路径中。从$SPARK_HOME文件夹中,您将发出如下命令:

    ./bin/spark-shell --driver-class-path $database.jar --jars $database.jar

    使用数据源 API,可以将远程数据库中的表加载为 DataFrame 或 Spark SQL 临时视图。用户可以在数据源选项中指定 JDBC 连接属性。表 5-1包含 Spark 支持的一些更常见的连接属性(不区分大小写)。

    表 5-1。常用连接属性
    属性名称描述
    userpassword这些通常作为连接属性提供,用于登录数据源。
    urlJDBC 连接 URL,例如jdbc:postgresql://localhost/test?user=fred&password=secret.
    dbtable要读取或写入的 JDBC 表。您不能同时指定dbtablequery选项。
    query用于从 Apache Spark 读取数据的查询,例如SELECT column1, column2, ..., columnN FROM [table|subquery]. 您不能同时指定querydbtable选项。
    driver用于连接到指定 URL 的 JDBC 驱动程序的类名。

    有关连接属性的完整列表,请参阅Spark SQL 文档

    分区的重要性

    在 Spark SQL 和 JDBC 外部源之间传输大量数据时,对数据源进行分区非常重要。您的所有数据都通过一个驱动程序连接,这可能会使您的提取性能饱和并显着减慢,并可能使源系统的资源饱和。虽然这些 JDBC 属性是可选的,但对于任何大规模操作,强烈建议使用表 5-2中显示的属性.

    表 5-2。分区连接属性
    属性名称描述
    numPartitions表读写中可用于并行的最大分区数。这也决定了并发 JDBC 连接的最大数量。
    partitionColumn读取外部源时,partitionColumn是用于确定分区的列;注意,partitionColumn必须是数字、日期或时间戳列。
    lowerBound设置分区步幅的最小值partitionColumn
    upperBound设置分区步幅的最大值partitionColumn

    让我们看一个示例来帮助您了解这些属性是如何工作的。假设我们使用以下设置:

    • numPartitions10

    • lowerBound:0

    • upperBound:10000

    那么步幅等于1000,就会创建10个分区。这相当于执行这 10 个查询(每个分区一个):

    • SELECT * FROM table WHERE partitionColumn BETWEEN 0 and 1000

    • SELECT * FROM table WHERE partitionColumn BETWEEN 1000 and 2000

    • ...

    • SELECT * FROM table WHERE partitionColumn BETWEEN 9000 and 10000

    虽然并非包罗万象,但以下是使用这些属性时要牢记的一些提示:

    • 一个好的起点numPartitions是使用 Spark 工作人员数量的倍数。例如,如果您有四个 Spark 工作节点,那么可能从 4 或 8 个分区开始。但同样重要的是要注意您的源系统处理读取请求的能力。对于有处理窗口的系统,可以最大化对源系统的并发请求数;对于缺少处理窗口的系统(例如,OLTP 系统不断处理数据),您应该减少并发请求的数量以防止源系统饱和。

    • 最初,根据最小和最大实际值计算lowerBound和。例如,如果您选择,但所有值都介于和之间,则 10 个查询中只有 2 个(每个分区一个)将完成所有工作。在这种情况下,更好的配置是.upperBoundpartitionColumn {numPartitions:10, lowerBound: 0, upperBound: 10000}20004000{numPartitions:10, lowerBound: 0, upperBound: 4000}

    • 选择一个partitionColumn可以均匀分布的,避免数据倾斜。例如,如果您的大多数人partitionColumn具有值2500,则{numPartitions:10, lowerBound: 0, upperBound: 10000}大部分工作将由请求介于2000和之间的值的任务执行3000。相反,选择一个不同的partitionColumn,或者如果可能的话生成一个新的(可能是多列的哈希)来更均匀地分布你的分区.

    PostgreSQL

    要连接到 PostgreSQL 数据库,请从Maven构建或下载 JDBC jar并将其添加到您的类路径中。然后启动 Spark shell(spark-shellpyspark),指定该 jar:

    bin/spark-shell --jars postgresql-42.2.6.jar

    以下示例展示了如何在 Scala 中使用 Spark SQL 数据源 API 和 JDBC 从 PostgreSQL 数据库中加载和保存:

    1. // In Scala
    2. // Read Option 1: Loading data from a JDBC source using load method
    3. val jdbcDF1 = spark
    4. .read
    5. .format("jdbc")
    6. .option("url", "jdbc:postgresql:[DBSERVER]")
    7. .option("dbtable", "[SCHEMA].[TABLENAME]")
    8. .option("user", "[USERNAME]")
    9. .option("password", "[PASSWORD]")
    10. .load()
    11. // Read Option 2: Loading data from a JDBC source using jdbc method
    12. // Create connection properties
    13. import java.util.Properties
    14. val cxnProp = new Properties()
    15. cxnProp.put("user", "[USERNAME]")
    16. cxnProp.put("password", "[PASSWORD]")
    17. // Load data using the connection properties
    18. val jdbcDF2 = spark
    19. .read
    20. .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)
    21. // Write Option 1: Saving data to a JDBC source using save method
    22. jdbcDF1
    23. .write
    24. .format("jdbc")
    25. .option("url", "jdbc:postgresql:[DBSERVER]")
    26. .option("dbtable", "[SCHEMA].[TABLENAME]")
    27. .option("user", "[USERNAME]")
    28. .option("password", "[PASSWORD]")
    29. .save()
    30. // Write Option 2: Saving data to a JDBC source using jdbc method
    31. jdbcDF2.write
    32. .jdbc(s"jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)

    以下是在 PySpark 中的操作方法:

    1. # In Python
    2. # Read Option 1: Loading data from a JDBC source using load method
    3. jdbcDF1 = (spark
    4. .read
    5. .format("jdbc")
    6. .option("url", "jdbc:postgresql://[DBSERVER]")
    7. .option("dbtable", "[SCHEMA].[TABLENAME]")
    8. .option("user", "[USERNAME]")
    9. .option("password", "[PASSWORD]")
    10. .load())
    11. # Read Option 2: Loading data from a JDBC source using jdbc method
    12. jdbcDF2 = (spark
    13. .read
    14. .jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",
    15. properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))
    16. # Write Option 1: Saving data to a JDBC source using save method
    17. (jdbcDF1
    18. .write
    19. .format("jdbc")
    20. .option("url", "jdbc:postgresql://[DBSERVER]")
    21. .option("dbtable", "[SCHEMA].[TABLENAME]")
    22. .option("user", "[USERNAME]")
    23. .option("password", "[PASSWORD]")
    24. .save())
    25. # Write Option 2: Saving data to a JDBC source using jdbc method
    26. (jdbcDF2
    27. .write
    28. .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]",
    29. properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))

    MySQL

    要连接到 MySQL 数据库,请从MavenMySQL(后者更容易!)构建或下载 JDBC jar,并将其添加到您的类路径中。然后启动 Spark shell(spark-shellpyspark),指定该 jar:

    bin/spark-shell --jars mysql-connector-java_8.0.16-bin.jar

    以下示例展示了如何使用 Spark SQL 数据源 API 和 Scala 中的 JDBC 从 MySQL 数据库中加载数据并将其保存到数据库中:

    1. // In Scala
    2. // Loading data from a JDBC source using load
    3. val jdbcDF = spark
    4. .read
    5. .format("jdbc")
    6. .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
    7. .option("driver", "com.mysql.jdbc.Driver")
    8. .option("dbtable", "[TABLENAME]")
    9. .option("user", "[USERNAME]")
    10. .option("password", "[PASSWORD]")
    11. .load()
    12. // Saving data to a JDBC source using save
    13. jdbcDF
    14. .write
    15. .format("jdbc")
    16. .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
    17. .option("driver", "com.mysql.jdbc.Driver")
    18. .option("dbtable", "[TABLENAME]")
    19. .option("user", "[USERNAME]")
    20. .option("password", "[PASSWORD]")
    21. .save()

    下面是如何在 Python 中做到这一点:

    1. # In Python
    2. # Loading data from a JDBC source using load
    3. jdbcDF = (spark
    4. .read
    5. .format("jdbc")
    6. .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
    7. .option("driver", "com.mysql.jdbc.Driver")
    8. .option("dbtable", "[TABLENAME]")
    9. .option("user", "[USERNAME]")
    10. .option("password", "[PASSWORD]")
    11. .load())
    12. # Saving data to a JDBC source using save
    13. (jdbcDF
    14. .write
    15. .format("jdbc")
    16. .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
    17. .option("driver", "com.mysql.jdbc.Driver")
    18. .option("dbtable", "[TABLENAME]")
    19. .option("user", "[USERNAME]")
    20. .option("password", "[PASSWORD]")
    21. .save())

    Azure Cosmos 数据库

    若要连接到 Azure Cosmos DB 数据库,请从MavenGitHub构建或下载 JDBC jar并将其添加到您的类路径中。然后启动一个 Scala 或 PySpark shell,指定这个 jar(注意这个例子使用的是 Spark 2.4):

    bin/spark-shell --jars azure-cosmosdb-spark_2.4.0_2.11-1.3.5-uber.jar

    您还可以选择使用其 Maven 坐标从Spark 包--packages中拉出连接器:

    1. export PKG="com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"
    2. bin/spark-shell --packages $PKG

    以下示例说明如何使用 Scala 和 PySpark 中的 Spark SQL 数据源 API 和 JDBC 从 Azure Cosmos DB 数据库加载数据并将其保存到。请注意,通常使用query_custom配置来利用 Cosmos DB 中的各种索引:

    1. // In Scala
    2. // Import necessary libraries
    3. import com.microsoft.azure.cosmosdb.spark.schema._
    4. import com.microsoft.azure.cosmosdb.spark._
    5. import com.microsoft.azure.cosmosdb.spark.config.Config
    6. // Loading data from Azure Cosmos DB
    7. // Configure connection to your collection
    8. val query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"
    9. val readConfig = Config(Map(
    10. "Endpoint" -> "https://[ACCOUNT].documents.azure.com:443/",
    11. "Masterkey" -> "[MASTER KEY]",
    12. "Database" -> "[DATABASE]",
    13. "PreferredRegions" -> "Central US;East US2;",
    14. "Collection" -> "[COLLECTION]",
    15. "SamplingRatio" -> "1.0",
    16. "query_custom" -> query
    17. ))
    18. // Connect via azure-cosmosdb-spark to create Spark DataFrame
    19. val df = spark.read.cosmosDB(readConfig)
    20. df.count
    21. // Saving data to Azure Cosmos DB
    22. // Configure connection to the sink collection
    23. val writeConfig = Config(Map(
    24. "Endpoint" -> "https://[ACCOUNT].documents.azure.com:443/",
    25. "Masterkey" -> "[MASTER KEY]",
    26. "Database" -> "[DATABASE]",
    27. "PreferredRegions" -> "Central US;East US2;",
    28. "Collection" -> "[COLLECTION]",
    29. "WritingBatchSize" -> "100"
    30. ))
    31. // Upsert the DataFrame to Azure Cosmos DB
    32. import org.apache.spark.sql.SaveMode
    33. df.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
    1. # In Python
    2. # Loading data from Azure Cosmos DB
    3. # Read configuration
    4. query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"
    5. readConfig = {
    6. "Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",
    7. "Masterkey" : "[MASTER KEY]",
    8. "Database" : "[DATABASE]",
    9. "preferredRegions" : "Central US;East US2",
    10. "Collection" : "[COLLECTION]",
    11. "SamplingRatio" : "1.0",
    12. "schema_samplesize" : "1000",
    13. "query_pagesize" : "2147483647",
    14. "query_custom" : query
    15. }
    16. # Connect via azure-cosmosdb-spark to create Spark DataFrame
    17. df = (spark
    18. .read
    19. .format("com.microsoft.azure.cosmosdb.spark")
    20. .options(**readConfig)
    21. .load())
    22. # Count the number of flights
    23. df.count()
    24. # Saving data to Azure Cosmos DB
    25. # Write configuration
    26. writeConfig = {
    27. "Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",
    28. "Masterkey" : "[MASTER KEY]",
    29. "Database" : "[DATABASE]",
    30. "Collection" : "[COLLECTION]",
    31. "Upsert" : "true"
    32. }
    33. # Upsert the DataFrame to Azure Cosmos DB
    34. (df.write
    35. .format("com.microsoft.azure.cosmosdb.spark")
    36. .options(**writeConfig)
    37. .save())

    有关详细信息,请参阅Azure Cosmos DB 文档

    微软 SQL 服务器

    要连接到 MS SQL Server 数据库,请下载 JDBC jar并将其添加到您的类路径中。然后启动一个 Scala 或 PySpark shell,指定这个 jar:

    bin/spark-shell --jars mssql-jdbc-7.2.2.jre8.jar

    以下示例展示了如何使用 Scala 和 PySpark 中的 Spark SQL 数据源 API 和 JDBC 从 MS SQL Server 数据库加载数据并将其保存到:

    1. // In Scala
    2. // Loading data from a JDBC source
    3. // Configure jdbcUrl
    4. val jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"
    5. // Create a Properties() object to hold the parameters.
    6. // Note, you can create the JDBC URL without passing in the
    7. // user/password parameters directly.
    8. val cxnProp = new Properties()
    9. cxnProp.put("user", "[USERNAME]")
    10. cxnProp.put("password", "[PASSWORD]")
    11. cxnProp.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    12. // Load data using the connection properties
    13. val jdbcDF = spark.read.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)
    14. // Saving data to a JDBC source
    15. jdbcDF.write.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)
    1. # In Python
    2. # Configure jdbcUrl
    3. jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"
    4. # Loading data from a JDBC source
    5. jdbcDF = (spark
    6. .read
    7. .format("jdbc")
    8. .option("url", jdbcUrl)
    9. .option("dbtable", "[TABLENAME]")
    10. .option("user", "[USERNAME]")
    11. .option("password", "[PASSWORD]")
    12. .load())
    13. # Saving data to a JDBC source
    14. (jdbcDF
    15. .write
    16. .format("jdbc")
    17. .option("url", jdbcUrl)
    18. .option("dbtable", "[TABLENAME]")
    19. .option("user", "[USERNAME]")
    20. .option("password", "[PASSWORD]")
    21. .save())

    其他外部来源

    Apache Spark 可以连接的许多外部数据源只是其中的一部分;其他流行的数据源包括:

    DataFrames 和 Spark SQL 中的高阶函数

    因为复杂数据类型是简单数据类型的合并,所以很容易直接操作它们。处理复杂数据类型有两种典型的解决方案:

    • 将嵌套结构分解成单独的行,应用一些函数,然后重新创建嵌套结构

    • 构建用户定义的函数

    这些方法的好处是允许您以表格格式思考问题。它们通常涉及(但不限于)使用效用函数,例如get_json_object()from_json()to_json()explode()selectExpr()

    让我们仔细看看这两个选项。

    选项 1:爆炸和收集

    在这个嵌套的 SQL 语句中,我们首先为 中的每个元素 ( )explode(values)创建一个新行 (带有) :idvaluevalues

    1. -- In SQL
    2. SELECT id, collect_list(value + 1) AS values
    3. FROM (SELECT id, EXPLODE(values) AS value
    4. FROM table) x
    5. GROUP BY id

    虽然collect_list()返回具有重复的对象列表,但该GROUP BY语句需要随机操作,这意味着重新收集的数组的顺序不一定与原始数组的顺序相同。由于values可以是任意数量的维度(一个非常宽和/或非常长的数组)并且我们正在做一个GROUP BY,这种方法可能非常昂贵。

    选项 2:用户定义的函数

    要执行相同的任务(添加1到 中的每个元素values),我们还可以创建一个map()用于遍历每个元素 ( value) 并执行添加操作的 UDF:

    1. // In Scala
    2. def addOne(values: Seq[Int]): Seq[Int] = {
    3. values.map(value => value + 1)
    4. }
    5. val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])

    然后我们可以在 Spark SQL 中使用这个 UDF,如下所示:

    spark.sql("SELECT id, plusOneInt(values) A​​S values FROM table").show()

    虽然这比使用更好,explode()而且collect_list()不会有任何排序问题,但序列化和反序列化过程本身可能很昂贵。然而,同样重要的是要注意,这collect_list()可能会导致执行程序遇到大型数据集的内存不足问题,而使用 UDF 可以缓解这些问题。

    复杂数据类型的内置函数

    除了使用这些可能很昂贵的技术外,您还可以使用一些内置函数来处理 Apache Spark 2.4 及更高版本中包含的复杂数据类型。一些比较常见的列在表 5-3(数组类型)和表 5-4(映射类型)中。

    表 5-3。数组类型函数
    功能说明询问输出
    array_distinct(array): array
    删除数组中的重复项
    SELECT array_distinct(array(1, 2, 3, null, 3));[1,2,3,null]
    array_intersect(array, array): array
    返回没有重复的两个数组的交集
    SELECT array_intersect(array(1, 2, 3), array(1, 3, 5));[1,3]
    array_union(array, array): array
    返回没有重复的两个数组的并集
    SELECT array_union(array(1, 2, 3), array(1, 3, 5));[1,2,3,5]
    array_except(array, array): array
    array1返回 in但不在 in中的 元素array2,不重复
    SELECT array_except(array(1, 2, 3), array(1, 3, 5));[2]
    array_join(array, String[, String]): String
    使用分隔符连接数组的元素
    SELECT array_join(array('hello', 'world'), ' ');hello world
    array_max(array): T
    返回数组中的最大值;null元素被跳过
    SELECT array_max(array(1, 20, null, 3));20
    array_min(array): T
    返回数组中的最小值;null元素被跳过
    SELECT array_min(array(1, 20, null, 3));1
    array_position(array, T): Long
    将给定数组的第一个元素的(从 1 开始的)索引返回为Long
    SELECT array_position(array(3, 2, 1), 1);3
    array_remove(array, T): array
    从给定数组中删除所有等于给定元素的元素
    SELECT array_remove(array(1, 2, 3, null, 3), 3);[1,2,null]
    arrays_overlap(array, array): array
    true如果array1包含至少一个null也存在于中的非元素,则 返回array2
    SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5));true
    array_sort(array): array
    按升序对输入数组进行排序,空元素放置在数组的末尾
    SELECT array_sort(array('b', 'd', null, 'c', 'a'));["a","b","c","d",null]
    concat(array, ...): array
    连接字符串、二进制文件、数组等。
    SELECT concat(array(1, 2, 3), array(4, 5), array(6));[1,2,3,4,5,6]
    flatten(array>): array
    将数组数组展平为单个数组
    SELECT flatten(array(array(1, 2), array(3, 4)));[1,2,3,4]
    array_repeat(T, Int): array
    以指定次数返回包含指定元素的数组
    SELECT array_repeat('123', 3);["123","123","123"]
    reverse(array): array
    返回一个反转的字符串或一个元素顺序相反的数组
    SELECT reverse(array(2, 1, 4, 3));[3,4,1,2]
    sequence(T, T[, T]): array
    通过增量步骤生成从开始到停止(包括)的元素数组
    SELECT sequence(1, 5);
    SELECT sequence(5, 1);
    SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month);
    [1,2,3,4,5]
    [5,4,3,2,1]
    ["2018-01-01", "2018-02-01", "2018-03-01"]
    shuffle(array): array
    返回给定数组的随机排列
    SELECT shuffle(array(1, 20, null, 3));[null,3,20,1]
    slice(array, Int, Int): array
    返回从给定索引开始的给定数组的子集(如果索引为负数,则从末尾开始计数),具有指定长度
    SELECT slice(array(1, 2, 3, 4), -2, 2);[3,4]
    array_zip(array, array, ...): array>
    返回结构的合并数组
    SELECT arrays_zip(array(1, 2), array(2, 3), array(3, 4));[{"0":1,"1":2,"2":3},{"0":2,"1":3,"2":4}]
    element_at(array, Int): T /
    返回给定数组在给定(从 1 开始)索引处的元素
    SELECT element_at(array(1, 2, 3), 2);2
    cardinality(array): Int
    的别名size;返回给定数组或映射的大小
    SELECT cardinality(array('b', 'd', 'c', 'a'));4
    表 5-4。地图功能
    功能说明询问输出
    map_form_arrays(array, array): map
    从给定的键/值数组对创建一个映射;键中的元素不应该是null
    SELECT map_from_arrays(array(1.0, 3.0), array('2', '4'));{"1.0":"2", "3.0":"4"}
    map_from_entries(array>): map
    返回从给定数组创建的地图
    SELECT map_from_entries(array(struct(1, 'a'), struct(2, 'b')));{"1":"a", "2":"b"}
    map_concat(map, ...): map
    返回输入映射的并集
    SELECT map_concat(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));{"1":"a", "2":"c","3":"d"}
    element_at(map, K): V
    返回给定键的值,或者null如果键不包含在映射中
    SELECT element_at(map(1, 'a', 2, 'b'), 2);b
    cardinality(array): Int
    的别名size;返回给定数组或映射的大小
    SELECT cardinality(map(1, 'a', 2, 'b'));2

    高阶函数

    除了前面提到的内置函数之外,还有以匿名 lambda 函数作为参数的高阶函数。高阶函数的示例如下:

    1. -- In SQL
    2. transform(values, value -> lambda expression)

    transform()函数将一个数组 ( values) 和匿名函数 (lambda表达式) 作为输入。该函数通过对每个元素应用匿名函数透明地创建一个新数组,然后将结果分配给输出数组(类似于 UDF 方法,但更有效)。

    让我们创建一个示例数据集,以便我们可以运行一些示例:

    1. # In Python
    2. from pyspark.sql.types import *
    3. schema = StructType([StructField("celsius", ArrayType(IntegerType()))])
    4. t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
    5. t_c = spark.createDataFrame(t_list, schema)
    6. t_c.createOrReplaceTempView("tC")
    7. # Show the DataFrame
    8. t_c.show()
    1. // In Scala
    2. // Create DataFrame with two rows of two arrays (tempc1, tempc2)
    3. val t1 = Array(35, 36, 32, 30, 40, 42, 38)
    4. val t2 = Array(31, 32, 34, 55, 56)
    5. val tC = Seq(t1, t2).toDF("celsius")
    6. tC.createOrReplaceTempView("tC")
    7. // Show the DataFrame
    8. tC.show()

    这是输出:

    1. +--------------------+
    2. | celsius|
    3. +--------------------+
    4. |[35, 36, 32, 30, ...|
    5. |[31, 32, 34, 55, 56]|
    6. +--------------------+

    使用前面的 DataFrame,您可以运行以下高阶函数查询。

    transform()

    transform(array<T>, function<T, U>): array<U>

    transform()函数通过对输入数组的每个元素应用一个函数来生成一个数组(类似于map()函数):

    1. // In Scala/Python
    2. // Calculate Fahrenheit from Celsius for an array of temperatures
    3. spark.sql("""
    4. SELECT celsius,
    5. transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
    6. FROM tC
    7. """).show()
    8. +--------------------+--------------------+
    9. | celsius| fahrenheit|
    10. +--------------------+--------------------+
    11. |[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
    12. |[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
    13. +--------------------+--------------------+

    filter()

    filter(array<T>, function<T, Boolean>): array<T>

    filter()函数生成一个数组,该数组仅由布尔函数为的输入数组的元素组成true

    1. // In Scala/Python
    2. // Filter temperatures > 38C for array of temperatures
    3. spark.sql("""
    4. SELECT celsius,
    5. filter(celsius, t -> t > 38) as high
    6. FROM tC
    7. """).show()
    8. +--------------------+--------+
    9. | celsius| high|
    10. +--------------------+--------+
    11. |[35, 36, 32, 30, ...|[40, 42]|
    12. |[31, 32, 34, 55, 56]|[55, 56]|
    13. +--------------------+--------+

    exists()

    exists(array<T>, function<T, V, Boolean>): Boolean

    如果布尔函数适用于输入数组中的任何元素,则该exists()函数返回:true

    1. // In Scala/Python
    2. // Is there a temperature of 38C in the array of temperatures
    3. spark.sql("""
    4. SELECT celsius,
    5. exists(celsius, t -> t = 38) as threshold
    6. FROM tC
    7. """).show()
    8. +--------------------+---------+
    9. | celsius|threshold|
    10. +--------------------+---------+
    11. |[35, 36, 32, 30, ...| true|
    12. |[31, 32, 34, 55, 56]| false|
    13. +--------------------+---------+

    reduce()

    reduce(array<T>, B, function<B, T, B>, function<B, R>)

    该函数通过将元素合并到缓冲区中来reduce()将数组的元素减少为单个值,并在最终缓冲区上应用整理Bfunctionfunction:

    1. // In Scala/Python
    2. // Calculate average temperature and convert to F
    3. spark.sql("""
    4. SELECT celsius,
    5. reduce(
    6. celsius,
    7. 0,
    8. (t, acc) -> t + acc,
    9. acc -> (acc div size(celsius) * 9 div 5) + 32
    10. ) as avgFahrenheit
    11. FROM tC
    12. """).show()
    13. +--------------------+-------------+
    14. | celsius|avgFahrenheit|
    15. +--------------------+-------------+
    16. |[35, 36, 32, 30, ...| 96|
    17. |[31, 32, 34, 55, 56]| 105|
    18. +--------------------+-------------+

    通用数据帧和 Spark SQL 操作

    Spark SQL 的部分强大功能来自它支持的广泛的 DataFrame 操作(也称为无类型数据集操作)。操作列表非常广泛,包括:

    • 聚合函数

    • 集合函数

    • 日期时间函数

    • 数学函数

    • 杂项功能

    • 非聚合函数

    • 排序功能

    • 字符串函数

    • UDF 函数

    • 窗口函数

    有关完整列表,请参阅Spark SQL 文档

    在本章中,我们将重点关注以下常见的关系操作:

    • Unions and joins

    • Windowing

    • Modifications

    要执行这些 DataFrame 操作,我们将首先准备一些数据。在下面的代码片段中,我们:

    1. 导入两个文件并创建两个 DataFrame,一个用于机场 ( airports) 信息,一个用于美国航班延误 ( departureDelays)。

    2. 使用expr(),将delaydistance列从转换STRINGINT

    3. 创建一个较小的表,foo我们可以专注于我们的演示示例;它仅包含从西雅图 (SEA) 到旧金山 (SFO) 目的地的三个航班的一小段时间范围内的信息。

    让我们开始吧:

    1. // In Scala
    2. import org.apache.spark.sql.functions._
    3. // Set file paths
    4. val delaysPath =
    5. "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
    6. val airportsPath =
    7. "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
    8. // Obtain airports data set
    9. val airports = spark.read
    10. .option("header", "true")
    11. .option("inferschema", "true")
    12. .option("delimiter", "\t")
    13. .csv(airportsPath)
    14. airports.createOrReplaceTempView("airports")
    15. // Obtain departure Delays data set
    16. val delays = spark.read
    17. .option("header","true")
    18. .csv(delaysPath)
    19. .withColumn("delay", expr("CAST(delay as INT) as delay"))
    20. .withColumn("distance", expr("CAST(distance as INT) as distance"))
    21. delays.createOrReplaceTempView("departureDelays")
    22. // Create temporary small table
    23. val foo = delays.filter(
    24. expr("""origin == 'SEA' AND destination == 'SFO' AND
    25. date like '01010%' AND delay > 0"""))
    26. foo.createOrReplaceTempView("foo")
    1. # In Python
    2. # Set file paths
    3. from pyspark.sql.functions import expr
    4. tripdelaysFilePath =
    5. "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
    6. airportsFilePath =
    7. "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
    8. # Obtain airports data set
    9. airports = (spark.read
    10. .format("csv")
    11. .options(header="true", inferSchema="true", sep="\t")
    12. .load(airportsFilePath))
    13. airports.createOrReplaceTempView("airports")
    14. # Obtain departure delays data set
    15. departureDelays = (spark.read
    16. .format("csv")
    17. .options(header="true")
    18. .load(tripdelaysFilePath))
    19. departureDelays = (departureDelays
    20. .withColumn("delay", expr("CAST(delay as INT) as delay"))
    21. .withColumn("distance", expr("CAST(distance as INT) as distance")))
    22. departureDelays.createOrReplaceTempView("departureDelays")
    23. # Create temporary small table
    24. foo = (departureDelays
    25. .filter(expr("""origin == 'SEA' and destination == 'SFO' and
    26. date like '01010%' and delay > 0""")))
    27. foo.createOrReplaceTempView("foo")

    departureDelaysDataFrame 包含超过 130 万次航班的数据,而DataFramefoo仅包含三行,其中包含特定时间范围内从 SEA 到 SFO 的航班信息,如下面的输出所示:

    1. // Scala/Python
    2. spark.sql("SELECT * FROM airports LIMIT 10").show()
    3. +-----------+-----+-------+----+
    4. | City|State|Country|IATA|
    5. +-----------+-----+-------+----+
    6. | Abbotsford| BC| Canada| YXX|
    7. | Aberdeen| SD| USA| ABR|
    8. | Abilene| TX| USA| ABI|
    9. | Akron| OH| USA| CAK|
    10. | Alamosa| CO| USA| ALS|
    11. | Albany| GA| USA| ABY|
    12. | Albany| NY| USA| ALB|
    13. |Albuquerque| NM| USA| ABQ|
    14. | Alexandria| LA| USA| AEX|
    15. | Allentown| PA| USA| ABE|
    16. +-----------+-----+-------+----+
    17. spark.sql("SELECT * FROM departureDelays LIMIT 10").show()
    18. +--------+-----+--------+------+-----------+
    19. | date|delay|distance|origin|destination|
    20. +--------+-----+--------+------+-----------+
    21. |01011245| 6| 602| ABE| ATL|
    22. |01020600| -8| 369| ABE| DTW|
    23. |01021245| -2| 602| ABE| ATL|
    24. |01020605| -4| 602| ABE| ATL|
    25. |01031245| -4| 602| ABE| ATL|
    26. |01030605| 0| 602| ABE| ATL|
    27. |01041243| 10| 602| ABE| ATL|
    28. |01040605| 28| 602| ABE| ATL|
    29. |01051245| 88| 602| ABE| ATL|
    30. |01050605| 9| 602| ABE| ATL|
    31. +--------+-----+--------+------+-----------+
    32. spark.sql("SELECT * FROM foo").show()
    33. +--------+-----+--------+------+-----------+
    34. | date|delay|distance|origin|destination|
    35. +--------+-----+--------+------+-----------+
    36. |01010710| 31| 590| SEA| SFO|
    37. |01010955| 104| 590| SEA| SFO|
    38. |01010730| 5| 590| SEA| SFO|
    39. +--------+-----+--------+------+-----------+

    在接下来的部分中,我们将使用这些数据执行联合、连接和开窗示例。

    Unions

    Apache Spark 中的一个常见模式是将具有相同架构的两个不同 DataFrame 联合在一起。这可以使用以下union()方法来实现:

    1. // Scala
    2. // Union two tables
    3. val bar = delays.union(foo)
    4. bar.createOrReplaceTempView("bar")
    5. bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
    6. AND date LIKE '01010%' AND delay > 0""")).show()
    1. # In Python
    2. # Union two tables
    3. bar = departureDelays.union(foo)
    4. bar.createOrReplaceTempView("bar")
    5. # Show the union (filtering for SEA and SFO in a specific time range)
    6. bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
    7. AND date LIKE '01010%' AND delay > 0""")).show()

    barDataFrame 是foowith的并集delays。在 DataFrame 中使用相同的过滤标准bar,我们看到了重复的foo数据,正如预期的那样:

    1. -- In SQL
    2. spark.sql("""
    3. SELECT *
    4. FROM bar
    5. WHERE origin = 'SEA'
    6. AND destination = 'SFO'
    7. AND date LIKE '01010%'
    8. AND delay > 0
    9. """).show()
    10. +--------+-----+--------+------+-----------+
    11. | date|delay|distance|origin|destination|
    12. +--------+-----+--------+------+-----------+
    13. |01010710| 31| 590| SEA| SFO|
    14. |01010955| 104| 590| SEA| SFO|
    15. |01010730| 5| 590| SEA| SFO|
    16. |01010710| 31| 590| SEA| SFO|
    17. |01010955| 104| 590| SEA| SFO|
    18. |01010730| 5| 590| SEA| SFO|
    19. +--------+-----+--------+------+-----------+

    Joins

    一个常见的 DataFrame 操作是将两个 DataFrame(或表)连接在一起。默认情况下,Spark SQL 连接是inner join,选项为innercrossouterfullfull_outerleftleft_outerrightright_outerleft_semi, 和left_anti文档中提供了更多信息(这适用于 Scala 和 Python)。

    以下代码示例执行和DataFrameinner之间的默认连接:airportsfoo

    1. // In Scala
    2. foo.join(
    3. airports.as('air),
    4. $"air.IATA" === $"origin"
    5. ).select("City", "State", "date", "delay", "distance", "destination").show()
    1. # In Python
    2. # Join departure delays data (foo) with airport info
    3. foo.join(
    4. airports,
    5. airports.IATA == foo.origin
    6. ).select("City", "State", "date", "delay", "distance", "destination").show()
    1. -- In SQL
    2. spark.sql("""
    3. SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
    4. FROM foo f
    5. JOIN airports a
    6. ON a.IATA = f.origin
    7. """).show()

    上述代码允许您从 DataFrame 中查看日期、延迟、距离和目的地信息,并从fooDataFrame 中加入城市和州信息airports

    1. +-------+-----+--------+-----+--------+-----------+
    2. | City|State| date|delay|distance|destination|
    3. +-------+-----+--------+-----+--------+-----------+
    4. |Seattle| WA|01010710| 31| 590| SFO|
    5. |Seattle| WA|01010955| 104| 590| SFO|
    6. |Seattle| WA|01010730| 5| 590| SFO|
    7. +-------+-----+--------+-----+--------+-----------+

    Windowing

    窗口函数使用窗口中的行(输入行的范围)中的值来返回一组值,通常以另一行的形式。使用窗口函数,可以对一组行进行操作,同时仍为每个输入行返回一个值。在本节中,我们将展示如何使用dense_rank()窗口函数;还有许多其他功能,如表 5-5 所示

    表 5-5。窗口函数
     SQL数据帧 API
    排名功能rank()rank()
     dense_rank()denseRank()
     percent_rank()percentRank()
     ntile()ntile()
     row_number()rowNumber()
    分析函数cume_dist()cumeDist()
     first_value()firstValue()
     last_value()lastValue()
     lag()lag()
     lead()lead()

    让我们先回顾一下从西雅图 (SEA)、旧金山 (SFO) 和纽约市 (JFK) 出发并前往一组特定目的地位置的航班所经历的TotalDelays(由 计算),如以下查询中所述sum(Delay)

    1. -- In SQL
    2. DROP TABLE IF EXISTS departureDelaysWindow;
    3. CREATE TABLE departureDelaysWindow AS
    4. SELECT origin, destination, SUM(delay) AS TotalDelays
    5. FROM departureDelays
    6. WHERE origin IN ('SEA', 'SFO', 'JFK')
    7. AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
    8. GROUP BY origin, destination;
    9. SELECT * FROM departureDelaysWindow
    10. +------+-----------+-----------+
    11. |origin|destination|TotalDelays|
    12. +------+-----------+-----------+
    13. | JFK| ORD| 5608|
    14. | SEA| LAX| 9359|
    15. | JFK| SFO| 35619|
    16. | SFO| ORD| 27412|
    17. | JFK| DEN| 4315|
    18. | SFO| DEN| 18688|
    19. | SFO| SEA| 17080|
    20. | SEA| SFO| 22293|
    21. | JFK| ATL| 12141|
    22. | SFO| ATL| 5091|
    23. | SEA| DEN| 13645|
    24. | SEA| ATL| 4535|
    25. | SEA| ORD| 10041|
    26. | JFK| SEA| 7856|
    27. | JFK| LAX| 35755|
    28. | SFO| JFK| 24100|
    29. | SFO| LAX| 40798|
    30. | SEA| JFK| 4667|
    31. +------+-----------+-----------+

    如果您想为每个始发机场找出延误最多的三个目的地,该怎么办?您可以通过为每个来源运行三个不同的查询然后将结果合并在一起来实现这一点,如下所示:

    1. -- In SQL
    2. SELECT origin, destination, sum(TotalDelays) as sumTotalDelays
    3. FROM departureDelaysWindow
    4. WHERE origin = 'SEA'
    5. GROUP BY origin, destination
    6. ORDER BY sumTotalDelays DESC
    7. LIMIT 3

    其中[ORIGIN]是 、 和 的三个不同的JFK原点SEASFO

    但更好的方法是使用窗口函数dense_rank()来执行以下计算:

    1. -- In SQL
    2. spark.sql("""
    3. SELECT origin, destination, TotalDelays, rank
    4. FROM (
    5. SELECT origin, destination, TotalDelays, dense_rank()
    6. OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
    7. FROM departureDelaysWindow
    8. ) t
    9. WHERE rank <= 3
    10. """).show()
    11. +------+-----------+-----------+----+
    12. |origin|destination|TotalDelays|rank|
    13. +------+-----------+-----------+----+
    14. | SEA| SFO| 22293| 1|
    15. | SEA| DEN| 13645| 2|
    16. | SEA| ORD| 10041| 3|
    17. | SFO| LAX| 40798| 1|
    18. | SFO| ORD| 27412| 2|
    19. | SFO| JFK| 24100| 3|
    20. | JFK| LAX| 35755| 1|
    21. | JFK| SFO| 35619| 2|
    22. | JFK| ATL| 12141| 3|
    23. +------+-----------+-----------+----+

    通过使用dense_rank()窗口函数,我们可以快速确定三个始发城市延误最严重的目的地是:

    • 西雅图 (SEA):旧金山 (SFO)、丹佛 (DEN) 和芝加哥 (ORD)

    • 旧金山 (SFO):洛杉矶 (LAX)、芝加哥 (ORD) 和纽约 (JFK)

    • 纽约 (JFK):洛杉矶 (LAX)、旧金山 (SFO) 和亚特兰大 (ATL)

    需要注意的是,每个窗口分组都需要适合单个执行程序,并且在执行期间将组合成单个分区。因此,您需要确保您的查询不是无界的(即限制您的窗口大小).

    Modifications

    另一个常见的操作是对DataFrame进行修改。虽然 DataFrame 本身是不可变的,但您可以通过创建新的、不同的 DataFrame、具有不同列的操作来修改它们。(回想一下前面的章节,底层的 RDD 是不可变的——即它们不能被改变——以确保 Spark 操作存在数据沿袭。)让我们从之前的小型 DataFrame 示例开始:

    1. // In Scala/Python
    2. foo.show()
    3. --------+-----+--------+------+-----------+
    4. | date|delay|distance|origin|destination|
    5. +--------+-----+--------+------+-----------+
    6. |01010710| 31| 590| SEA| SFO|
    7. |01010955| 104| 590| SEA| SFO|
    8. |01010730| 5| 590| SEA| SFO|
    9. +--------+-----+--------+------+-----------+

    Adding new columns

    要向fooDataFrame 添加新列,请使用以下withColumn()方法:

    1. // In Scala
    2. import org.apache.spark.sql.functions.expr
    3. val foo2 = foo.withColumn(
    4. "status",
    5. expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
    6. )
    1. # In Python
    2. from pyspark.sql.functions import expr
    3. foo2 = (foo.withColumn(
    4. "status",
    5. expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
    6. ))

    新创建foo2的 DataFrame 具有原始fooDataFrame 的内容加上语句定义的附加statusCASE

    1. // In Scala/Python
    2. foo2.show()
    3. +--------+-----+--------+------+-----------+-------+
    4. | date|delay|distance|origin|destination| status|
    5. +--------+-----+--------+------+-----------+-------+
    6. |01010710| 31| 590| SEA| SFO|Delayed|
    7. |01010955| 104| 590| SEA| SFO|Delayed|
    8. |01010730| 5| 590| SEA| SFO|On-time|
    9. +--------+-----+--------+------+-----------+-------+

    Dropping columns

    要删除一列,请使用该drop()方法。例如,让我们删除该delay列,因为我们现在有一个status列,在上一节中添加:

    1. // In Scala
    2. val foo3 = foo2.drop("delay")
    3. foo3.show()
    1. # In Python
    2. foo3 = foo2.drop("delay")
    3. foo3.show()
    4. +--------+--------+------+-----------+-------+
    5. | date|distance|origin|destination| status|
    6. +--------+--------+------+-----------+-------+
    7. |01010710| 590| SEA| SFO|Delayed|
    8. |01010955| 590| SEA| SFO|Delayed|
    9. |01010730| 590| SEA| SFO|On-time|
    10. +--------+--------+------+-----------+-------+

    Renaming columns

    您可以使用以下方法重命名列withColumnRenamed()

    1. // In Scala
    2. val foo4 = foo3.withColumnRenamed("status", "flight_status")
    3. foo4.show()
    1. # In Python
    2. foo4 = foo3.withColumnRenamed("status", "flight_status")
    3. foo4.show()
    4. +--------+--------+------+-----------+-------------+
    5. | date|distance|origin|destination|flight_status|
    6. +--------+--------+------+-----------+-------------+
    7. |01010710| 590| SEA| SFO| Delayed|
    8. |01010955| 590| SEA| SFO| Delayed|
    9. |01010730| 590| SEA| SFO| On-time|
    10. +--------+--------+------+-----------+-------------+

    Pivoting(旋转

    在处理您的数据时,有时您需要将列换成行——即,透视您的数据。让我们抓取一些数据来演示这个概念:

    1. -- In SQL
    2. SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
    3. FROM departureDelays
    4. WHERE origin = 'SEA'
    5. +-----------+-----+-----+
    6. |destination|month|delay|
    7. +-----------+-----+-----+
    8. | ORD| 1| 92|
    9. | JFK| 1| -7|
    10. | DFW| 1| -5|
    11. | MIA| 1| -3|
    12. | DFW| 1| -3|
    13. | DFW| 1| 1|
    14. | ORD| 1| -10|
    15. | DFW| 1| -6|
    16. | DFW| 1| -2|
    17. | ORD| 1| -3|
    18. +-----------+-----+-----+
    19. only showing top 10 rowsspan>

    透视允许您将名称放在month列中(而不是12您可以分别显示JanFeb)以及按目的地和月份对延迟执行聚合计算(在本例中为平均值和最大值):

    1. -- In SQL
    2. SELECT * FROM (
    3. SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
    4. FROM departureDelays WHERE origin = 'SEA'
    5. )
    6. PIVOT (
    7. CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
    8. FOR month IN (1 JAN, 2 FEB)
    9. )
    10. ORDER BY destination
    11. +-----------+------------+------------+------------+------------+
    12. |destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
    13. +-----------+------------+------------+------------+------------+
    14. | ABQ| 19.86| 316| 11.42| 69|
    15. | ANC| 4.44| 149| 7.90| 141|
    16. | ATL| 11.98| 397| 7.73| 145|
    17. | AUS| 3.48| 50| -0.21| 18|
    18. | BOS| 7.84| 110| 14.58| 152|
    19. | BUR| -2.03| 56| -1.89| 78|
    20. | CLE| 16.00| 27| null| null|
    21. | CLT| 2.53| 41| 12.96| 228|
    22. | COS| 5.32| 82| 12.18| 203|
    23. | CVG| -0.50| 4| null| null|
    24. | DCA| -1.15| 50| 0.07| 34|
    25. | DEN| 13.13| 425| 12.95| 625|
    26. | DFW| 7.95| 247| 12.57| 356|
    27. | DTW| 9.18| 107| 3.47| 77|
    28. | EWR| 9.63| 236| 5.20| 212|
    29. | FAI| 1.84| 160| 4.21| 60|
    30. | FAT| 1.36| 119| 5.22| 232|
    31. | FLL| 2.94| 54| 3.50| 40|
    32. | GEG| 2.28| 63| 2.87| 60|
    33. | HDN| -0.44| 27| -6.50| 0|
    34. +-----------+------------+------------+------------+------------+
    35. only showing top 20 rows

    概括

    本章探讨了 Spark SQL 如何与外部组件交互。我们讨论了创建用户定义的函数,包括 Pandas UDF,并提供了一些用于执行 Spark SQL 查询的选项(包括 Spark SQL shell、Beeline 和 Tableau)。然后,我们提供了如何使用 Spark SQL 连接各种外部数据源的示例,例如 SQL 数据库、PostgreSQL、MySQL、Tableau、Azure Cosmos DB、MS SQL Server 等。

    我们探索了 Spark 针对复杂数据类型的内置函数,并给出了一些使用高阶函数的示例。最后,我们讨论了一些常见的关系运算符,并展示了如何执行选择的 DataFrame 操作。

  • 相关阅读:
    玩机搞机-------安卓修改apk apk的组成和编译 一
    微信小程序开发学习笔记
    代码随想录算法训练营29期|day55 任务以及具体安排
    POST请求
    矩阵分析与应用(22)
    修炼离线:(八)Linux使用wget安装出现Unable to establish SSL connection
    录屏软件哪个好?比较好用的录屏软件,这4款值得一试!
    STM32 NAND FLASH知识点
    ros学习笔记10——rostopic中增加时间戳功能
    【含面试】解锁MySQL group_concat的无限可能性:解决长度限制并实现高效查询
  • 原文地址:https://blog.csdn.net/sikh_0529/article/details/127408276