• 【Spark】PySpark DataFrame


    1 SparkSession 执行环境入口

    from pyspark.sql import SparkSession, Row
    from pyspark.sql.types import *
    
    • 1
    • 2
    # 构建SparkSession执行环境入口
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    
    sc = spark.sparkContext
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2 构建DataFrame

    2.1 由rdd构建 (StructType、StructField)

    rdd = sc.parallelize([(1001, "Tom", 12), (1002, "Jerry", 13), (1003, "John", 14)])
    rdd = rdd.map(lambda x: Row(x[0], x[1], x[2]))
    
    • 1
    • 2
    '''
    StructType(列名, 列类型, 列是否允许为空) 描述df表结构
    StructField 描述一列的信息
    Row(1, 'a', 12) 记录一行数据
    Column 记录一列数据并包含列信息(StructField)
    '''
    schema = StructType([StructField("id", LongType(), True)
                        ,StructField("name", StringType(), True)
                        ,StructField("age", IntegerType(), True)])
    '''
    也可以:
    schema = StructType().add('id', LongType(), True)\
                         .add('name', StringType(), True)\
                         .add('age', IntegerType(), True)
    '''
    
    df = spark.createDataFrame(rdd, schema)
    # 或 # df = spark.createDataFrame(rdd, schema=['id', 'name', 'age'])
    
    '''
    param1: 展示多少数据 默认20
    param2: 默认True 进行截断,数据长度超过20,后续内容不显示
    '''
    df.show()
    
    # 打印表结构
    df.printSchema()
    '''
        +----+-----+---+
        |  id| name|age|
        +----+-----+---+
        |1001|  Tom| 12|
        |1002|Jerry| 13|
        |1003| John| 14|
        +----+-----+---+
        
        root
         |-- id: long (nullable = true)
         |-- name: string (nullable = true)
         |-- age: integer (nullable = true)
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    '''
    数据类型需要在构建rdd时进行指定的转换
    '''
    df = rdd.toDF(['id', 'name', 'age'])
    df.show()
    df.printSchema()
    '''
        +----+-----+---+
        |  id| name|age|
        +----+-----+---+
        |1001|  Tom| 12|
        |1002|Jerry| 13|
        |1003| John| 14|
        +----+-----+---+
        
        root
         |-- id: long (nullable = true)
         |-- name: string (nullable = true)
         |-- age: long (nullable = true)
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    # 也可传入schema,指定类型
    schema = StructType().add('id', LongType(), True)\
                         .add('name', StringType(), True)\
                         .add('age', IntegerType(), True)
    df = rdd.toDF(schema=schema)
    df.show()
    df.printSchema()
    '''
        +----+-----+---+
        |  id| name|age|
        +----+-----+---+
        |1001|  Tom| 12|
        |1002|Jerry| 13|
        |1003| John| 14|
        +----+-----+---+
        
        root
         |-- id: long (nullable = true)
         |-- name: string (nullable = true)
         |-- age: integer (nullable = true)
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    2.2 由pandas.DataFrame构建

    # pandas DataFrame >>> pyspark.sql DataFrame
    import pandas as pd
    
    pdf = pd.DataFrame(
        {
            'id': [1, 2, 3],
            'name': ['a', 'b', 'c'],
            'age': [12, 13, 14]
        }
    )
    df = spark.createDataFrame(pdf)
    df.show()
    df.printSchema()
    '''
        +---+----+---+
        | id|name|age|
        +---+----+---+
        |  1|   a| 12|
        |  2|   b| 13|
        |  3|   c| 14|
        +---+----+---+
        
        root
         |-- id: long (nullable = true)
         |-- name: string (nullable = true)
         |-- age: long (nullable = true)
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    2.3 由外部数据构建

    2.3.1 text数据源

    '''
    text数据源读取数据时,将一整行的数据 作为一个列的一项数据,默认名称value,类型string
    可以用schema指定列的名字和类型
    '''
    schema = StructType().add('data', StringType(), True)
    df = spark.read.format('text').\
        schema(schema=schema).\
        load('/test.text')
    '''
    +---+---+
    |   data|
    +-------+
    | 1 a 12|
    | 2 b 13|
    | 3 c 14|
    +-------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    2.3.2 json数据源

    '''
    JSON数据源自带Schema信息
    {"name": 'c'}
    {"id": 1, "name": 'a'}
    {"id": 2, "name": 'b'}
    '''
    df = spark.read.format('json').load('/test.text')
    '''
    +----+-----+
    | id | name|
    +----+-----+
    |null|  c  | 
    | 1  |  a  |
    | 2  |  b  |
    +----------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.3.3 csv数据源

    '''
    csv数据源
    id,name,age
    1,a,12
    2,b,13
    '''
    df = spark.read.format('csv')\
        .option('sep', ',')\ # 指定分隔符
        .option('header', True)\ # csv文件中有表头(列名),True为不用
        .option('encoding', 'utf-8')\ 
        .schema('id INT, name STRING, age INT') # 指定列名和列数据类型
        .load('/test.text')
    '''
    +---+----+---+
    | id|name|age|
    +---+----+---+
    |  1|   a| 12|
    |  2|   b| 13|
    +---+----+---+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    3 DataFrame 操作

    3.1 SQL风格

    # 构建临时视图表age,后续可用sql查询
    df.createOrReplaceTempView("age")
    # 构建全局表,可以跨SparkSession对象使用
    # 在使用前 global_temp.
    df.createGlobalTempView('global_age')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    spark.sql(
        """  
        select * from age limit 2
        """
    ).show()
    '''
        +----+----+---+
        |  id|name|age|
        +----+----+---+
        |1001| Tom| 12|
        |1003|John| 14|
        +----+----+---+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    spark.sql(
        """
        select * from global_temp.global_age
        """
    ).show()
    '''
        +----+-----+---+
        |  id| name|age|
        +----+-----+---+
        |1001|  Tom| 12|
        |1002|Jerry| 13|
        |1003| John| 14|
        |1004|  Tom| 15|
        +----+-----+---+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.2 DSL风格

    data = [(1001, "Tom", 12.0)
           ,(1002, "Jerry", 13.0)
           ,(1003, "John", 14.0)
           ,(1004, 'Tom', 15.0)
           ,(1003, "John", 17.353)]
    
    rdd = sc.parallelize(data)
    rdd = rdd.map(lambda x: Row(x[0], x[1], x[2]))
    
    schema = StructType([StructField("id", LongType(), True)
                        ,StructField("name", StringType(), True)
                        ,StructField("age", FloatType(), True)])
    df = spark.createDataFrame(rdd, schema)
    df.show()
    '''
        +----+-----+------+
        |  id| name|   age|
        +----+-----+------+
        |1001|  Tom|  12.0|
        |1002|Jerry|  13.0|
        |1003| John|  14.0|
        |1004|  Tom|  15.0|
        |1003| John|17.353|
        +----+-----+------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    3.2.1 df.select() 指定查看某列

    df.select('id', 'age').show()
    '''
        +----+------+
        |  id|   age|
        +----+------+
        |1001|  12.0|
        |1002|  13.0|
        |1003|  14.0|
        |1004|  15.0|
        |1003|17.353|
        +----+------+
    '''
    df.select(df.col_name).show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3.2.2 df.where/filter()

    df.filter('age < 14').show()
    # 同:
    df.where('age < 14').show()
    '''
        +----+-----+----+
        |  id| name| age|
        +----+-----+----+
        |1001|  Tom|12.0|
        |1002|Jerry|13.0|
        +----+-----+----+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.2.3 指定范围筛选数据

    3.2.3.1 指定阈值
    from pyspark.sql import functions as F
    
    df[df['id'] > 1003].show()
    '''
        +----+----+----+
        |  id|name| age|
        +----+----+----+
        |1004| Tom|15.0|
        +----+----+----+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    df.where(df['id'] > 1003).show()
    '''
        +----+----+----+
        |  id|name| age|
        +----+----+----+
        |1004| Tom|15.0|
        +----+----+----+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    df.where((df['id'] < 1003) & (df['id'] > 1001)).show()
    '''
        +----+-----+----+
        |  id| name| age|
        +----+-----+----+
        |1002|Jerry|13.0|
        +----+-----+----+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    # age大于平均值
    df.where(df['age'] >  df.select(F.avg(df['age'])).first()['avg(age)']).show()
    '''
        +----+----+------+
        |  id|name|   age|
        +----+----+------+
        |1004| Tom|  15.0|
        |1003|John|17.353|
        +----+----+------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    3.2.3.2 统计null、nan、“”
    查找某一列中的Null
    df.where(df.col_name.isNull()).show() 返回所有列,且该列值为Null的行
    df.where(df.col_name.isNull()).count()可统计该列null值数量
    df.select(df.col_name.isNull()).show() 只返回该列,值为true或false
    df.select(df.col_name.isNull()).count() 返回的仍是df的所有行数
    
    查找空值(空字符串""被认为是空值)
    df.where(df.col_name=="").count()
    
    查找缺失值nan(not a number)
    from pyspark.sql.functions import isnan
    df.where(isna(df.col_name)).count()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.2.4 df.orderBy() 排序

    df.orderBy('age', ascending=False).show()
    '''
        +----+-----+------+
        |  id| name|   age|
        +----+-----+------+
        |1003| John|17.353|
        |1004|  Tom|  15.0|
        |1003| John|  14.0|
        |1002|Jerry|  13.0|
        |1001|  Tom|  12.0|
        +----+-----+------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.2.5 df.groupBy().agg() 分组操作

    3.2.5.1 {col: func} 指定特征聚合
    # groupBy
    df.groupBy('name').agg({'id': 'max', 'age': 'max'}).show()
    '''
        +-----+-------+--------+
        | name|max(id)|max(age)|
        +-----+-------+--------+
        |  Tom|   1004|    15.0|
        | John|   1003|  17.353|
        |Jerry|   1002|    13.0|
        +-----+-------+--------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • F 实现复杂聚合
    from pyspark.sql import functions as F
    
    df.groupBy('name').agg(
            # 取平均后保留2位小数
            # alias 对Columns改名
            F.round(F.avg('age'), 2).alias('avg_age'),
            F.max('age').alias('min_age'),
            F.count('id').alias('cnt_id')
    
        ).show()
    '''
        +-----+-------+-------+------+
        | name|avg_age|min_age|cnt_id|
        +-----+-------+-------+------+
        |  Tom|   13.5|   15.0|     2|
        |Jerry|   13.0|   13.0|     1|
        | John|  15.68| 17.353|     2|
        +-----+-------+-------+------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    3.2.5.2 所有/部分 特征统一聚合
    
    cols = diff.columns
    # .agg(*[F.mean(col) for col in cols])
    diff_mean = diff.groupBy('pt').agg(*[F.round(F.mean(col), 5).alias(col) for col in cols[:2]])
    
    diff_NoneRatio = diff.groupBy('pt').agg(*[F.round(F.mean(diff[col].isNull().cast('double')), 5).alias(col) for col in cols[:2]])
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.2.6 列操作

    3.2.6.1 创建列 df.withcolumn()
    # 创建新列
    from pyspark.sql.functions import lit
    
    df1 = df.withColumn("nnumberOne", lit(2))
    df1.show()
    '''
        +----+-----+------+----------+
        |  id| name|   age|nnumberOne|
        +----+-----+------+----------+
        |1001|  Tom|  12.0|         2|
        |1002|Jerry|  13.0|         2|
        |1003| John|  14.0|         2|
        |1004|  Tom|  15.0|         2|
        |1003| John|17.353|         2|
        +----+-----+------+----------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    from pyspark.sql import functions as F
    
    '''
    withColumn
    对已存在的列操作
    如果新列名和原有列名相同则覆盖原列,否则在表后追加新列
    '''
    df.withColumn('class & No.', F.split(df['id'], '00')).show()
    '''
        +----+-----+------+-----------+
        |  id| name|   age|class & No.|
        +----+-----+------+-----------+
        |1001|  Tom|  12.0|     [1, 1]|
        |1002|Jerry|  13.0|     [1, 2]|
        |1003| John|  14.0|     [1, 3]|
        |1004|  Tom|  15.0|     [1, 4]|
        |1003| John|17.353|     [1, 3]|
        +----+-----+------+-----------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    3.2.6.2 删除列 df.drop()
    # 删除列
    df1.drop('numberOne').show()
    '''
        +----+-----+------+----------+
        |  id| name|   age|nnumberOne|
        +----+-----+------+----------+
        |1001|  Tom|  12.0|         2|
        |1002|Jerry|  13.0|         2|
        |1003| John|  14.0|         2|
        |1004|  Tom|  15.0|         2|
        |1003| John|17.353|         2|
        +----+-----+------+----------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    3.2.6.3 改列名 df.withColumnRenamed()
    # 改名
    df.withColumnRenamed('id', 'No.').show()
    '''
        +----+-----+------+
        | No.| name|   age|
        +----+-----+------+
        |1001|  Tom|  12.0|
        |1002|Jerry|  13.0|
        |1003| John|  14.0|
        |1004|  Tom|  15.0|
        |1003| John|17.353|
        +----+-----+------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    3.2.6.4 改列数据类型 df.withColumn(‘new_name’, col(‘name’).cast(‘type’))
    from pyspark.sql.functions import col
    
    # 转换列的类型
    df.withColumn('age_int', col('age').cast('int')).show()
    '''
        +----+-----+------+-------+
        |  id| name|   age|age_int|
        +----+-----+------+-------+
        |1001|  Tom|  12.0|     12|
        |1002|Jerry|  13.0|     13|
        |1003| John|  14.0|     14|
        |1004|  Tom|  15.0|     15|
        |1003| John|17.353|     17|
        +----+-----+------+-------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    3.2.6.5 拼接列 concat、concat_ws
    • 拼接后的新列尾string类型
    • concat_ws 指定拼接符
    from spark.sql.functions import concat, concat_ws
    
    
    df_concat = df.withColumn("col_concat", \
    						  concat(df['col0'], df['col1'], df['col2']))
    						  
    df_concat = df.withColumn("col_concat", \
    					      concat_ws('_', df['col0'], df['col1'], df['col2']))
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    3.2.6.6 批量重命名列 df.toDF(*list)

    另:rdd.toDF(list)

    '''
    整张表的字段重命名
    '''
    
    cols = ['new_name1', 'new_name2', 'name3']
    df.toDF(*cols)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    3.2.6.7 添加索引列
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number, monotonically_increasing_id
    
    
    window_spec = Window.orderBy(monotonically_increasing_id())
    df = df.withColumn("index", row_number().over(window_spec) - 1)
    '''
    +------------------+--------------------+-----+
    |            cfrnid|                0830|index|
    +------------------+--------------------+-----+
    |360287970218473313|[[0, 0, 0, 0, 0, ...|    0|
    |360287970218338563|[[6, 47398, 0, 29...|    1|
    |360287970207922506|[[11932, 1048, 0,...|    2|
    |360287970208320398|[[171, 0, 0, 0, 0...|    3|
    |360287970218289139|[[208, 1520, 0, 0...|    4|
    |360287970218491829|[[0, 0, 0, 0, 0, ...|    5|
    |360287970218325554|[[180354, 0, 0, 0...|    6|
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    3.2.7 去重 df.dropDuplicates([‘name’]) / df.distinct()

    '''
    去重
    无参数,只保留重复行的第一行;
    指定列,只保留重复列值第一次出现所在的行
    '''
    df.dropDuplicates(['name']).show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    '''
    选定列,去重
    '''
    df[['col_name']].distinct().show()
    
    • 1
    • 2
    • 3
    • 4

    3.2.8 缺失值处理 df.dropna/fillna()

    '''
    去缺失值
    无参数,有缺失值就删除对应行;
    有参数,
    '''
    df.dropna().show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    df.fillna('loss') 填充缺失值为'loss'
    df.fillna('loss', subset=['name']) 指定列填充缺失值
    df.fillna({'id': 0, 'name': 'loss', 'age': -1}) 同时指定多个列的缺失值填充规则
    
    
    • 1
    • 2
    • 3
    • 4

    3.2.9 表 join

    data = [(1001, "Tom", 12.0)
           ,(1002, "Jerry", 13.0)
           ,(1003, "John", 14.0)
           ,(1004, 'Tom', 15.0)
           ,(1003, "John", 17.353)]
    
    rdd = sc.parallelize(data)
    rdd = rdd.map(lambda x: Row(x[0], x[1], x[2]))
    
    df_l = spark.createDataFrame(rdd, schema=['id', 'name', 'age'])
    df_l.show()
    '''
        +----+-----+------+
        |  id| name|   age|
        +----+-----+------+
        |1001|  Tom|  12.0|
        |1002|Jerry|  13.0|
        |1003| John|  14.0|
        |1004|  Tom|  15.0|
        |1003| John|17.353|
        +----+-----+------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    df_r = df_l.where(df_l['id'] > 1002).\
                withColumn('class', F.split(df_l['id'], '00')[0])
    
    df_r.show()
    '''
        +----+----+------+-----+
        |  id|name|   age|class|
        +----+----+------+-----+
        |1003|John|  14.0|    1|
        |1004| Tom|  15.0|    1|
        |1003|John|17.353|    1|
        +----+----+------+-----+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    df_l.join(df_r[['id', 'class']], how='inner', on='id').show()
    '''
        +----+----+------+-----+
        |  id|name|   age|class|
        +----+----+------+-----+
        |1003|John|  14.0|    1|
        |1003|John|  14.0|    1|
        |1003|John|17.353|    1|
        |1003|John|17.353|    1|
        |1004| Tom|  15.0|    1|
        +----+----+------+-----+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.2.10 list/dict多元素展开 F.explode()

    3.2.10.1 将一列的list中的元素展开成多行
    rdd = sc.parallelize([(1001, 'a b c'), (1002, 'd e f')])
    rdd = rdd.map(lambda x: (x[0], x[1].split()))
    df = rdd.toDF(schema=['id', 'line'])
    df.show()
    '''
    +----+---------+
    |  id|     line|
    +----+---------+
    |1001|[a, b, c]|
    |1002|[d, e, f]|
    +----+---------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    from pyspark.sql import functions as F
    
    df.withColumn('explode_line', F.explode(df['line'])).show()
    '''
    +----+---------+------------+
    |  id|     line|explode_line|
    +----+---------+------------+
    |1001|[a, b, c]|           a|
    |1001|[a, b, c]|           b|
    |1001|[a, b, c]|           c|
    |1002|[d, e, f]|           d|
    |1002|[d, e, f]|           e|
    |1002|[d, e, f]|           f|
    +----+---------+------------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    3.2.10.2 将dict展开 只能展开为key、value两列
    def udf():
        d = {'a': '1', 'b': '2'}
        return d
    
    udf1 = spark.udf.register('udf1', udf, MapType(StringType(), StringType()))
    
    df = df.withColumn('col', udf1())
    '''
    +------------------+---+---+----------------+
    |            cfrnid|No1|No2|             col|
    +------------------+---+---+----------------+
    |360287970194088911|  1|  2|[a -> 1, b -> 2]|
    |360287970192611432|  1|  2|[a -> 1, b -> 2]|
    |360287970195509570|  1|  2|[a -> 1, b -> 2]|
    |360287970195244387|  1|  2|[a -> 1, b -> 2]|
    |360287970193184476|  1|  2|[a -> 1, b -> 2]|
    +------------------+---+---+----------------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    df.select(F.explode('col')).show()
    '''
    +---+-----+
    |key|value|
    +---+-----+
    |  a|    1|
    |  b|    2|
    |  a|    1|
    |  b|    2|
    |  a|    1|
    |  b|    2|
    |  a|    1|
    |  b|    2|
    |  a|    1|
    |  b|    2|
    +---+-----+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3.2.11 将一列的list中的元素展开成多列

    '''
    原字段 id, cr_stagesum(list)
    现字段 id, cr_stagesum[0], cr_stagesum[1], cr_stagesum[2]
    
    只分离出原list字段的索引为0,1,2的元素,作为新的3列
    '''
    col_nums = [0, 1, 2]
    cols = ['id']
    for i in col_nums:
        cols.append(df.cr_stagesum[i])
    df = df.select(*cols)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.2.12 查看个数/去重后个数 df.describe() / df.select(‘name’).distinct().count()

    df.describe().show()
    '''
    +-------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+
    |summary|              cfrnid|             traceid|       report_times|              respon|              lender|              crtype|              ricomm|        pt|
    +-------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+
    |  count|               38074|               38074|              38074|               35704|               35704|               35704|               35704|     38074|
    |   mean|3.602879702074297E17|                null|               null|                null|                null|                null|                null|      null|
    | stddev|   9750597.034539178|                null|               null|                null|                null|                null|                null|      null|
    |    min|  360287970189650107|0a0f120e64ee09722...|2023-08-29 00:00:09|[[0, 0, 0, 0, 0, ...|[[0, 0, 0, 0, 0, ...|[[0, 0, 0, 0, 0, ...|[[0, 0, 0, 0, 0, ...|2023-08-30|
    |    max|  360287970218654431|s570868e64ed9a4b1...|2023-08-29 23:59:55|[[99998, 0, 0, 0,...|[[9997, 0, 0, 646...|[[9993, 0, 0, 0, ...|[[0, 0, 0, 0, 0, ...|2023-08-30|
    +-------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    # 计算个数
    df.select('respon').count()
    # 计算不重复的个数
    df.select('respon').distinct().count()
    
    • 1
    • 2
    • 3
    • 4

    3.2.13 取前n个数据 df.take()

    • df.take(50) 返回类型是list

    3.2.14 按索引读取行数据,读取表格指定位置数据 df.take()[]

    # 获取指定行的数据
    '''
    获取第3行, 索引为2的一行数据 必须后面加[idx];
    返回的是 Row类型
    '''
    df.take(3)[2] 
    # 获取Row对象中具体哪个数据
    df.take(3)[2]['cfrnid']
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.2.15 条件语句 F.when

    import pyspark.sql.functions as F
    
    
    df = df.withColumn('sample_set', 
                       F.when(df.index > t, F.lit('OOT'))\
                       .when(df.index <= t, F.lit('TRAIN')))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.2.16 日期操作 F.date_sub

    日期往前推几天

    import pyspark.sql.functions as F
    
    # pt - 1 == credit_date
    d.where(d.credit_date == F.date_sub(d.pt, 1)).show()
    
    • 1
    • 2
    • 3
    • 4

    3.2.17 将一列的值提取为list .rdd.flatMap()

    接 3.2.10.2

    keys = df.select('key').distinct()
    '''
    +---+
    |key|
    +---+
    |  a|
    |  b|
    +---+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    keys.rdd.collect()
    '''
    [Row(key='a'), Row(key='b')]
    '''
    
    • 1
    • 2
    • 3
    • 4
    keys.rdd.flatMap(lambda x: x).collect()
    '''
    ['a', 'b']
    '''
    
    • 1
    • 2
    • 3
    • 4

    3.2.18 按分区随机采样 df.sample(fraction=, seed=)

    df = df.sample(fraction=0.02, seed=666)
    
    • 1

    4 UDF 用户定义(普通)函数

    from pyspark.sql import SparkSession,
    from pyspark.sql import functions as F
    from pyspark.sql.types import *
    
    • 1
    • 2
    • 3
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        config('spark.sql.shuffle.partitions', 2).\
        getOrCreate()
    sc = spark.sparkContext
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    rdd = sc.parallelize([1, 2, 3]).\
        map(lambda x: [x])
    df = rdd.toDF(['num'])
    df.show()
    '''
        +---+
        |num|
        +---+
        |  1|
        |  2|
        |  3|
        +---+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    4.1 sparksession.udf.register()

    • 注册的udf可以用于DSL和SQL
    # sparksession.udf.register()
    def num_ride_10(num):
        return num * 10
    '''
    param1: 注册的udf名称,仅可用于SQL风格
    param2: 数据处理方法
    param3: UDF的返回类型
    return: udf对象,仅可以用于DSL风格
    '''
    udf_dsl = spark.udf.register('udf_sql', num_ride_10, IntegerType())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    df.createOrReplaceTempView("num")
    spark.sql('select udf_sql(*) from num').show()
    # df.selectExpr('udf_sql(num)').show()
    '''
        +------------+
        |udf_sql(num)|
        +------------+
        |          10|
        |          20|
        |          30|
        +------------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    # DSL 风格的udf只接受Column对象
    # 注意:列名是udf_sql, 正常使用时返回对象和注册udf名一致
    df.select(udf_dsl(df['num'])).show()
    '''
        +------------+
        |udf_sql(num)|
        +------------+
        |          10|
        |          20|
        |          30|
        +------------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.2 pyspark.sql.functions.udf() & 数据类型

    • 仅用于DSL风格

    4.2.1 IntegerType()

    def num_ride_10(num):
        return num * 10
    
    udf = F.udf(num_ride_10, IntegerType())
    df.select(udf(df['num'])).show()
    '''
        +----------------+
        |num_ride_10(num)|
        +----------------+
        |              10|
        |              20|
        |              30|
        +----------------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    4.2.2 ArrayType(StringType())

    rdd = sc.parallelize([['hello nice good a'], ['hello b'], ['nice c']])
    df = rdd.toDF(['line'])
    df.show()
    '''
        +-----------------+
        |             line|
        +-----------------+
        |hello nice good a|
        |          hello b|
        |           nice c|
        +-----------------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    # 返回ArrayType
    def split_line(line):
        return line.split(' ')
    
    # 规定返回数组类型中装string类型
    udf_split = spark.udf.register('udf_split', split_line, ArrayType(StringType()))
    # False 展示一行全部数据, 否则过长表示为'...'
    df.select(udf_split(df['line'])).show(truncate=False)
    '''
        +----------------------+
        |udf_split(line)       |
        +----------------------+
        |[hello, nice, good, a]|
        |[hello, b]            |
        |[nice, c]             |
        +----------------------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    4.2.3 StructType()

    import string
    '''
    返回字典类型
    将字典想像成一张表格(json),用StructType()接收
    '''
    
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(['num'])
    
    def get_letter(num):
        return {'num': num, 'letter': string.ascii_letters[num]}
    
    udf= spark.udf.register('udf', 
                            get_letter, 
                            StructType().add('num', IntegerType(), True)\
                                        .add('letter', StringType(), True)
                           )
    
    df.select(udf(df['num'])).show()
    '''
        +--------+
        |udf(num)|
        +--------+
        |  [1, b]|
        |  [2, c]|
        |  [3, d]|
        +--------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    5 UDAF 用户定义聚合函数

    • User- Defined Aggregation Funcation
    • 作用于多行数据, 相当于SQL中的count()、avg()
    # 后期用rdd.mapPartitions()完成聚合, 必须用单分区
    single_partition_rdd = df.rdd.repartition(1) 
    single_partition_rdd.collect()
    '''
        [Row(num=1), Row(num=2), Row(num=3)]
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    def process(iter):
        sum = 0
        for row in iter:
            sum += row['num']
        # !!!必须嵌套list
        return [sum]
    
    single_partition_rdd.mapPartitions(process).collect()
    '''
        [6]
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    6 开窗函数

    rdd = sc.parallelize([
        (1, 'a', 12),
        (2, 'b', 13),
        (3, 'c', 14),
        (1, 'd', 12),
        (2, 'e', 16)
    ])
    schema = StructType([
        StructField('class', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('age', LongType(), True)
    ])
    
    df = rdd.toDF(schema=schema)
    df.createOrReplaceTempView('stu')
    
    df.show()
    '''
        +-----+----+---+
        |class|name|age|
        +-----+----+---+
        |    1|   a| 12|
        |    2|   b| 13|
        |    3|   c| 14|
        |    1|   d| 12|
        |    2|   e| 16|
        +-----+----+---+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    6.1 聚合窗口函数

    '''
    聚合窗口函数
    聚合函数(列) over()
    
    avg() 聚合函数 将多行变为一行
    over() 窗口函数 将一行变为多行
    '''
    
    spark.sql("""
    select *, avg(age) over() as avg_age from stu
    
    """).show()
    '''
        +-----+----+---+-------+
        |class|name|age|avg_age|
        +-----+----+---+-------+
        |    3|   c| 14|   13.4|
        |    1|   d| 12|   13.4|
        |    2|   e| 16|   13.4|
        |    1|   a| 12|   13.4|
        |    2|   b| 13|   13.4|
        +-----+----+---+-------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    6.2 排序窗口函数

    '''
    排序窗口函数
    row_number() over(order by age desc) 按照age全局进行降序排序,追加一列名次
    dense_rank() over(partition by class order by age desc) 按class分区,区内按age排序,追加一列名次
    rank() over(order by age) 按照age全局进行升序排序,追加一列名次
    ntile(3) over(order by age desc) 按age均分成3份,每份中的数据都是一个排名
    
    row_number 出现相同会依次排序,
    rank 出现相同排序一样
    
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    spark.sql("""
    select *, 
           row_number() over(order by age desc) as row_number_rank,
           dense_rank() over(partition by class order by age desc) as dense_rank,
           rank() over(order by age) as rank
           
    from   stu
    """).show()
    '''
        +-----+----+---+---------------+----------+----+
        |class|name|age|row_number_rank|dense_rank|rank|
        +-----+----+---+---------------+----------+----+
        |    1|   a| 12|              4|         1|   1|
        |    1|   d| 12|              5|         1|   1|
        |    2|   b| 13|              3|         2|   3|
        |    3|   c| 14|              2|         1|   4|
        |    2|   e| 16|              1|         1|   5|
        +-----+----+---+---------------+----------+----+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    
    spark.sql("""
    select *, 
           ntile(3) over(order by age desc) as ntile_3
           
    from   stu
    """).show()
    '''
        +-----+----+---+-------+
        |class|name|age|ntile_3|
        +-----+----+---+-------+
        |    2|   e| 16|      1|
        |    3|   c| 14|      1|
        |    2|   b| 13|      2|
        |    1|   a| 12|      2|
        |    1|   d| 12|      3|
        +-----+----+---+-------+
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    7 DataFrame 缓存

    【Spark】PySpark RDD - RDD 缓存

    df.cache() # 优先缓存到内存,不够放硬盘
    
    • 1

    8 保存DataFrame 到hive

    if spark.catalog._jcatalog.tableExists(table):
    	# insertInto 不用partitionBy
    	pre_fea_importance_df.write.format("hive").mode("overwrite").insertInto(table)
    else:
    	pre_fea_importance_df.write.format("hive").mode("overwrite").partitionBy('l_r').saveAsTable(table)
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    Netty实践-- echo
    jar依赖批量上传Nexus服务器(二)
    Ubuntu 22.04 无法使用网易云音乐
    Eth-Trunk负载分担不均怎么办,如何通过Hash算法实现负载分担?
    【Linux系统满足产品实时性需求】
    6.Tomcat概述与部署
    AcWing 3. 完全背包问题 学习笔记
    解锁Spring Boot AOP的魔力:优雅地管理交叉关注点
    vulnhub靶机Vegeta
    百度搜索清理大量低质量网站
  • 原文地址:https://blog.csdn.net/qq_45249685/article/details/132985703