• 从0开始学人工智能测试节选:Spark -- 结构化数据领域中测试人员的万金油技术(二)


    Dataframe

    dataframe 是spark中参考pandas设计出的一套高级API,用户可以像操作pandas一样方便的操作结构化数据。毕竟纯的RDD操作是十分原始且麻烦的。而dataframe的出现可以让熟悉pandas的从业人员能用非常少的成本完成分布式的数据分析工作, 毕竟跟数据打交道的人很少有不懂dataframe的。

    初始化dataframe的方法

    1. from pyspark import SparkContext, SparkConf, SQLContext
    2. from pyspark.sql import Row
    3. logFile = "/Users/xxxx/tools/spark-3.0.3-bin-hadoop2.7/README.md"
    4. conf = SparkConf().setMaster("local").setAppName("My App")
    5. sc = SparkContext(conf=conf)
    6. sqlContext = SQLContext(sc)
    7. dataA = sqlContext.read.csv("路径")
    8. dicts = [{'col1': 'a', 'col2': 1}, {'col1': 'b', 'col2': 2}]
    9. dataf = sqlContext.createDataFrame(dicts)
    10. dataf.show()
    11. dicts = [['a', 1], ['b', 2]]
    12. rdd = sc.parallelize(dicts)
    13. dataf = sqlContext.createDataFrame(rdd, ['col1','col2'])
    14. dataf.show()
    15. rows = [Row(col1='a', col2=1), Row(col1='b', col2=2)]
    16. dataf= sqlContext.createDataFrame(rows)
    17. dataf.show()
    18. dataf.write.csv(path="/Users/cainsun/Downloads/test_spark", header=True, sep=",", mode='overwrite')

    可以看到创建dataframe有多种方式, 可以从文件中读取, 可以从列表中初始化,可以用简单的方式指定列信息, 也可以使用Row类来初始化列信息。

    dataframe常用操作

    读取数据:

    1. df = spark.read.json("data.json")
    2. df = spark.read.csv("data.csv", header=True, inferSchema=True)
    3. df = spark.read.parquet("data.parquet")

    显示数据:

    # 显示前 n 行数据,默认为 20 行

    df.show(n=5)

    # 打印 DataFrame 的 schema

    df.printSchema()

    选择和过滤数据:

    # 选择特定列

    selected_df = df.select("column1", "column2")

    # 使用条件过滤数据

    filtered_df = df.filter(df["age"] > 30)

    聚合和分组数据:

    1. from pyspark import SparkContext, SparkConf, SQLContext
    2. conf = SparkConf().setMaster("local").setAppName("My App")
    3. sc = SparkContext(conf=conf)
    4. sqlContext = SQLContext(sc)
    5. dicts = [
    6. ['teacher', 202355, 16, '336051551@qq.com'],
    7. ['student', 2035, 16, '336051551@qq.com'],
    8. ['qa', 2355, 16, '336051551@qq.com'],
    9. ['qa', 20235, 16, '336051551@qq.com'],
    10. ['teacher', 35, 16, '336051asdf'],
    11. ['student', 453, 16, '336051asdf'],
    12. ]
    13. rdd = sc.parallelize(dicts, 3)
    14. data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])
    15. result = data.groupBy("title").max("sales").alias("max_sales")
    16. resultA = data.groupBy("title").sum("sales").alias("sum_sales")
    17. # 显示结果
    18. result.show()
    19. resultA.show()
    20. +-------+----------+
    21. | title|max(sales)|
    22. +-------+----------+
    23. |teacher| 202355|
    24. | qa| 20235|
    25. |student| 2035|
    26. +-------+----------+
    27. +-------+----------+
    28. | title|sum(sales)|
    29. +-------+----------+
    30. |teacher| 202390|
    31. | qa| 22590|
    32. |student| 2488|
    33. +-------+----------+
    34. 数据排序:
    35. from pyspark.sql.functions import desc
    36. # 按列排序
    37. sorted_df = df.sort("column1")
    38. # 按列降序排序
    39. sorted_df = df.sort(desc("column1"))
    40. 添加,修改和删除列:
    41. from pyspark.sql.functions import upper
    42. # 添加新列
    43. new_df = df.withColumn("new_column", df["column1"] * 2)
    44. # 修改现有列
    45. modified_df = df.withColumn("column1", upper(df["column1"]))
    46. # 删除列
    47. dropped_df = df.drop("column1")
    48. 重命名列:
    49. # 重命名 DataFrame 中的列
    50. renamed_df = df.withColumnRenamed("old_column_name", "new_column_name")

    spark sql

    初始化

    1. from pyspark import SparkContext, SparkConf, SQLContext
    2. # 创建 SparkSession
    3. conf = SparkConf().setMaster("local").setAppName("My App")
    4. sc = SparkContext(conf=conf)
    5. sqlContext = SQLContext(sc)
    6. dicts = [
    7. ['teacher', 202355, 16, '336051551@qq.com'],
    8. ['student', 2035, 16, '336051551@qq.com'],
    9. ['qa', 2355, 16, '336051551@qq.com'],
    10. ['qa', 20235, 16, '336051551@qq.com'],
    11. ['teacher', 35, 16, '336051asdf'],
    12. ['student', 453, 16, '336051asdf'],
    13. ]
    14. rdd = sc.parallelize(dicts, 3)
    15. data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])
    16. data.createOrReplaceTempView("table")

    要使用spark sql的能力, 需要利用createOrReplaceTempView创建一个临时表,然后才能执行 sql

    简单的sql执行

    1. query = "select * from table where title = 'qa'"
    2. resultB = sqlContext.sql(query)
    3. resultB.show()
    4. # 执行结果
    5. +-----+-----+---+----------------+
    6. |title|sales|age| email|
    7. +-----+-----+---+----------------+
    8. | qa| 2355| 16|336051551@qq.com|
    9. | qa|20235| 16|336051551@qq.com|
    10. +-----+-----+---+----------------+

    分组查询

    1. query = "select title, sum(sales), max(sales) from table group by title"
    2. resultC = sqlContext.sql(query)
    3. resultC.show()
    4. # 执行结果
    5. +-------+----------+----------+
    6. | title|sum(sales)|max(sales)|
    7. +-------+----------+----------+
    8. |teacher| 202390| 202355|
    9. | qa| 22590| 20235|
    10. |student| 2488| 2035|
    11. +-------+----------+----------+

    Spark sql适合熟悉sql语法的人使用,本质上sql和dataframe最终都会被翻译成rdd来运行。我们可以把它看成是rdd的高级语法糖就可以。 大家喜欢哪种操作方式就选择哪种就可以。

    数据测试/监控

    回顾自学习与数据闭环那里,我们知道在这样的系统中针对与每天新采集的数据,需要做一道数据校验。下面我模拟一个场景写这样一个检查脚本。

    1. from pyspark import SparkContext, SparkConf, SQLContext
    2. from pyspark.sql import SparkSession
    3. import pyspark.sql.functions as F
    4. conf = SparkConf().setMaster("local").setAppName("My App")
    5. sc = SparkContext(conf=conf)
    6. sqlContext = SQLContext(sc)
    7. rdd = sc.parallelize(range(1000))
    8. print(rdd.map(lambda x: '%s,%s' % ('男', '16')).collect())
    9. dicts = [
    10. ['frank', 202355, 16, '336051551@qq.com'],
    11. ['frank', 202355, 16, '336051551@qq.com'],
    12. ['frank', 202355, 16, '336051551@qq.com'],
    13. ['frank', 202355, 16, '336051551@qq.com'],
    14. ['frank', 202355, 16, '336051asdf'],
    15. ['', 452345, 16, '336051asdf'],
    16. ]
    17. rdd = sc.parallelize(dicts, 3)
    18. dataf = sqlContext.createDataFrame(rdd, ['name', 'id', 'age', 'email'])
    19. # 验证 id 字段必须是整数
    20. id_filter = F.col("id").cast("int") >= 0
    21. # 验证 name 字段必须是非空字符串
    22. name_filter = F.col("name").isNotNull() & (F.col("name") != "")
    23. # 验证 age 字段必须是大于等于 0 的整数
    24. age_filter = F.col("age").cast("int") >= 0
    25. # 验证 email 字段必须是有效的电子邮件地址
    26. email_filter = F.col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$")
    27. # 应用过滤条件
    28. valid_data = dataf.filter(id_filter & name_filter & age_filter & email_filter)
    29. # 输出符合质量要求的数据
    30. valid_data.show()
    31. # 输出不符合质量要求的数据
    32. invalid_data = dataf.exceptAll(valid_data)
    33. invalid_data.show()

    更多内容欢迎来到我的知识星球:
     

  • 相关阅读:
    MVC使用的设计模式
    mybatis源码阅读系列(一)
    外包干了一个月,技术明显进步。。。。。
    日期分析处理
    Basis运维日常检查清单- Checklist
    智能驾驶ADAS算法设计及Prescan仿真(3): 自适应巡航ACC跟车目标选择策略设计与simulink仿真
    mysql数据库安装
    Python人员信息管理系统(简直期末人福音)
    DTO的作用
    CentOS shell中的变量
  • 原文地址:https://blog.csdn.net/ycwdaaaa/article/details/137912207