• 测试PySpark


    文章最前: 我是Octopus,这个名字来源于我的中文名--章鱼;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github ;这博客是记录我学习的点点滴滴,如果您对 Python、Java、AI、算法有兴趣,可以关注我的动态,一起学习,共同进步。

     相关文章:

    1. PySpark 概述
    2. Spark连接快速入门
    3. Spark上使用pandas API快速入门

    这篇文章旨在帮你写出健壮的pyspark 代码。

    在这里,通过它写pyspark单元测试,看这个代码通过PySpark built,下载该目录代码,查看JIRA 看板票的pyspark测试

    创建PySpark应用

    这边一个例子是怎么创建pyspark应用,如果你的应用已经测试,你可以跳过这一段,测试你的pyspark程序。

    现在,开始测试你的spark session

    1. from pyspark.sql import SparkSession
    2. from pyspark.sql.functions import col
    3. # Create a SparkSession
    4. spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()

    接下来,创建一个DataFrame

    1. sample_data = [{"name": "John D.", "age": 30},
    2. {"name": "Alice G.", "age": 25},
    3. {"name": "Bob T.", "age": 35},
    4. {"name": "Eve A.", "age": 28}]
    5. df = spark.createDataFrame(sample_data)

    现在,我们对我们的DataFrame来定义转换算子

    1. from pyspark.sql.functions import col, regexp_replace
    2. # Remove additional spaces in name
    3. def remove_extra_spaces(df, column_name):
    4. # Remove extra spaces from the specified column
    5. df_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))
    6. return df_transformed
    7. transformed_df = remove_extra_spaces(df, "name")
    8. transformed_df.show()
    +---+--------+
    |age|    name|
    +---+--------+
    | 30| John D.|
    | 25|Alice G.|
    | 35|  Bob T.|
    | 28|  Eve A.|
    +---+--------+

    测试你的pyspark应用

    现在来测试你的pyspark转换算子。一个选择简化DataFrame测试结果,可以简化数据或者输入数据。更好的方式写测试例子,这里有一些例子怎么去测试我们的代码,这些代码是基于spark 3.5以下版本。对于这些例子做笔记是非常值得的,可以通过测试框架,不管你是使用unittest or pytest; built-in PySpark 测试是单机的,意味着他兼容测试框架和CI测试

    选项1: 仅仅使用PySpark Built-in 测试方法

    1. import pyspark.testing
    2. from pyspark.testing.utils import assertDataFrameEqual
    3. # Example 1
    4. df1 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
    5. df2 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
    6. assertDataFrameEqual(df1, df2) # pass, DataFrames are identical
    1. # Example 2
    2. df1 = spark.createDataFrame(data=[("1", 0.1), ("2", 3.23)], schema=["id", "amount"])
    3. df2 = spark.createDataFrame(data=[("1", 0.109), ("2", 3.23)], schema=["id", "amount"])
    4. assertDataFrameEqual(df1, df2, rtol=1e-1) # pass, DataFrames are approx equal by rtol

     您还可以简单地比较两个 DataFrame 模式:

    1. from pyspark.testing.utils import assertSchemaEqual
    2. from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType
    3. s1 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])
    4. s2 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])
    5. assertSchemaEqual(s1, s2) # pass, schemas are identical

    选项 2:使用单元测试

    对于更复杂的测试场景,您可能需要使用测试框架。

    最流行的测试框架选项之一是单元测试。让我们逐步了解如何使用内置 Pythonunittest库来编写 PySpark 测试。有关该unittest库的更多信息,请参阅此处: https: //docs.python.org/3/library/unittest.html

    首先,您需要一个 Spark 会话。您可以使用包@classmethod中的装饰器unittest来负责设置和拆除 Spark 会话。

    1. import unittest
    2. class PySparkTestCase(unittest.TestCase):
    3. @classmethod
    4. def setUpClass(cls):
    5. cls.spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()
    6. @classmethod
    7. def tearDownClass(cls):
    8. cls.spark.stop()

     现在我们来写一个unittest类。

    1. from pyspark.testing.utils import assertDataFrameEqual
    2. class TestTranformation(PySparkTestCase):
    3. def test_single_space(self):
    4. sample_data = [{"name": "John D.", "age": 30},
    5. {"name": "Alice G.", "age": 25},
    6. {"name": "Bob T.", "age": 35},
    7. {"name": "Eve A.", "age": 28}]
    8. # Create a Spark DataFrame
    9. original_df = spark.createDataFrame(sample_data)
    10. # Apply the transformation function from before
    11. transformed_df = remove_extra_spaces(original_df, "name")
    12. expected_data = [{"name": "John D.", "age": 30},
    13. {"name": "Alice G.", "age": 25},
    14. {"name": "Bob T.", "age": 35},
    15. {"name": "Eve A.", "age": 28}]
    16. expected_df = spark.createDataFrame(expected_data)
    17. assertDataFrameEqual(transformed_df, expected_df)
    运行时,unittest将选取名称以“test”开头的所有函数。

    选项 3:使用Pytest

    pytest我们还可以使用最流行的 Python 测试框架之一来编写测试。有关 的更多信息pytest,请参阅此处的文档: https: //docs.pytest.org/en/7.1.x/contents.html

    使用pytest固定装置允许我们在测试之间共享 Spark 会话,并在测试完成时将其拆除。

    1. import pytest
    2. @pytest.fixture
    3. def spark_fixture():
    4. spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()
    5. yield spark

    然后我们可以这样定义我们的测试:

    1. import pytest
    2. from pyspark.testing.utils import assertDataFrameEqual
    3. def test_single_space(spark_fixture):
    4. sample_data = [{"name": "John D.", "age": 30},
    5. {"name": "Alice G.", "age": 25},
    6. {"name": "Bob T.", "age": 35},
    7. {"name": "Eve A.", "age": 28}]
    8. # Create a Spark DataFrame
    9. original_df = spark.createDataFrame(sample_data)
    10. # Apply the transformation function from before
    11. transformed_df = remove_extra_spaces(original_df, "name")
    12. expected_data = [{"name": "John D.", "age": 30},
    13. {"name": "Alice G.", "age": 25},
    14. {"name": "Bob T.", "age": 35},
    15. {"name": "Eve A.", "age": 28}]
    16. expected_df = spark.createDataFrame(expected_data)
    17. assertDataFrameEqual(transformed_df, expected_df)

    当您使用该pytest命令运行测试文件时,它将选取名称以“test”开头的所有函数。

    把它们放在一起!

    让我们在单元测试示例中一起查看所有步骤。

    1. # pkg/etl.py
    2. import unittest
    3. from pyspark.sql import SparkSession
    4. from pyspark.sql.functions import col
    5. from pyspark.sql.functions import regexp_replace
    6. from pyspark.testing.utils import assertDataFrameEqual
    7. # Create a SparkSession
    8. spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()
    9. sample_data = [{"name": "John D.", "age": 30},
    10. {"name": "Alice G.", "age": 25},
    11. {"name": "Bob T.", "age": 35},
    12. {"name": "Eve A.", "age": 28}]
    13. df = spark.createDataFrame(sample_data)
    14. # Define DataFrame transformation function
    15. def remove_extra_spaces(df, column_name):
    16. # Remove extra spaces from the specified column using regexp_replace
    17. df_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))
    18. return df_transformed
    1. # pkg/test_etl.py
    2. import unittest
    3. from pyspark.sql import SparkSession
    4. # Define unit test base class
    5. class PySparkTestCase(unittest.TestCase):
    6. @classmethod
    7. def setUpClass(cls):
    8. cls.spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()
    9. @classmethod
    10. def tearDownClass(cls):
    11. cls.spark.stop()
    12. # Define unit test
    13. class TestTranformation(PySparkTestCase):
    14. def test_single_space(self):
    15. sample_data = [{"name": "John D.", "age": 30},
    16. {"name": "Alice G.", "age": 25},
    17. {"name": "Bob T.", "age": 35},
    18. {"name": "Eve A.", "age": 28}]
    19. # Create a Spark DataFrame
    20. original_df = spark.createDataFrame(sample_data)
    21. # Apply the transformation function from before
    22. transformed_df = remove_extra_spaces(original_df, "name")
    23. expected_data = [{"name": "John D.", "age": 30},
    24. {"name": "Alice G.", "age": 25},
    25. {"name": "Bob T.", "age": 35},
    26. {"name": "Eve A.", "age": 28}]
    27. expected_df = spark.createDataFrame(expected_data)
    28. assertDataFrameEqual(transformed_df, expected_df)
    unittest.main(argv=[''], verbosity=0, exit=False)
    
    在 1.734 秒内完成 1 次测试
    
  • 相关阅读:
    xml文件(mybatis映射文件)中特殊字符转义
    JavaScript面试刷题指南
    15812字教你从零入门node.js(基础篇)
    [JSOI2016] 炸弹攻击1
    Machine Learning With Go 第4章:回归
    nodejs模板引擎(一)
    基于Java毕业设计影院网上售票系统演示录像源码+系统+mysql+lw文档+部署软件
    供应链管理的基本方法
    基于JAVA快递物流管理计算机毕业设计源码+系统+mysql数据库+lw文档+部署
    正则匹配删除指令
  • 原文地址:https://blog.csdn.net/zy345293721/article/details/133849557