昨天实验课试着做了一个 Spark SQL 小案例,发现好多内容还是没有掌握,以及好多书上没有的内容需要学习。
csv 文件内容部分数据展示:
- PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
- 1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
- 2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
- 3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
- 4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
- 5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
- 6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
• PassengerId : 乘客编号。
• Survived : 是否存活,0表示未能存活,1表示存活。
• Pclass : 描述乘客所属的等级,总共分为三等,用1、2、3来描述:1表示高等;2表示中等;3表示低等。
• Name : 乘客姓名。
• Sex : 乘客性别。
• Age : 乘客年龄。
• SibSp : 与乘客同行的兄弟姐妹(Siblings)和配偶(Spouse)数目。
• Parch : 与乘客同行的家长(Parents)和孩子(Children)数目。
• Ticket : 乘客登船所使用的船票编号。
• Fare : 乘客上船的花费。
• Cabin : 乘客所住的船舱。
• Embarked : 乘客上船时的港口,C表示Cherbourg;Q表示Queenstown;S表示Southampton。
- // 创建 SparkSession 对象
- val conf = new SparkConf().setMaster("local[*]").setAppName("practice1")
- val spark = SparkSession.builder()
- .config(conf)
- .getOrCreate()
-
- // 导入隐式转换相关依赖
- import spark.implicits._
-
- // 读取csv文件生成 DataFrame 对象
- val df = spark.read.format("csv")
- .option("header","true")
- .option("mode","DROPMALFORMED")
- .load("data/practice1/titanic.csv")
DataFrame 读取进来的都是 StringType 类型,我们需要对部分字段进行修改。
'withColumn'是一个DataFrame转换函数,用于在现有的DataFrame上添加或替换列。这个函数接收两个参数,第一个是新列的名称,第二个是新列的值。对于新列的值,我们使用 cast 方法将它强制转为一个新的类型。
cast方法用于将一个数据类型的值转换为另一个数据类型。它可以用于将一种数据类型转换为另一种数据类型,例如将字符串转换为整数或将整数转换为浮点数等。
withColumn 作为一个转换函数会返回一个新的 DataFrame 对象,记得通过变量或常量存储起来。
- // 修改字段数据类型
- val md_df = df.withColumn("Pclass", df("Pclass").cast(IntegerType)) // 乘客登记 包括1-2-3三个等级
- .withColumn("Survived", df("Survived").cast(IntegerType)) //是否存活-1存活 0-未能存活
- .withColumn("Age", df("Age").cast(DoubleType)) // 年龄
- .withColumn("SibSp", df("SibSp").cast(IntegerType)) // 乘客的兄弟姐妹和配偶的数量
- .withColumn("Parch", df("Parch").cast(IntegerType)) //乘客的家长和孩子数目
- .withColumn("Fare", df("Fare").cast(DoubleType)) // 上传的花费
- // 删除不必要的字段
- val df1 = md_df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin")
用到的函数:
DSL 语句中的 select、where函数,以及 count 、zip 函数。
涉及到的操作:
RDD 对象转为 DataFrame 对象,这里因为RDD对象的内容是元组,所以可以直接调用 toDF 方法。
- // 缺失值处理
- val columns: Array[String] = df1.columns //返回df1的字段组成的数组 Array("字段1","字段2","字段3"...)
- // 通过select方法对字段数组中的每一个字段进行搜索,并通过where方法找出满足列col(字段).isNUll的值的count(个数)
- val missing_cnt: Array[Long] = columns.map(field => df1.select(col(field)).where(col(field).isNull).count())
- // 通过zip方法将两个集合数组合并成一个元组
- val tuples: Array[(Long, String)] = missing_cnt.zip(columns)
- // 把生成的元组读取为RDD对象再转为DataFrame对象
- val result_df: DataFrame = spark.sparkContext.parallelize(tuples).toDF("missing_cnt", "column_name")
- result_df.show() // 统计缺失值
统计结果展示:
- +-----------+-----------+
- |missing_cnt|column_name|
- +-----------+-----------+
- | 0| Survived|
- | 0| Pclass|
- | 0| Sex|
- | 177| Age|
- | 0| SibSp|
- | 0| Parch|
- | 0| Fare|
- | 2| Embarked|
- +-----------+-----------+
- // 处理缺失值函数
- def meanAge(dataFrame: DataFrame): Double = {
- dataFrame.select("Age")
- .na.drop() //删除 Age 为空的行
- //'round' 函数用于将数字四舍五入到指定的小数位数。'mean' 函数则用于计算一组数值的平均值。
- .agg(round(mean("Age"), 0)) //对'Age'列计算平均值,并保留0位小数,也就是取整
- .first() //由于agg操作返回的是一个DataFrame,而这个DataFrame只有一行,所以使用first()方法获取这一行。
- .getDouble(0) //从结果行中获取第一个字段(索引为0)的值,并将其转换为Double类型。
- }
处理:
- val df2 = df1.na.fill(Map("Age" -> meanAge(df1), "Embarked" -> "S"))
- df2.show()
处理结果展示:
- +--------+------+------+----+-----+-----+-------+--------+
- |Survived|Pclass| Sex| Age|SibSp|Parch| Fare|Embarked|
- +--------+------+------+----+-----+-----+-------+--------+
- | 0| 3| male|22.0| 1| 0| 7.25| S|
- | 1| 1|female|38.0| 1| 0|71.2833| C|
- | 1| 3|female|26.0| 0| 0| 7.925| S|
- | 1| 1|female|35.0| 1| 0| 53.1| S|
- | 0| 3| male|35.0| 0| 0| 8.05| S|
- | 0| 3| male|30.0| 0| 0| 8.4583| Q|
- | 0| 1| male|54.0| 0| 0|51.8625| S|
- | 0| 3| male| 2.0| 3| 1| 21.075| S|
- | 1| 3|female|27.0| 0| 2|11.1333| S|
- | 1| 2|female|14.0| 1| 0|30.0708| C|
- | 1| 3|female| 4.0| 1| 1| 16.7| S|
- | 1| 1|female|58.0| 0| 0| 26.55| S|
- | 0| 3| male|20.0| 0| 0| 8.05| S|
- | 0| 3| male|39.0| 1| 5| 31.275| S|
- | 0| 3|female|14.0| 0| 0| 7.8542| S|
- | 1| 2|female|55.0| 0| 0| 16.0| S|
- | 0| 3| male| 2.0| 4| 1| 29.125| Q|
- | 1| 2| male|30.0| 0| 0| 13.0| S|
- | 0| 3|female|31.0| 1| 0| 18.0| S|
- | 1| 3|female|30.0| 0| 0| 7.225| C|
- +--------+------+------+----+-----+-----+-------+--------+
- only showing top 20 rows
- // 1.891人当中,共多少人生还?
- val survived_count: DataFrame = df2.groupBy("Survived").count()
- survived_count.show()
- //保存结果到本地
- survived_count.coalesce(1).write.option("header","true").csv("output/practice1/survived_count.csv")
运行结果:
- +--------+-----+
- |Survived|count|
- +--------+-----+
- | 1| 342|
- | 0| 549|
- +--------+-----+
- // 2.不同上船港口生还情况
- val survived_embark = df2.groupBy("Embarked", "Survived").count()
- survived_embark.show()
- survived_embark.coalesce(1).write.option("header","true").csv("data/practice1survived_embark.csv")
运行结果:
- +--------+--------+-----+
- |Embarked|Survived|count|
- +--------+--------+-----+
- | Q| 1| 30|
- | S| 0| 427|
- | S| 1| 219|
- | C| 1| 93|
- | Q| 0| 47|
- | C| 0| 75|
- +--------+--------+-----+
- // 3.存活/未存活的男女数量及比例
- val survived_sex_count=df2.groupBy("Sex","Survived").count()
- val survived_sex_percent=survived_sex_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
- survived_sex_percent.show()
- survived_sex_percent.coalesce(1).write.option("header", "true").csv("data/practice1/survived_sex_percent.csv")
运行结果:
- +------+--------+-----+--------+
- | Sex|Survived|count| percent|
- +------+--------+-----+--------+
- | male| 0| 468|52.52525|
- |female| 1| 233|26.15039|
- |female| 0| 81| 9.09091|
- | male| 1| 109|12.23345|
- +------+--------+-----+--------+
- // 4. 不同级别乘客生还人数和占总生还人数的比例
- val survived_df = df2.filter(col("Survived")===1)
- val pclass_survived_count=survived_df.groupBy("Pclass").count()
- val pclass_survived_percent=pclass_survived_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
- pclass_survived_percent.show()
- pclass_survived_percent.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_survived_percent.csv")
运行结果:
- +------+-----+--------+
- |Pclass|count| percent|
- +------+-----+--------+
- | 1| 136|39.76608|
- | 3| 119|34.79532|
- | 2| 87|25.43860|
- +------+-----+--------+
- // 5.有无同行父母/孩子的生还情况
- val df4=df2.withColumn("Parch_label",when(df2("Parch")>0,1).otherwise(0))
- val parch_survived_count=df4.groupBy("Parch_label","Survived").count()
- parch_survived_count.show()
- parch_survived_count.coalesce(1).write.option("header", "true").csv("data/practice1/parch_survived_count.csv")
运行结果:
- +-----------+--------+-----+
- |Parch_label|Survived|count|
- +-----------+--------+-----+
- | 1| 0| 104|
- | 1| 1| 109|
- | 0| 0| 445|
- | 0| 1| 233|
- +-----------+--------+-----+
- // 6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况
- val df3=survived_df.withColumn("Age_label",when(df2("Age")<=18,"minor").when(df2("Age")>18 && df2("Age")<=35,"young").when(df2("Age")>35 && df2("Age")<=55,"middle").otherwise("older"))
- val age_survived=df3.groupBy("Age_label","Survived").count()
- age_survived.show()
- age_survived.coalesce(1).write.option("header", "true").csv("data/practice1/age_survived.csv")
运行结果:
- +---------+--------+-----+
- |Age_label|Survived|count|
- +---------+--------+-----+
- | young| 1| 189|
- | older| 1| 12|
- | minor| 1| 70|
- | middle| 1| 71|
- +---------+--------+-----+
- // 7.提取乘客等级和上船费用信息
- val sef = Seq("Pclass", "Fare")
- val df5 = df2.select(sef.head, sef.tail: _*)
- df5.show(5)
- df5.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_fare.csv")
运行结果:
- +------+-------+
- |Pclass| Fare|
- +------+-------+
- | 3| 7.25|
- | 1|71.2833|
- | 3| 7.925|
- | 1| 53.1|
- | 3| 8.05|
- +------+-------+
- only showing top 5 rows
数据可视化部分打算在学完 R 语言再完成,Python 实现后续更新。