• Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】


    前言

            昨天实验课试着做了一个 Spark SQL 小案例,发现好多内容还是没有掌握,以及好多书上没有的内容需要学习。

    一、数据准备

    csv 文件内容部分数据展示:

    1. PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
    2. 1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
    3. 2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
    4. 3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
    5. 4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
    6. 5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
    7. 6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q

    字段说明

    • PassengerId : 乘客编号。
    • Survived : 是否存活,0表示未能存活,1表示存活。
    • Pclass : 描述乘客所属的等级,总共分为三等,用1、2、3来描述:1表示高等;2表示中等;3表示低等。
    • Name : 乘客姓名。
    • Sex : 乘客性别。
    • Age : 乘客年龄。
    • SibSp : 与乘客同行的兄弟姐妹(Siblings)和配偶(Spouse)数目。
    • Parch : 与乘客同行的家长(Parents)和孩子(Children)数目。
    • Ticket : 乘客登船所使用的船票编号。
    • Fare : 乘客上船的花费。
    • Cabin : 乘客所住的船舱。
    • Embarked : 乘客上船时的港口,C表示Cherbourg;Q表示Queenstown;S表示Southampton。

    二、Spark数据预处理

    1、通过读取本地文件生成 DataFrame 对象。

    1. // 创建 SparkSession 对象
    2. val conf = new SparkConf().setMaster("local[*]").setAppName("practice1")
    3. val spark = SparkSession.builder()
    4. .config(conf)
    5. .getOrCreate()
    6. // 导入隐式转换相关依赖
    7. import spark.implicits._
    8. // 读取csv文件生成 DataFrame 对象
    9. val df = spark.read.format("csv")
    10. .option("header","true")
    11. .option("mode","DROPMALFORMED")
    12. .load("data/practice1/titanic.csv")

    2、修改字段类型

    DataFrame 读取进来的都是 StringType 类型,我们需要对部分字段进行修改。

            'withColumn'是一个DataFrame转换函数,用于在现有的DataFrame上添加或替换列。这个函数接收两个参数,第一个是新列的名称,第二个是新列的值。对于新列的值,我们使用 cast 方法将它强制转为一个新的类型。

            cast方法用于将一个数据类型的值转换为另一个数据类型。它可以用于将一种数据类型转换为另一种数据类型,例如将字符串转换为整数或将整数转换为浮点数等。

    withColumn 作为一个转换函数会返回一个新的 DataFrame 对象,记得通过变量或常量存储起来。

    1. // 修改字段数据类型
    2. val md_df = df.withColumn("Pclass", df("Pclass").cast(IntegerType)) // 乘客登记 包括1-2-3三个等级
    3. .withColumn("Survived", df("Survived").cast(IntegerType)) //是否存活-1存活 0-未能存活
    4. .withColumn("Age", df("Age").cast(DoubleType)) // 年龄
    5. .withColumn("SibSp", df("SibSp").cast(IntegerType)) // 乘客的兄弟姐妹和配偶的数量
    6. .withColumn("Parch", df("Parch").cast(IntegerType)) //乘客的家长和孩子数目
    7. .withColumn("Fare", df("Fare").cast(DoubleType)) // 上传的花费

    3、删除不必要的字段

    1. // 删除不必要的字段
    2. val df1 = md_df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin")

    4、缺失值处理

    用到的函数:

    DSL 语句中的 select、where函数,以及 count 、zip 函数。

    涉及到的操作:

    RDD 对象转为 DataFrame 对象,这里因为RDD对象的内容是元组,所以可以直接调用 toDF 方法。 

    统计缺失值
    1. // 缺失值处理
    2. val columns: Array[String] = df1.columns //返回df1的字段组成的数组 Array("字段1","字段2","字段3"...)
    3. // 通过select方法对字段数组中的每一个字段进行搜索,并通过where方法找出满足列col(字段).isNUll的值的count(个数)
    4. val missing_cnt: Array[Long] = columns.map(field => df1.select(col(field)).where(col(field).isNull).count())
    5. // 通过zip方法将两个集合数组合并成一个元组
    6. val tuples: Array[(Long, String)] = missing_cnt.zip(columns)
    7. // 把生成的元组读取为RDD对象再转为DataFrame对象
    8. val result_df: DataFrame = spark.sparkContext.parallelize(tuples).toDF("missing_cnt", "column_name")
    9. result_df.show() // 统计缺失值

     统计结果展示:

    1. +-----------+-----------+
    2. |missing_cnt|column_name|
    3. +-----------+-----------+
    4. | 0| Survived|
    5. | 0| Pclass|
    6. | 0| Sex|
    7. | 177| Age|
    8. | 0| SibSp|
    9. | 0| Parch|
    10. | 0| Fare|
    11. | 2| Embarked|
    12. +-----------+-----------+
    缺失值处理
    1. // 处理缺失值函数
    2. def meanAge(dataFrame: DataFrame): Double = {
    3. dataFrame.select("Age")
    4. .na.drop() //删除 Age 为空的行
    5. //'round' 函数用于将数字四舍五入到指定的小数位数。'mean' 函数则用于计算一组数值的平均值。
    6. .agg(round(mean("Age"), 0)) //对'Age'列计算平均值,并保留0位小数,也就是取整
    7. .first() //由于agg操作返回的是一个DataFrame,而这个DataFrame只有一行,所以使用first()方法获取这一行。
    8. .getDouble(0) //从结果行中获取第一个字段(索引为0)的值,并将其转换为Double类型。
    9. }

    处理: 

    1. val df2 = df1.na.fill(Map("Age" -> meanAge(df1), "Embarked" -> "S"))
    2. df2.show()

    处理结果展示:

    1. +--------+------+------+----+-----+-----+-------+--------+
    2. |Survived|Pclass| Sex| Age|SibSp|Parch| Fare|Embarked|
    3. +--------+------+------+----+-----+-----+-------+--------+
    4. | 0| 3| male|22.0| 1| 0| 7.25| S|
    5. | 1| 1|female|38.0| 1| 0|71.2833| C|
    6. | 1| 3|female|26.0| 0| 0| 7.925| S|
    7. | 1| 1|female|35.0| 1| 0| 53.1| S|
    8. | 0| 3| male|35.0| 0| 0| 8.05| S|
    9. | 0| 3| male|30.0| 0| 0| 8.4583| Q|
    10. | 0| 1| male|54.0| 0| 0|51.8625| S|
    11. | 0| 3| male| 2.0| 3| 1| 21.075| S|
    12. | 1| 3|female|27.0| 0| 2|11.1333| S|
    13. | 1| 2|female|14.0| 1| 0|30.0708| C|
    14. | 1| 3|female| 4.0| 1| 1| 16.7| S|
    15. | 1| 1|female|58.0| 0| 0| 26.55| S|
    16. | 0| 3| male|20.0| 0| 0| 8.05| S|
    17. | 0| 3| male|39.0| 1| 5| 31.275| S|
    18. | 0| 3|female|14.0| 0| 0| 7.8542| S|
    19. | 1| 2|female|55.0| 0| 0| 16.0| S|
    20. | 0| 3| male| 2.0| 4| 1| 29.125| Q|
    21. | 1| 2| male|30.0| 0| 0| 13.0| S|
    22. | 0| 3|female|31.0| 1| 0| 18.0| S|
    23. | 1| 3|female|30.0| 0| 0| 7.225| C|
    24. +--------+------+------+----+-----+-----+-------+--------+
    25. only showing top 20 rows

    三、Spark 数据分析

    1、891人当中,共多少人生还?

    1. // 1.891人当中,共多少人生还?
    2. val survived_count: DataFrame = df2.groupBy("Survived").count()
    3. survived_count.show()
    4. //保存结果到本地
    5. survived_count.coalesce(1).write.option("header","true").csv("output/practice1/survived_count.csv")

    运行结果: 

    1. +--------+-----+
    2. |Survived|count|
    3. +--------+-----+
    4. | 1| 342|
    5. | 0| 549|
    6. +--------+-----+

    2.不同上船港口生还情况

    1. // 2.不同上船港口生还情况
    2. val survived_embark = df2.groupBy("Embarked", "Survived").count()
    3. survived_embark.show()
    4. survived_embark.coalesce(1).write.option("header","true").csv("data/practice1survived_embark.csv")

    运行结果:

    1. +--------+--------+-----+
    2. |Embarked|Survived|count|
    3. +--------+--------+-----+
    4. | Q| 1| 30|
    5. | S| 0| 427|
    6. | S| 1| 219|
    7. | C| 1| 93|
    8. | Q| 0| 47|
    9. | C| 0| 75|
    10. +--------+--------+-----+

    3.存活/未存活的男女数量及比例

    1. // 3.存活/未存活的男女数量及比例
    2. val survived_sex_count=df2.groupBy("Sex","Survived").count()
    3. val survived_sex_percent=survived_sex_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    4. survived_sex_percent.show()
    5. survived_sex_percent.coalesce(1).write.option("header", "true").csv("data/practice1/survived_sex_percent.csv")

    运行结果:

    1. +------+--------+-----+--------+
    2. | Sex|Survived|count| percent|
    3. +------+--------+-----+--------+
    4. | male| 0| 468|52.52525|
    5. |female| 1| 233|26.15039|
    6. |female| 0| 81| 9.09091|
    7. | male| 1| 109|12.23345|
    8. +------+--------+-----+--------+

    4. 不同级别乘客生还人数和占总生还人数的比例

    1. // 4. 不同级别乘客生还人数和占总生还人数的比例
    2. val survived_df = df2.filter(col("Survived")===1)
    3. val pclass_survived_count=survived_df.groupBy("Pclass").count()
    4. val pclass_survived_percent=pclass_survived_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    5. pclass_survived_percent.show()
    6. pclass_survived_percent.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_survived_percent.csv")

    运行结果:

    1. +------+-----+--------+
    2. |Pclass|count| percent|
    3. +------+-----+--------+
    4. | 1| 136|39.76608|
    5. | 3| 119|34.79532|
    6. | 2| 87|25.43860|
    7. +------+-----+--------+

    5. 有无同行父母/孩子的生还情况

    1. // 5.有无同行父母/孩子的生还情况
    2. val df4=df2.withColumn("Parch_label",when(df2("Parch")>0,1).otherwise(0))
    3. val parch_survived_count=df4.groupBy("Parch_label","Survived").count()
    4. parch_survived_count.show()
    5. parch_survived_count.coalesce(1).write.option("header", "true").csv("data/practice1/parch_survived_count.csv")

    运行结果:

    1. +-----------+--------+-----+
    2. |Parch_label|Survived|count|
    3. +-----------+--------+-----+
    4. | 1| 0| 104|
    5. | 1| 1| 109|
    6. | 0| 0| 445|
    7. | 0| 1| 233|
    8. +-----------+--------+-----+

    6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况

    1. // 6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况
    2. val df3=survived_df.withColumn("Age_label",when(df2("Age")<=18,"minor").when(df2("Age")>18 && df2("Age")<=35,"young").when(df2("Age")>35 && df2("Age")<=55,"middle").otherwise("older"))
    3. val age_survived=df3.groupBy("Age_label","Survived").count()
    4. age_survived.show()
    5. age_survived.coalesce(1).write.option("header", "true").csv("data/practice1/age_survived.csv")

    运行结果:

    1. +---------+--------+-----+
    2. |Age_label|Survived|count|
    3. +---------+--------+-----+
    4. | young| 1| 189|
    5. | older| 1| 12|
    6. | minor| 1| 70|
    7. | middle| 1| 71|
    8. +---------+--------+-----+

    7. 提取乘客等级和上船费用信息

    1. // 7.提取乘客等级和上船费用信息
    2. val sef = Seq("Pclass", "Fare")
    3. val df5 = df2.select(sef.head, sef.tail: _*)
    4. df5.show(5)
    5. df5.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_fare.csv")

    运行结果:

    1. +------+-------+
    2. |Pclass| Fare|
    3. +------+-------+
    4. | 3| 7.25|
    5. | 1|71.2833|
    6. | 3| 7.925|
    7. | 1| 53.1|
    8. | 3| 8.05|
    9. +------+-------+
    10. only showing top 5 rows

    四、数据可视化

    数据可视化部分打算在学完 R 语言再完成,Python 实现后续更新。

  • 相关阅读:
    GHostNet网络最通俗易懂的解读【不接受反驳】
    SchedulingConfigurer教程,怎么使用Spring自带的可扩展定时任务调度接口
    NLP 学习之:2022.10.27 阶段性文章总结和筛选
    LED灯亮灭
    Android JetPack~LiveData(二) 数据倒灌问题
    ThreadLocal 源码解析
    局域网监控软件如何防止数据泄密
    批量插入通话记录
    hadoop fs,hadoop dfs以及hdfs dfs区别
    【2023年11月第四版教材】专题3 - 10大管理49过程输入、输出、工具和技术总结
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/133064194