提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
Spark SQL是一个用于处理结构化数据对组件,主要用于结构化处理和对数据执行SQL查询,类似于pandas操作,只不过数据量相对更大。
spark SQL数据集类型:DataFrame
使用所提供的结构化操作来操作或转换data frame:
结构化操作有时被描述为一种用于分布式数据操作的特定领域的语言(DSL)
结构化操作分为两类:transformation、action
dataframe是不可变的,他们的转换操作总是返回一个新的dataframe
大多数dataframe结构化操作都需要指定一个或多个列
对于其中的一些列,列是以字符串的形式指定的,对于某些列,需要被指定为column类的实例
在较高的层次上,列类提供的功能可以分解为以下类别
数学运算:加法、乘法等
将列值与文字之间的逻辑比较,例如相等、大于、等于等
字符串匹配模式
结构化转换transformation:
结构化动作action具有与RDD动作相同的eager evalueated语义,因此他们触发了所有导致特定动作的转换的计算量,函数有:
实例化spark
sc = SparkSession.builder.appName('sparkSQL').getOrCreate()
sc.range(10)
sc.range(10).show()
scores = [['章三','m',90],['李四','F',70],['王五','m',88]]
string_schema = """
`姓名` string
,`性别` string
,score int
"""
# 中文的字段名,或者别名,需要用``上引号(tab键上面的)括起来
sc.createDataFrame(data=scores,schema=string_schema).show()
spark = sc.sparkContext
rdd1 = spark.parallelize([["章三","男",90],["李四","女",70],["王五","男",88]])
sc.createDataFrame(rdd1,schema=string_schema).show()
df = pd.DataFrame(scores,columns=['姓名','性别','score'])
sc.createDataFrame(df).show()
movies = sc.read.csv('./data/spark_data/movies.csv',header = True)
movies.show(5)
与dataframe类似,dataframe的API也分为两类:transformation和action
选择列select selectExpr
select与SQL中的类似,选取指定列(指定条件的数据)
selectExpr与select类似,不同的是可以解析表达式(地处进行了SQL转化)
spark = SparkSession.builder.appName('sparkSQL').getOrCreate()
movies_parquet = spark.read.parquet("./data/spark_data/movies.parquet")
movies_parquet.printSchema()
movies_parquet.select("*",'movie_title').show(5)
#查询电影上映的年代
#select不能存放字符串表达式
#这里会报错,用selectexpr
movies_parquet.select('movies_title','produced_year-produced_year%10').show(5)
filter where用来设置条件筛选字段,两者用法一致
#不等比较操作符 !=
movies_parquet.where('produced_year!=2000').show(5)
#组合一个或多个比较表达式,我们将使用or或and表达运算符
#2000~2005范围内的电影
movies_parquet.where('produced_year>2000 and produced_year<2005').show(5)
#电影数量
movies_parquet.selectExpr('count(movie_title)').show()
#去重后的数量
movies_parquet.select('movie_title').distinct().selectExpr('count(movie_title)').show()
#根据电影title名称长短排序
(movies_parquet
.dropDuplicates(['movie_title'])
.selectExpr("*",'length(movie_title)t_len')
.sort('t_len')
).show()
#使用sparkSQL函数实现升降的设定
from pyspark.sql.functions import desc,asc
(movies_parquet
.dropDuplicates(['movie_title'])
.selectExpr("*",'length(movie_title)t_len')
.sort(desc('t_len')
,asc('produced_year'))
).show()
# 选择前3行
movies_parquet.limit(3).show()
# 显示前3行
movies_parquet.show(3)
相当于SQL的union all,去重用前面的方法
df1 = spark.createDataFrame([[1,2,3]],['c1','c2','c3'])
df2 = spark.createDataFrame([[4,5,6]],['c2','c3','c1'])
df1.union(df1).show()
df1.union(df2).show()
向DataFrame增加一个新的列,基于某一列表达式
movies_parquet.withColumn('年代',movies_parquet.produced_year - movies_parquet.produced_year%10).show(10)
movies_parquet.withColumnRenamed('actor_name','actor').show(5)
movies_parquet.drop('actor_name').show(5)
badmovie = [[None,None,2005],
['test',None,2003],
[None,'test1',2007]]
bad_df = spark.createDataFrame(badmovie,schema=movies_parquet.schema)
bad_df.dropna(how='any',subset=['actor_name','produced_year']).show()
# 非空数据大于等于2则不删除
bad_df.dropna(thresh=2 #非空数据至少有2个
).show()
movies_parquet.groupBy('actor_name').agg({'movie_title':'count'}).show(5)
import pyspark.sql.functions as F #使用F中的count函数,这样返回dataframe列对象,可以用alias设置列名
movies_parquet.groupBy('actor_name').agg(F.count('movie_title')).show(5)
movies_parquet.describe().show()
返回数据第一行
movies_parquet.head()
movies_parquet.head()['actor_name']
movies_parquet.first()
movies_parquet.take(3)
以列表形式返回客户端
movies_parquet.limit(3).collect()
movies_parquet.limit(3).collect()
sparkSQL提供的最酷功能之一是能够使用SQL来执行分布式数据操作或大规模数据分析
# 将df注册为临时视图movies
movies_parquet.createOrReplaceTempView('movies')
spark.sql('select * from movies limit 3').show()
与API混用
# 电影数量高于平均值的演员信息
spark.sql('''
select actor_name,numb
,avg(numb) over() avg_numb
from(select actor_name
,count(*)numb
from movies group by actor_name) t
''').where('numb>avg_numb').show()
#通过语法 desc function extended 函数名 ,查看函数文档
# 自定义函数,以更好的格式查看文档信息
def print_doc(func_name:str)->None:
for i in spark.sql(f'desc function extended {func_name}').head(100):
print(i['function_desc'])
print_doc('explode')
spark.sql('show functions').count()
spark.sql('show functions').show(333,False)
内置函数:处理日期的时间函数
spark内置的日期时间函数大致可分为以下三个类别:
日期和时间转换函数:这些函数使用的默认的日期格式为yyyy-mm-dd:HH:mm:ss
test_data = [(1,'2023-05-08','2023-05-08 11:22:33','28-05-2023','28-05-2023 13:50')]
test_DF = spark.createDataFrame(test_data,schema=['id','date','datetime','date_str','dt_str'])
test_DF.show()
test_DF.printSchema()
将这些字符串转换成date、timestamp和unix timestamp,并指定一个自定义的date和timestamp格式
#注册视图
test_DF.createOrReplaceTempView('test_DF')
- to_date将各种字符串替换为date
- to_timestamp将字符串转换为datetime
- unix_timestamp将字符串转换为整数
test_DF.show()
spark.sql('''
select to_date(datetime) date2
,to_timestamp(dt_str,'dd-MM-yyyy HH:mm')
from test_DF
''').show()
spark.sql('''
select to_date(datetime) date2
,to_timestamp(dt_str,'dd-MM-yyyy HH:mm')
from test_DF
''').printSchema()
test_DF.select(F.to_date('datetime'),
F.to_timestamp('dt_str','dd-MM-yyyy HH:mm')).show()
将日期或时间戳转换为时间字符串,使用
from pyspark.sql.functions import date_format,from_unixtime
case_df = spark.createDataFrame([('a','2015-01-10','2015-02-10'),('b','2017-05-20','2025-02-15')]
,schema = ['name','join_date','leave_date'])
case_df.createOrReplaceTempView('case_df')
case_df.show()
from pyspark.sql.functions import datediff,months_between,last_day
# API
case_df.select(F.datediff('leave_date','join_date'),
F.last_day('leave_date')).show()
from pyspark.sql.functions import date_add,date_sub
from pyspark.sql.functions import year,month,dayofmonth,hour,second,dayofweek
使用udf函数涉及有三个步骤:
data = [[101,'中山路','南京路'],
[102,'北京路','中山路'],
[103,'南京路','北京路'],
[104,'北京路','南京路'],
[105,'中山路','南京路'],
[106,'南京路','北京路'],
]
route_df = spark.createDataFrame(data,schema='id int ,st string,end string')
route_df.show()
# 每个线路的订单数量,线路不区分起点和终点
def get_route(st,ed):
return "-".join(sorted([st,ed]))
from pyspark.sql.functions import udf
udf_get_route = udf(get_route)
route_df.select('id',udf_get_route('st',"end").alias('route')).show()
route_df.select('id',udf_get_route('st',"end").alias('route')).groupBy('route').count().show()
spark.udf.register('udf_get_route_sql',get_route)
route_df.createOrReplaceTempView('route_df')
spark.sql("""
select udf_get_route_sql(st,end)
,count(*)
from route_df
group by udf_get_route_sql(st,end)
""").show()
result = spark.sql("""
select udf_get_route_sql(st,end)
,count(*)
from route_df
group by udf_get_route_sql(st,end)
""")
# action操作toPandas()
df = result.toPandas()
df
df.plot(kind = 'bar'
,x='udf_get_route_sql(st, end)'
,y='count(1)')
文字未正常加载,可以使用以下代码解决
import matplotlib.pyplot as plt
plt.rcParams['axes.unicode_minus']=False
plt.rcParams['font.sans-serif']=['Simhei']
窗口函数:排序函数、分析函数、聚合函数
- rank:返回一个frame内行的排名和排序,基于一些排序规则
- dense_rank:类似于rank,但是在不同的排名之间没有间隔,紧密衔接显示
- ntile(n):在一个有序的窗口分区中返回ntile分组ID.比如,如果n是4,那么前25%行得到的ID值为1,第二个25%行得到的ID值为2,依次类推
- row_number:返回一个序列号,每个frame从1开始
- cume_dist:返回一个frame的值的累积分布。换句话说,低于当前的行的比例
- log(col,offset):返回当前行之前offset行的列值
- lead(col,offset):返回当前行之后offset行的列值
它可以理解为记录集合,开窗函数也就是在满足某种条件的记录集合上执行的特殊函数。对于每条记录都要在此窗口内执行函数,有点函数随着记录不同,窗口大小都是固定的,这
种属于静态窗口;有的函数则相反,不同的记录对应着不同的窗口,这种动态变化的窗口叫滑动窗口。开窗函数的本质还是聚合运算,只不过它更具灵活性,它对数据的每一行,都
使用与该行相关的行进行计算并返回计算结果。
数据初始化
logFile = './data/spark_data/order_tab.csv'
from pyspark.sql.types import FloatType,StringType,DateType,IntegerType,StructType,StructField
schema1 = StructType(
[StructField('order_id',IntegerType(),True),
StructField('user_no',StringType(),True),
StructField('amount',IntegerType(),True),
StructField('create_date',DateType(),True)
]
)
logData = spark.read.csv(logFile,schema=schema1,header=True)
logData.printSchema()
logData.createOrReplaceTempView('order_tab')
spark.sql('select * from order_tab').show()
# 查询每个用户的订单总金额
spark.sql('select user_no ,sum(amount) from order_tab group by user_no').show()
#查询每个用户按时间顺序的累计订单金额
spark.sql('select *,sum(amount) over (partition by user_no order by create_date) cusum from order_tab').show()
spark.sql('select *,sum(amount) over (partition by user_no) cusum from order_tab').show()
# 查询每个用户按下单时间顺序,前一行和后一行记录的平均订单金额
spark.sql("""
select * ,avg(amount) over (partition by user_no order by create_date rows between 1 preceding and 1 following) avg_amount
from order_tab
""").show()
通常使用rows between frame_start and frame_end语法来表示行范围,frame_start和frame_end可以支持如下关键字,来确定不同的动态行记录:
和基于行类似,通常使用range between frame_start and frame_end语法来表示值范围,比如下面都是合法的值范围
示例:查询每个用户订单金额最高的前三个订单
spark.sql("""
select * from
(select *
,row_number() over (partition by user_no order by amount desc) rb
from order_tab) t
where rb<4
""").show()
以上结果中用户002的前两个订单的金额都是800,随机排为第一和第二,但实际两笔订单金额应该并列第一
这种情况row_number函数就不能满足要求,需要rank和dense_rank函数,这2个函数和row_number函数类似,只是在出现重复值时处理逻辑不同
示例:使用三个不同的序号函数,查询不同用户的订单中,按照订单金额进行排序,显示出相应的排名
分区中位于当前行前n行(lag)/后n行(lead)的记录值
示例:查询上一个订单距离当前订单的间隔天数
spark.sql('select *,lag(create_date,1) over(partition by user_no order by create_date) ldate from order_tab').show()
spark.sql("""
select *,datediff(create_date,ldate)diff
from (select * ,lag(create_date,1) over (partition by user_no order by create_date) ldate
from order_tab) t
""").show()