• Spark学习(6)-Spark SQL


    1 快速入门

    SparkSQL是Spark的一个模块, 用于处理海量结构化数据
    SparkSQL是非常成熟的 海量结构化数据处理框架.
    学习SparkSQL主要在2个点:

    • SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等等。
    • 企业大面积在使用SparkSQL处理业务数据。
      • 离线开发
      • 数仓搭建
      • 科学计算
      • 数据分析

    特点:
    在这里插入图片描述

    2 SparkSQL概述

    2.1 SparkSQL和Hive的异同

    在这里插入图片描述

    2.2 SparkSQL的数据抽象

    在这里插入图片描述
    在这里插入图片描述

    2.3 SparkSession对象

    RDD阶段,程序的执行入口对象是: SparkContext
    在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
    SparkSession对象可以:

    • 用于SparkSQL编程作为入口对象。
    • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

    所以,后续执行环境入口对象,统一变更为SparkSession对象。

    在这里插入图片描述
    2.4 SparkSession对象

    # coding:utf8
    # SparkSQL 中的入口对象是SparkSession对象
    from pyspark.sql import SparkSession
    if __name__ == '__main__':
    # 构建SparkSession对象, 这个对象是 构建器模式 通过builder方法来构建
    spark = SparkSession.builder.\
    appName("local[*]").\
    config("spark.sql.shuffle.partitions", "4").\
    getOrCreate()
    # appName 设置程序名称, config设置一些常用属性
    # 最后通过getOrCreate()方法 创建SparkSession对象
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3 DataFrame入门和操作

    3.1 DataFrame的组成

    在结构层面:

    • StructType对象描述整个DataFrame的表结构。
    • StructField对象描述一个列的信息。

    在数据层面:

    • Row对象记录一行数据。
    • Column对象记录一列数据并包含列的信。

    StructType描述,如下图:
    在这里插入图片描述
    一个StructField记录:列名、列类型、列是否运行为空。
    多个StructField组成一个StructType对象。
    一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空。

    3.2 DataFrame的代码构建 - 基于RDD方式1

    DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构。

    # 首先构建一个RDD rdd[(name, age), ()]
    rdd = sc.textFile("../data/sql/people.txt").\
    map(lambda x: x.split(',')).\
    map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测
    # 构建DF方式1
    df = spark.createDataFrame(rdd, schema = ['name', 'age'])
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。

    3.3 DataFrame的代码构建 - 基于RDD方式2

    通过StructType对象来定义DataFrame的“表结构”转换RDD

    # 创建DF , 首先创建RDD 将RDD转DF
    rdd = sc.textFile("../data/sql/stu_score.txt").\
    map(lambda x:x.split(',')).\
    map(lambda x:(int(x[0]), x[1], int(x[2])))
    # StructType 类
    # 这个类 可以定义整个DataFrame中的Schema
    schema = StructType().\
    add("id", IntegerType(), nullable=False).\
    add("name", StringType(), nullable=True).\
    add("score", IntegerType(), nullable=False)
    # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField
    # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
    df = spark.createDataFrame(rdd, schema)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3.4 DataFrame的代码构建 - 基于RDD方式3

    使用RDD的toDF方法转换RDD

    # StructType 类
    # 这个类 可以定义整个DataFrame中的Schema
    schema = StructType().\
    add("id", IntegerType(), nullable=False).\
    add("name", StringType(), nullable=True).\
    add("score", IntegerType(), nullable=False)
    # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
    # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
    # 方式1: 只传列名, 类型靠推断, 是否允许为空是true
    df = rdd.toDF(['id', 'subject', 'score'])
    df.printSchema()
    df.show()
    # 方式2: 传入完整的Schema描述对象StructType
    df = rdd.toDF(schema)
    df.printSchema()
    df.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3.5 DataFrame的代码构建 - 基于Pandas的DataFrame

    将Pandas的DataFrame对象,转变为分布式的SparkSQL

    # 构建Pandas的DF
    pdf = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["张大仙", '王晓晓', '王大锤'],
    "age": [11, 11, 11]
    })
    # 将Pandas的DF对象转换成Spark的DF
    df = spark.createDataFrame(pdf)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.6 DataFrame的代码构建 - 读取外部数据

    通过SparkSQL的统一API进行数据读取构建DataFrame

    sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
    .option("K", "V") # option可选
    .schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
    .load("被读取文件的路径, 支持本地文件系统和HDFS")
    
    • 1
    • 2
    • 3
    • 4
    读取text数据源

    使用format(“text”)读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value。

    schema = StructType().add("data", StringType(), nullable=True)
    df = spark.read.format("text")\
    .schema(schema)\
    .load("../data/sql/people.txt")
    
    • 1
    • 2
    • 3
    • 4
    读取json数据源

    使用format(“json”)读取json数据

    df = spark.read.format("json").\
    load("../data/sql/people.json")
    # JSON 类型 一般不用写.schema, json自带, json带有列名 和列类型(字符串和数字)
    df.printSchema()
    df.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    读取csv数据源

    使用format(“csv”)读取csv数据

    df = spark.read.format("csv")\
    .option("sep", ";")\ # 列分隔符
    .option("header", False)\ # 是否有CSV标头
    .option("encoding", "utf-8")\ # 编码
    .schema("name STRING, age INT, job STRING")\ # 指定列名和类型
    .load("../data/sql/people.csv") # 路径
    df.printSchema()
    df.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    读取parquet数据源

    使用format(“parquet”)读取parquet数据

    # parquet 自带schema, 直接load啥也不需要了
    df = spark.read.format("parquet").\
    load("../data/sql/users.parquet")
    df.printSchema()
    df.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意:
    parquet: 是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多, 他俩都是列存储格式
    parquet对比普通的文本文件的区别:

    • parquet 内置schema (列名\ 列类型\ 是否为空)。
    • 存储是以列作为存储格式。
    • 存储是序列化存储在文件中的(有压缩属性体积小)。

    Parquet文件不能直接打开查看,如果想要查看内容,可以在PyCharm中安装如下插件来查看:
    在这里插入图片描述

    3.7 DataFrame的入门操作

    DataFrame支持两种风格进行编程,分别是:DSL风格和SQL风格。

    • DSL语法风格:
      DSL称之为:领域特定语言。其实就是指DataFrame的特有API,DSL风格意思就是以调用API的方式来处理Data。比如:df.where().limit()
    • SQL语法风格
      SQL风格就是使用SQL语句处理DataFrame的数据,比如:spark.sql(“SELECT * FROM xxx)
    DSL - show 方法

    功能:展示DataFrame中的数据, 默认展示20条。
    语法:

    df.show(参数1, 参数2)
    - 参数1: 默认是20, 控制展示多少条
    - 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 请填入 truncate = True
    
    • 1
    • 2
    • 3
    DSL - printSchema方法

    功能:打印输出df的schema信息
    语法:

    df.printSchema()
    
    • 1

    例如:
    在这里插入图片描述

    DSL - select

    功能:选择DataFrame中的指定列(通过传入参数进行指定)
    语法:
    在这里插入图片描述
    可传递:

    • 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串
      列名来指定列。
    • List[Column]对象或者List[str]对象, 用来选择多个列。

    在这里插入图片描述

    DSL - filter和where

    功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
    语法:

    df.filter()
    df.where()
    
    • 1
    • 2

    where和filter功能上是等价的。
    在这里插入图片描述

    DSL - groupBy 分组

    功能:按照指定的列进行数据的分组, 返回值是GroupedData对象
    语法:

    df.groupBy()
    
    • 1

    传入参数和select一样,支持多种形式,不管怎么传意思就是告诉spark
    按照哪个列分组。

    在这里插入图片描述

    GroupedData对象

    GroupedData对象是一个特殊的DataFrame数据集,其类全名:,这个对象是经过groupBy后得到的返回值, 内部记录了 以分组形式存储的数据。
    GroupedData对象其实也有很多API,像:min、max、avg、sum、等等许多方法都存在。

    SQL风格语法 - 注册DataFrame成为表

    DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中,使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
    如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:
    在这里插入图片描述

    SQL风格语法 - 使用SQL查询

    在这里插入图片描述

    pyspark.sql.functions 包

    PySpark提供了一个包: pyspark.sql.functions,这个包里面提供了 一系列的计算函数供SparkSQL使用。

    导包
    from pyspark.sql import functions as F
    
    • 1
    • 2

    3.8 SparkSQL Shuffle 分区数目

    在这里插入图片描述

    3.9 SparkSQL 数据清洗API

    在大数据处理之前,首先要对数据进行清洗,有去重,删除缺值,填充缺值等等。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    3.10 DataFrame数据写出

    DataFrame数据写出
    在这里插入图片描述

    3.11 DataFrame 通过JDBC读写数据库(MySQL示例)

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    SQL中GROUP BY语句介绍
    【Web基础】Servlet——生命周期及其请求响应
    python学习之基本语法---语法规则---注释,标识符,关键字,命名规则(二)day8
    JS 处理文档选择和范围创建【createRange | getSelection】
    JMeter笔记2 | JMeter原理及测试计划要素
    2. 双链表的定义+ 代码实现
    使用Process Explorer查看线程的函数调用堆栈去排查程序高CPU占用问题
    【微软技术栈】C#.NET 内置数值转换
    C语言入门(二)运算符和表达式
    美容院共享系统开发|共享模式具体应该怎么去做?
  • 原文地址:https://blog.csdn.net/wu2374633583/article/details/128075069