• Spark 6:Spark SQL DataFrame


    SparkSQL 是Spark的一个模块, 用于处理海量结构化数据。

    148dc251c91f40cba8a316745b65d22f.png

    SparkSQL是用于处理大规模结构化数据的计算引擎
    SparkSQL在企业中广泛使用,并性能极好
    SparkSQL:使用简单、API统一、兼容HIVE、支持标准化JDBC和ODBC连接
    SparkSQL 2014年正式发布,当下使用最多的2.0版Spark发布于2016年,当下使用的最新3.0办发布于2019年

    SparkSQL和Hive的异同

    Hive和Spark 均是:“分布式SQL计算引擎”。均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。

    6ba76b7a40ce4f2fa613c19c5462a422.png

    SparkSQL的数据抽象

    5834d1c0b6a044d5b7bc3c0f945fb038.png

    Pandas - DataFrame
    • 二维表数据结构
    • 单机(本地)集合
    SparkCore - RDD
    • 无标准数据结构,存储什么数据均可
    • 分布式集合(分区)
    SparkSQL - DataFrame
    • 二维表数据结构
    • 分布式集合(分区) 

    6abaa6bf1efa46669218ee9c13e0133b.png

    SparkSQL 其实有3类数据抽象对象
    • SchemaRDD对象(已废弃)
    • DataSet对象:可用于Java、Scala语言
    • DataFrame对象:可用于Java、Scala、Python、R
    以Python开发SparkSQL,主要使用的就是DataFrame对象作为核心数据结构 

    DataFrame概述

    RDD:有分区的、弹性的、分布式的、存储任意结构数据
    DataFrame:有分区的、弹性的、分布式的、存储二维表结构数据

    DataFrame和RDD都是:弹性的、分布式的、数据集。只是,DataFrame存储的数据结构“限定”为:二维表结构化数据;而RDD可以存储的数据则没有任何限制,想处理什么就处理什么。

    假定有如下数据集

    1562823cb9f34eb1a93105fc1bec7192.png

    DataFrame按二维表格存储

    7a1ba11e2841496b9218a4fa37fb751e.png

    RDD按数组对象存储

    2431552393704aecb38f9084fa8c8a77.png

    SparkSession对象
    在RDD阶段,程序的执行入口对象是: SparkContext
    在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
    SparkSession对象可以:
    - 用于SparkSQL编程作为入口对象
    - 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
    所以,后续的代码,执行环境入口对象,统一变更为SparkSession对象

    47b303f5b5764750abb6de331008b8f9.png

    构建SparkSession核心代码

    有如下数据集:列1ID,列2学科,列3分数

    859c4a3d46764fd5ac5d05b1e4c30012.png

    数据集文件:资料\data\sql\stu_score.txt

    需求:读取文件,找出学科为“语文”的数据,并限制输出5条where subject = '语文' limit 5
    代码如下:

    1. # coding:utf8
    2. # SparkSession对象的导包, 对象是来自于 pyspark.sql包中
    3. from pyspark.sql import SparkSession
    4. if __name__ == '__main__':
    5. # 构建SparkSession执行环境入口对象
    6. spark = SparkSession.builder.\
    7. appName("test").\
    8. master("local[*]").\
    9. getOrCreate()
    10. # 通过SparkSession对象 获取 SparkContext对象
    11. sc = spark.sparkContext
    12. # SparkSQL的HelloWorld
    13. df = spark.read.csv("../data/input/stu_score.txt", sep=',', header=False)
    14. df2 = df.toDF("id", "name", "score")
    15. df2.printSchema()
    16. df2.show()
    17. df2.createTempView("score")
    18. # SQL 风格
    19. spark.sql("""
    20. SELECT * FROM score WHERE name='语文' LIMIT 5
    21. """).show()
    22. # DSL 风格
    23. df2.where("name='语文'").limit(5).show()

    SparkSQL 和 Hive同样,都是用于大规模SQL分布式计算的计算框架,均可以运行在YARN之上,在企业中广泛被应用。
    SparkSQL的数据抽象为:SchemaRDD(废弃)、DataFrame(Python、R、Java、Scala)、DataSet(Java、Scala)。
    DataFrame同样是分布式数据集,有分区可以并行计算,和RDD不同的是,DataFrame中存储的数据结构是以表格形式组织的,方便进行SQL计算。
    DataFrame对比DataSet基本相同,不同的是DataSet支持泛型特性,可以让Java、Scala语言更好的利用到。
    SparkSession是2.0后退出的新执行环境入口对象,可以用于RDD、SQL等编程。

    DataFrame的组成
    DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:
    • 行
    • 列
    • 表结构描述
    比如,在MySQL中的一张表:
    • 由许多行组成
    • 数据也被分成多个列
    • 表也有表结构信息(列、列名、列类型、列约束等)

    基于这个前提,DataFrame的组成如下:
    在结构层面:
    - StructType对象描述整个DataFrame的表结构
    - StructField对象描述一个列的信息
    在数据层面:
    - Row对象记录一行数据
    - Column对象记录一列数据并包含列的信息

    562f7dc317bd41569d6f15e07dc4829d.png

    如图, 在表结构层面,DataFrame的表结构由:
    StructType描述,如下图

    c4fae3b6f4dd40a1b1ac87713628c404.png
    一个StructField记录:列名、列类型、列是否运行为空
    多个StructField组成一个StructType对象。
    一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空。同时,一行数据描述为Row对象,如Row(1, 张三, 11)
    一列数据描述为Column对象,Column对象包含一列数据和列的信息

    DataFrame的代码构建 - 基于RDD方式1 

    DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构。

    通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame
    这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. if __name__ == '__main__':
    4. # 0. 构建执行环境入口对象SparkSession
    5. spark = SparkSession.builder.\
    6. appName("test").\
    7. master("local[*]").\
    8. getOrCreate()
    9. sc = spark.sparkContext
    10. # 基于RDD转换成DataFrame
    11. rdd = sc.textFile("../data/input/sql/people.txt").\
    12. map(lambda x: x.split(",")).\
    13. map(lambda x: (x[0], int(x[1])))
    14. # 构建DataFrame对象
    15. # 参数1 被转换的RDD
    16. # 参数2 指定列名, 通过list的形式指定, 按照顺序依次提供字符串名称即可
    17. df = spark.createDataFrame(rdd, schema=['name', 'age'])
    18. # 打印DataFrame的表结构
    19. df.printSchema()
    20. # 打印df中的数据
    21. # 参数1 表示 展示出多少条数据, 默认不传的话是20
    22. # 参数2 表示是否对列进行截断, 如果列的数据长度超过20个字符串长度, 后续的内容不显示以...代替
    23. # 如果给False 表示不阶段全部显示, 默认是True
    24. df.show(20, False)
    25. # 将DF对象转换成临时视图表, 可供sql语句查询
    26. df.createOrReplaceTempView("people")
    27. spark.sql("SELECT * FROM people WHERE age < 30").show()

    DataFrame的代码构建 - 基于RDD方式2

    通过StructType对象来定义DataFrame的“表结构”转换RDD

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, StringType, IntegerType
    4. if __name__ == '__main__':
    5. # 0. 构建执行环境入口对象SparkSession
    6. spark = SparkSession.builder.\
    7. appName("test").\
    8. master("local[*]").\
    9. getOrCreate()
    10. sc = spark.sparkContext
    11. # 基于RDD转换成DataFrame
    12. rdd = sc.textFile("../data/input/sql/people.txt").\
    13. map(lambda x: x.split(",")).\
    14. map(lambda x: (x[0], int(x[1])))
    15. # 构建表结构的描述对象: StructType对象
    16. schema = StructType().add("name", StringType(), nullable=True).\
    17. add("age", IntegerType(), nullable=False)
    18. # 基于StructType对象去构建RDD到DF的转换
    19. df = spark.createDataFrame(rdd, schema=schema)
    20. df.printSchema()
    21. df.show()

    使用RDD的toDF方法转换RDD

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, StringType, IntegerType
    4. if __name__ == '__main__':
    5. # 0. 构建执行环境入口对象SparkSession
    6. spark = SparkSession.builder.\
    7. appName("test").\
    8. master("local[*]").\
    9. getOrCreate()
    10. sc = spark.sparkContext
    11. # 基于RDD转换成DataFrame
    12. rdd = sc.textFile("../data/input/sql/people.txt").\
    13. map(lambda x: x.split(",")).\
    14. map(lambda x: (x[0], int(x[1])))
    15. # toDF的方式构建DataFrame
    16. df1 = rdd.toDF(["name", "age"])
    17. df1.printSchema()
    18. df1.show()
    19. # toDF的方式2 通过StructType来构建
    20. schema = StructType().add("name", StringType(), nullable=True).\
    21. add("age", IntegerType(), nullable=False)
    22. df2 = rdd.toDF(schema=schema)
    23. df2.printSchema()
    24. df2.show()

    将Pandas的DataFrame对象,转变为分布式的SparkSQL DataFrame对象

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, StringType, IntegerType
    4. import pandas as pd
    5. if __name__ == '__main__':
    6. # 0. 构建执行环境入口对象SparkSession
    7. spark = SparkSession.builder.\
    8. appName("test").\
    9. master("local[*]").\
    10. getOrCreate()
    11. sc = spark.sparkContext
    12. # 基于Pandas的DataFrame构建SparkSQL的DataFrame对象
    13. pdf = pd.DataFrame(
    14. {
    15. "id": [1, 2, 3],
    16. "name": ["张大仙", "王晓晓", "吕不为"],
    17. "age": [11, 21, 11]
    18. }
    19. )
    20. df = spark.createDataFrame(pdf)
    21. df.printSchema()
    22. df.show()

    DataFrame的代码构建 - 读取外部数据
    通过SparkSQL的统一API进行数据读取构建DataFrame
    统一API示例代码:

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, StringType, IntegerType
    4. import pandas as pd
    5. if __name__ == '__main__':
    6. # 0. 构建执行环境入口对象SparkSession
    7. spark = SparkSession.builder.\
    8. appName("test").\
    9. master("local[*]").\
    10. getOrCreate()
    11. sc = spark.sparkContext
    12. # 构建StructType, text数据源, 读取数据的特点是, 将一整行只作为`一个列`读取, 默认列名是value 类型是String
    13. schema = StructType().add("data", StringType(), nullable=True)
    14. df = spark.read.format("text").\
    15. schema(schema=schema).\
    16. load("../data/input/sql/people.txt")
    17. df.printSchema()
    18. df.show()

    读取text数据源:使用format(“text”)读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value

    1. schema = StructType().add("data", StringType(), nullable=True)
    2. df = spark.read.format("text")\
    3. .schema(schema)\
    4. .load("../data/sql/people.txt")

    读取json数据源
    使用format(“json”)读取json数据
    示例代码:

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, StringType, IntegerType
    4. import pandas as pd
    5. if __name__ == '__main__':
    6. # 0. 构建执行环境入口对象SparkSession
    7. spark = SparkSession.builder.\
    8. appName("test").\
    9. master("local[*]").\
    10. getOrCreate()
    11. sc = spark.sparkContext
    12. # JSON类型自带有Schema信息
    13. df = spark.read.format("json").load("../data/input/sql/people.json")
    14. df.printSchema()
    15. df.show()

    读取csv数据源
    使用format(“csv”)读取csv数据
    示例代码:

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, StringType, IntegerType
    4. import pandas as pd
    5. if __name__ == '__main__':
    6. # 0. 构建执行环境入口对象SparkSession
    7. spark = SparkSession.builder.\
    8. appName("test").\
    9. master("local[*]").\
    10. getOrCreate()
    11. sc = spark.sparkContext
    12. # 读取CSV文件
    13. df = spark.read.format("csv").\
    14. option("sep", ";").\
    15. option("header", True).\
    16. option("encoding", "utf-8").\
    17. schema("name STRING, age INT, job STRING").\
    18. load("../data/input/sql/people.csv")
    19. df.printSchema()
    20. df.show()

    读取parquet数据源
    使用format(“parquet”)读取parquet数据

    parquet: 是Spark中常用的一种列式存储文件格式。和Hive中的ORC差不多, 他俩都是列存储格式。parquet对比普通的文本文件的区别:
    ● parquet 内置schema (列名\ 列类型\ 是否为空)
    ● 存储是以列作为存储格式
    ● 存储是序列化存储在文件中的(有压缩属性体积小)
    Parquet文件不能直接打开查看,如果想要查看内容,可以在PyCharm中安装如下插件来查看:

    52b27459f9ec4f1ea28eccdf586a7e4a.png
    示例代码:

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, StringType, IntegerType
    4. import pandas as pd
    5. if __name__ == '__main__':
    6. # 0. 构建执行环境入口对象SparkSession
    7. spark = SparkSession.builder.\
    8. appName("test").\
    9. master("local[*]").\
    10. getOrCreate()
    11. sc = spark.sparkContext
    12. # 读取parquet类型的文件
    13. df = spark.read.format("parquet").load("../data/input/sql/users.parquet")
    14. df.printSchema()
    15. df.show()

    DataFrame的入门操作
    DataFrame支持两种风格进行编程,分别是:
    • DSL风格
    • SQL风格

    DSL语法风格
    DSL称之为:领域特定语言。
    其实就是指DataFrame的特有API
    DSL风格意思就是以调用API的方式来处理Data
    比如:df.where().limit()
    SQL语法风格
    SQL风格就是使用SQL语句处理DataFrame的数据
    比如:spark.sql(“SELECT * FROM xxx)

    DSL - show 方法
    功能:展示DataFrame中的数据, 默认展示20条
    语法:

    df.show(参数1, 参数2)
    - 参数1: 默认是20, 控制展示多少条
    - 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 请填入 truncate = True

    如图,某个df.show后的展示结果:

    b07756284cf042a5a088d5bce137beed.png

    DSL - printSchema方法
    功能:打印输出df的schema信息
    语法:

    df.printSchema()

    807e126f6bb04cdbabef5ea769995109.png

    DSL - select
    功能:选择DataFrame中的指定列(通过传入参数进行指定)
    语法:

    df.select()

    可传递:
    • 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串
    列名来指定列
    • List[Column]对象或者List[str]对象, 用来选择多个列

    ef2170ebaae2414c98a5e46f1ab344e8.png

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, StringType, IntegerType
    4. import pandas as pd
    5. if __name__ == '__main__':
    6. # 0. 构建执行环境入口对象SparkSession
    7. spark = SparkSession.builder.\
    8. appName("test").\
    9. master("local[*]").\
    10. getOrCreate()
    11. sc = spark.sparkContext
    12. df = spark.read.format("csv").\
    13. schema("id INT, subject STRING, score INT").\
    14. load("../data/input/sql/stu_score.txt")
    15. # Column对象的获取
    16. id_column = df['id']
    17. subject_column = df['subject']
    18. # DLS风格演示
    19. df.select(["id", "subject"]).show()
    20. df.select("id", "subject").show()
    21. df.select(id_column, subject_column).show()

    DSL - filter和where
    功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
    语法: 

    df.filter()
    df.where()
    where和filter功能上是等价的

    c62c55be968548a89a6e9750f0631d91.png

    1. # filter API
    2. df.filter("score < 99").show()
    3. df.filter(df['score'] < 99).show()
    4. # where API
    5. df.where("score < 99").show()
    6. df.where(df['score'] < 99).show()

    DSL - groupBy 分组
    功能:按照指定的列进行数据的分组, 返回值是GroupedData对象
    语法:

    df.groupBy()

    传入参数和select一样,支持多种形式,不管怎么传意思就是告诉spark按照哪个列分组

    6ed97e39542c4d3983bf75baa2eca87d.png

    1. # group By API
    2. df.groupBy("subject").count().show()
    3. df.groupBy(df['subject']).count().show()

    GroupedData对象
    GroupedData对象是一个特殊的DataFrame数据集
    其类全名:
    这个对象是经过groupBy后得到的返回值, 内部记录了 以分组形式存储的数据
    GroupedData对象其实也有很多API,比如前面的count方法就是这个对象的内置方法
    除此之外,像:min、max、avg、sum、等等许多方法都存在

    SQL风格语法 - 注册DataFrame成为表
    DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中
    使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
    如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:

    1. # 注册成临时表
    2. df.createTempView("score") # 注册临时视图(表)
    3. df.createOrReplaceTempView("score_2") # 注册 或者 替换 临时视图
    4. df.createGlobalTempView("score_3") # 注册全局临时视图 全局临时视图在使用的时候 需要在前面带上global_temp. 前缀

    f950dbb5e36a4d62ae1ab4c8d5b88657.png

    SQL风格语法 - 使用SQL查询

    8472c6099b9e42fa9cc152772dace233.png

    1. # 可以通过SparkSession对象的sql api来完成sql语句的执行
    2. spark.sql("SELECT subject, COUNT(*) AS cnt FROM score GROUP BY subject").show()
    3. spark.sql("SELECT subject, COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()
    4. spark.sql("SELECT subject, COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()

    pyspark.sql.functions 包
    PySpark提供了一个包: pyspark.sql.functions
    这个包里面提供了 一系列的计算函数供SparkSQL使用
    如何用呢?
    导包

    from pyspark.sql import functions as F

    然后就可以用F对象调用函数计算了。
    这些功能函数, 返回值多数都是Column对象。

    词频统计案例练习
    单词计数需求,使用DSL和SQL两种风格来实现。

    1. # coding:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, StringType, IntegerType
    4. import pandas as pd
    5. from pyspark.sql import functions as F
    6. if __name__ == '__main__':
    7. # 0. 构建执行环境入口对象SparkSession
    8. spark = SparkSession.builder.\
    9. appName("test").\
    10. master("local[*]").\
    11. getOrCreate()
    12. sc = spark.sparkContext
    13. # TODO 1: SQL 风格进行处理
    14. rdd = sc.textFile("../data/input/words.txt").\
    15. flatMap(lambda x: x.split(" ")).\
    16. map(lambda x: [x])
    17. df = rdd.toDF(["word"])
    18. # 注册DF为表格
    19. df.createTempView("words")
    20. spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()
    21. # TODO 2: DSL 风格处理
    22. df = spark.read.format("text").load("../data/input/words.txt")
    23. # withColumn方法
    24. # 方法功能: 对已存在的列进行操作, 返回一个新的列, 如果名字和老列相同, 那么替换, 否则作为新列存在
    25. df2 = df.withColumn("value", F.explode(F.split(df['value'], " ")))
    26. df2.groupBy("value").\
    27. count().\
    28. withColumnRenamed("value", "word").\
    29. withColumnRenamed("count", "cnt").\
    30. orderBy("cnt", ascending=False).\
    31. show()

    电影评分数据分析案例

    f83fd35e786b4478ac848e83f3b74b86.png

    6352d5c13e1e4a76b6781a4bf2b70484.png

    1. # coding:utf8
    2. import time
    3. from pyspark.sql import SparkSession
    4. from pyspark.sql.types import StructType, StringType, IntegerType
    5. import pandas as pd
    6. from pyspark.sql import functions as F
    7. if __name__ == '__main__':
    8. # 0. 构建执行环境入口对象SparkSession
    9. spark = SparkSession.builder.\
    10. appName("test").\
    11. master("local[*]").\
    12. getOrCreate()
    13. sc = spark.sparkContext
    14. # 1. 读取数据集
    15. schema = StructType().add("user_id", StringType(), nullable=True).\
    16. add("movie_id", IntegerType(), nullable=True).\
    17. add("rank", IntegerType(), nullable=True).\
    18. add("ts", StringType(), nullable=True)
    19. df = spark.read.format("csv").\
    20. option("sep", "\t").\
    21. option("header", False).\
    22. option("encoding", "utf-8").\
    23. schema(schema=schema).\
    24. load("../data/input/sql/u.data")
    25. # TODO 1: 用户平均分
    26. df.groupBy("user_id").\
    27. avg("rank").\
    28. withColumnRenamed("avg(rank)", "avg_rank").\
    29. withColumn("avg_rank", F.round("avg_rank", 2)).\
    30. orderBy("avg_rank", ascending=False).\
    31. show()
    32. # TODO 2: 电影的平均分查询
    33. df.createTempView("movie")
    34. spark.sql("""
    35. SELECT movie_id, ROUND(AVG(rank), 2) AS avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC
    36. """).show()
    37. # TODO 3: 查询大于平均分的电影的数量 # Row
    38. print("大于平均分电影的数量: ", df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())
    39. # TODO 4: 查询高分电影中(>3)打分次数最多的用户, 此人打分的平均分
    40. # 先找出这个人
    41. user_id = df.where("rank > 3").\
    42. groupBy("user_id").\
    43. count().\
    44. withColumnRenamed("count", "cnt").\
    45. orderBy("cnt", ascending=False).\
    46. limit(1).\
    47. first()['user_id']
    48. # 计算这个人的打分平均分
    49. df.filter(df['user_id'] == user_id).\
    50. select(F.round(F.avg("rank"), 2)).show()
    51. # TODO 5: 查询每个用户的平局打分, 最低打分, 最高打分
    52. df.groupBy("user_id").\
    53. agg(
    54. F.round(F.avg("rank"), 2).alias("avg_rank"),
    55. F.min("rank").alias("min_rank"),
    56. F.max("rank").alias("max_rank")
    57. ).show()
    58. # TODO 6: 查询评分超过100次的电影, 的平均分 排名 TOP10
    59. df.groupBy("movie_id").\
    60. agg(
    61. F.count("movie_id").alias("cnt"),
    62. F.round(F.avg("rank"), 2).alias("avg_rank")
    63. ).where("cnt > 100").\
    64. orderBy("avg_rank", ascending=False).\
    65. limit(10).\
    66. show()
    67. time.sleep(10000)
    68. """
    69. 1. agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
    70. 2. alias: 它是Column对象的API, 可以针对一个列 进行改名
    71. 3. withColumnRenamed: 它是DataFrame的API, 可以对DF中的列进行改名, 一次改一个列, 改多个列 可以链式调用
    72. 4. orderBy: DataFrame的API, 进行排序, 参数1是被排序的列, 参数2是 升序(True) 或 降序 False
    73. 5. first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象.
    74. # Row对象 就是一个数组, 你可以通过row['列名'] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)
    75. """

    SparkSQL Shuffle 分区数目

    a257e6d69044428aae624620f9901faf.png

    1. # 0. 构建执行环境入口对象SparkSession
    2. spark = SparkSession.builder.\
    3. appName("test").\
    4. master("local[*]").\
    5. config("spark.sql.shuffle.partitions", 2).\
    6. getOrCreate()
    7. sc = spark.sparkContext
    8. """
    9. spark.sql.shuffle.partitions 参数指的是, 在sql计算中, shuffle算子阶段默认的分区数是200个.
    10. 对于集群模式来说, 200个默认也算比较合适
    11. 如果在local下运行, 200个很多, 在调度上会带来额外的损耗
    12. 所以在local下建议修改比较低 比如2\4\10均可
    13. 这个参数和Spark RDD中设置并行度的参数 是相互独立的.
    14. """

    SparkSQL 数据清洗API

    57ae7d1d5cb54302a0ecab976297eead.png

    1. df.dropDuplicates().show()
    2. df.dropDuplicates(['age', 'job']).show()

     a3a215af60ec41429748e429c56e36fb.png

    1. df.dropna().show()
    2. # # thresh = 3表示, 最少满足3个有效列, 不满足 就删除当前行数据
    3. df.dropna(thresh=3).show()
    4. df.dropna(thresh=2, subset=['name', 'age']).show()

    64438ecc0a584588ac8a653eda93b3f5.png

    1. # 缺失值处理也可以完成对缺失值进行填充
    2. # DataFrame的 fillna 对缺失的列进行填充
    3. df.fillna("loss").show()
    4. # 指定列进行填充
    5. df.fillna("N/A", subset=['job']).show()
    6. # 设定一个字典, 对所有的列 提供填充规则
    7. df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()

    DataFrame数据写出

    00ee5a7503a941368bbddb92447e00c2.png

    f52b2f3c5bc2411690981e93b0d01bde.png

     c65fb09aab28467fa77b20a911b55f1e.png

    1. # Write text 写出, 只能写出一个列的数据, 需要将df转换为单列df
    2. df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")).\
    3. write.\
    4. mode("overwrite").\
    5. format("text").\
    6. save("../data/output/sql/text")
    7. # Write csv
    8. df.write.mode("overwrite").\
    9. format("csv").\
    10. option("sep", ";").\
    11. option("header", True).\
    12. save("../data/output/sql/csv")
    13. # Write json
    14. df.write.mode("overwrite").\
    15. format("json").\
    16. save("../data/output/sql/json")
    17. # Write parquet
    18. df.write.mode("overwrite").\
    19. format("parquet").\
    20. save("../data/output/sql/parquet")

    DataFrame 通过JDBC读写数据库(MySQL示例)

    1. # 1. 写出df到mysql数据库中
    2. df.write.mode("overwrite").\
    3. format("jdbc").\
    4. option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true").\
    5. option("dbtable", "movie_data").\
    6. option("user", "root").\
    7. option("password", "2212072ok1").\
    8. save()

    3ce460d3d77e487b9214ddb91de2cc4d.png

    1. # 2. 从mysql数据库中读df
    2. df2 = spark.read.format("jdbc"). \
    3. option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true"). \
    4. option("dbtable", "movie_data"). \
    5. option("user", "root"). \
    6. option("password", "2212072ok1"). \
    7. load()

    967106ad85384f3c89f40a2aa9121358.png

    DataFrame 在结构层面上由StructField组成列描述,由StructType构造表描述。在数据层面上,Column对象记录列数据,Row对象记录行数据。
    DataFrame可以从RDD转换、Pandas DF转换、读取文件、读取JDBC等方法构建。
    spark.read.format()和df.write.format() 是DataFrame读取和写出的统一化标准API。
    SparkSQL默认在Shuffle阶段200个分区,可以修改参数获得最好性能。
    dropDuplicates可以去重、dropna可以删除缺失值、fillna可以填充缺失值。
    SparkSQL支持JDBC读写,可用标准API对数据库进行读写操作。

     

     

     

  • 相关阅读:
    (八)Java算法:堆排序(详细图解)
    10.Java面向对象基础(下)
    Android11去掉Setings里的投射菜单条目
    郑州中创|业绩下滑、股价大跌……Meta为何继续坚持元宇宙
    成人职业教育:知乎、B站、网易“短兵相接”
    axios不经过全局拦截器策略
    P2432 zxbsmk爱查错,字符串线性dp
    STM32FreeRTOS任务通知(STM32cube高效开发)
    【基于python+Django的有机食品推荐系统-哔哩哔哩】 https://b23.tv/lqRcF1y
    玻璃生产过程中的窑内压力高精度恒定控制解决方案
  • 原文地址:https://blog.csdn.net/Amzmks/article/details/130454389