• pyspark常用功能记录


    前言

    pyspark中很多常用的功能,过段时间没有使用就容易忘记,需要去网上搜索,这里总结一下,省的以后还去去搜,供自己以后参考。

    withColumn

    def hot_func(info_str):
        if info_str:
             eturn "1"
        return "0"
    df = df.withColumn("is_hot", F.udf(hot_func, StringType())(F.col("your_col_name")))
    
    • 1
    • 2
    • 3
    • 4
    • 5

    自定义函数

    from pyspark.sql.functions import udf  
    # 定义并注册函数
    @udf(returnType=StringType())
    def f_parse_category(info):
        x = json.loads(info)['category']
        return x if x is not None else ''
    spark.udf.register('f_parse_category', f_parse_category)
    # 在sql中使用注册的函数
    sql = """
    select *, f_parse_category(info) category, 
    from your_table
    where info is not null 
    """
    df = spark.sql(sql).cache()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    groupby处理

    按groupby处理,保留goupby字段,并对groupby的结果处理。正常情况下,使用df.groupBy即可,但需要处理多列并逻辑较为复杂时,可以使用这种方式。

    from pyspark.sql.functions import pandas_udf                                                         
    from pyspark.sql.functions import PandasUDFType 
    from pyspark.sql.types import StructField, LongType, StringType, StructType
    from collections import Counter
    
    pattern = re.compile(r'\b\w+(?:' + '|'.join(['_size', '_sum']) + r')\b')
    
    group_cols = ['category']
    value_cols = ['sales_sum', 'stat_size']
    
    schema = StructType(                                                                                
                        [StructField(col, LongType()) if len(re.findall(pattern, col))>0 else StructField(col, StringType())  for col in group_cols+value_cols],
                        )
    
    @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)                                          
    def group_stat(df):
    	# 获取
        l = [df[item].iloc[0] for item in group_cols]
        df = df[[col for col in df.columns if col not in group_cols]]
        sales_sum = df['sales'].sum().item()
        stat_size = len(df)
        
        # d: {"key": "value"}
        df['first_attr'] = df['attr'].transform(lambda d: list(json.loads(d).keys())[0])
        attr_dict = json.dumps({k:v for k, v in Counter(df['first_attr'].value_counts().to_dict()).most_common()}, ensure_ascii=0)
       
        counter = sum(df['brand_name'].apply(lambda x:Counter(json.loads(x))), Counter())
        ct = len(counter)
        brand_list = df["brand"].to_list()
        values = [sales_sum, stat_size, attr_dict, ct, infobox_brand_stat, brand_list]
        return pd.DataFrame([l + values])
    
    # df 包含字段:category, sales, attr, brand_name, brand
    df = df.groupby(group_cols).apply(group_stat).cache()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    patition By & orderBy

    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, dense_rank
    # 根据department分区,然后按salary排序编号
    windowSpec  = Window.partitionBy("department").orderBy("salary")
    df.withColumn("row_number",row_number().over(windowSpec)) \
        .show(truncate=False)
    # dense_rank: 相同值排序编号一致
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    sql的方式:

    select 
    	name, category, sales, 
    	DENSE_RANK() OVER (PARTITION BY category ORDER BY b.sales DESC) as sales_rank
    from your_tb
    
    • 1
    • 2
    • 3
    • 4

    dataframe转正rdd处理行

    该中情况一般在需要处理过个行的情况下使用,如果是少数的行处理,可以使用withColumn

    def hot_func(info_str):
        if info_str:
             eturn "1"
        return "0"
    df = df.withColumn("is_hot", F.udf(hot_func, StringType())(F.col("your_col_name")))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    转为rdd的处理方式为:
    def gen_norm(row):
    	# 转为字段处理
        row_dict = row.asDict(recursive=True)
        process_key = row_dict["key"]
        row_dict["process_key"] = process_key
        return Row(**row_dict)
    # sampleRatio=0.01 为推断列类型的抽样数据比例
    df = df.rdd.map(gen_norm).toDF(sampleRatio=0.01).cache()
    df.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 相关阅读:
    YSA Toon (Anime/Toon Shader)
    (原创)【MAUI】在窗口(页面)关闭后获取其返回值
    python 如何解析含有重复key的json
    硬件电子基础 -单片机信号转换
    MongoDB入门级别教程全(Windows版,保姆级教程)
    关于Mybaits缓存....
    java url编码 解码
    Orcad Schematic常用功能
    基于pion生态的SFU实时音视频发布服务(二)
    6种交互式内容创意帮助跨境电商卖家提高独立站商店知名度
  • 原文地址:https://blog.csdn.net/qq_42693848/article/details/133354774