• PysparkNote103---window sliding windows


    Intro

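        The window function is mainly used in statistical scenarios, for example counting how many times each person clicks in every 15-minute interval. The examples below use 5-minute windows on the sample data.

    Building the data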

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    def get_or_create(app_name):
        # Build (or reuse) a SparkSession with the settings used throughout this note.
        spark = SparkSession \
            .builder \
            .appName(app_name) \
            .config("spark.driver.maxResultSize", "10g") \
            .config("spark.sql.execution.arrow.enabled", "true") \
            .config("spark.dynamicAllocation.enabled", "false") \
            .config("spark.sql.crossJoin.enabled", "true") \
            .config("spark.kryoserializer.buffer.max", "512m") \
            .getOrCreate()
        # Only log errors so the output stays readable.
        spark.sparkContext.setLogLevel('ERROR')
        return spark
    
    import pandas as pd
    
    df = pd.DataFrame({'name': ['A', 'A', 'A', 'B', 'B'], 'click': [1, 1, 1, 1, 1],
                      'click_time': ['2022-08-01 11:59:59', '2022-08-01 12:02:00', '2022-08-01 12:03:59',
                                     '2022-08-01 12:00:10', '2022-08-01 12:02:10']})
    df
    
      name  click           click_time
    0    A      1  2022-08-01 11:59:59
    1    A      1  2022-08-01 12:02:00
    2    A      1  2022-08-01 12:03:59
    3    B      1  2022-08-01 12:00:10
    4    B      1  2022-08-01 12:02:10

    spark = get_or_create('spark')
    df_spark = spark.createDataFrame(df)
    df_spark.show(truncate=False)
    
    D:\code\spark\python\pyspark\sql\session.py:714: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true; however, failed by the reason below:
      PyArrow >= 0.8.0 must be installed; however, it was not found.
    Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.
      warnings.warn(msg)
    
    
    +----+-----+-------------------+
    |name|click|click_time         |
    +----+-----+-------------------+
    |A   |1    |2022-08-01 11:59:59|
    |A   |1    |2022-08-01 12:02:00|
    |A   |1    |2022-08-01 12:03:59|
    |B   |1    |2022-08-01 12:00:10|
    |B   |1    |2022-08-01 12:02:10|
    +----+-----+-------------------+
    
    df_spark.printSchema()
    
    root
     |-- name: string (nullable = true)
     |-- click: long (nullable = true)
     |-- click_time: string (nullable = true)
    

    Sliding-window statistics

    Windows are left-closed and right-open, i.e. [start, end).
    Parameters:

    • timeColumn: the column to window on. It must be of type pyspark.sql.types.TimestampType, so a string column has to be converted first.
    • windowDuration: the window width, given as a string, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
      interval units are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
    • slideDuration: the slide step. If it is not provided (None), the windows are tumbling windows.
    • startTime: the offset with respect to 1970-01-01 00:00:00 UTC at which window intervals start.
      For example, to get hourly tumbling windows that start 15 minutes
      past the hour, e.g. 12:15-13:15, 13:15-14:15…, provide startTime as '15 minutes'.

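To make the roles of the window width, the slide step and the start offset concrete, here is a minimal pure-Python sketch of how a single timestamp could be mapped to the windows that contain it. It is an illustration only, not Spark's implementation: the helper name `windows_for` is hypothetical, naive datetimes are assumed to be in UTC, and the half-open [start, end) convention described above is assumed.

```python
from datetime import datetime, timedelta

# Assumption: naive datetimes are interpreted as UTC in this sketch.
EPOCH = datetime(1970, 1, 1)


def windows_for(ts, width, slide=None, offset=timedelta(0)):
    """Return every [start, end) window containing ts, earliest first.

    Hypothetical helper for illustration; it mirrors the meaning of
    windowDuration (width), slideDuration (slide) and startTime (offset).
    """
    slide = slide or width  # no slide step given -> tumbling windows
    # Latest window start <= ts on the grid EPOCH + offset + k * slide.
    start = ts - (ts - EPOCH - offset) % slide
    result = []
    while start + width > ts:  # end is exclusive, so it must be > ts
        result.append((start, start + width))
        start -= slide
    return result[::-1]


if __name__ == "__main__":
    click = datetime(2022, 8, 1, 11, 59, 59)
    for start, end in windows_for(click, timedelta(minutes=5), timedelta(minutes=1)):
        print(start, "->", end)
```

With a 5-minute width and a 1-minute slide, each timestamp falls into five windows; with no slide step it falls into exactly one.
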
    df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time'))).show()
    
    +----+-----+-------------------+-------------------+
    |name|click|         click_time|    click_timestamp|
    +----+-----+-------------------+-------------------+
    |   A|    1|2022-08-01 11:59:59|2022-08-01 11:59:59|
    |   A|    1|2022-08-01 12:02:00|2022-08-01 12:02:00|
    |   A|    1|2022-08-01 12:03:59|2022-08-01 12:03:59|
    |   B|    1|2022-08-01 12:00:10|2022-08-01 12:00:10|
    |   B|    1|2022-08-01 12:02:10|2022-08-01 12:02:10|
    +----+-----+-------------------+-------------------+
    
    df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time'))).printSchema()
    
    root
     |-- name: string (nullable = true)
     |-- click: long (nullable = true)
     |-- click_time: string (nullable = true)
     |-- click_timestamp: timestamp (nullable = true)
    

    5-minute window, sliding every minute

    df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time')))\
    .groupby(['name',F.window(timeColumn='click_timestamp', windowDuration='5 minute', slideDuration='1 minute', startTime=None)])\
    .agg(F.count('click').alias('cnt'),F.countDistinct('click').alias('dnt'))\
    .withColumn('window_start',F.col('window.start'))\
    .withColumn('window_end',F.col('window.end'))\
    .orderBy(F.col('name'),F.col('window_start').asc()).show(truncate=False)
    
    +----+------------------------------------------+---+---+-------------------+-------------------+
    |name|window                                    |cnt|dnt|window_start       |window_end         |
    +----+------------------------------------------+---+---+-------------------+-------------------+
    |A   |[2022-08-01 11:55:00, 2022-08-01 12:00:00]|1  |1  |2022-08-01 11:55:00|2022-08-01 12:00:00|
    |A   |[2022-08-01 11:56:00, 2022-08-01 12:01:00]|1  |1  |2022-08-01 11:56:00|2022-08-01 12:01:00|
    |A   |[2022-08-01 11:57:00, 2022-08-01 12:02:00]|1  |1  |2022-08-01 11:57:00|2022-08-01 12:02:00|
    |A   |[2022-08-01 11:58:00, 2022-08-01 12:03:00]|2  |1  |2022-08-01 11:58:00|2022-08-01 12:03:00|
    |A   |[2022-08-01 11:59:00, 2022-08-01 12:04:00]|3  |1  |2022-08-01 11:59:00|2022-08-01 12:04:00|
    |A   |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2  |1  |2022-08-01 12:00:00|2022-08-01 12:05:00|
    |A   |[2022-08-01 12:01:00, 2022-08-01 12:06:00]|2  |1  |2022-08-01 12:01:00|2022-08-01 12:06:00|
    |A   |[2022-08-01 12:02:00, 2022-08-01 12:07:00]|2  |1  |2022-08-01 12:02:00|2022-08-01 12:07:00|
    |A   |[2022-08-01 12:03:00, 2022-08-01 12:08:00]|1  |1  |2022-08-01 12:03:00|2022-08-01 12:08:00|
    |B   |[2022-08-01 11:56:00, 2022-08-01 12:01:00]|1  |1  |2022-08-01 11:56:00|2022-08-01 12:01:00|
    |B   |[2022-08-01 11:57:00, 2022-08-01 12:02:00]|1  |1  |2022-08-01 11:57:00|2022-08-01 12:02:00|
    |B   |[2022-08-01 11:58:00, 2022-08-01 12:03:00]|2  |1  |2022-08-01 11:58:00|2022-08-01 12:03:00|
    |B   |[2022-08-01 11:59:00, 2022-08-01 12:04:00]|2  |1  |2022-08-01 11:59:00|2022-08-01 12:04:00|
    |B   |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2  |1  |2022-08-01 12:00:00|2022-08-01 12:05:00|
    |B   |[2022-08-01 12:01:00, 2022-08-01 12:06:00]|1  |1  |2022-08-01 12:01:00|2022-08-01 12:06:00|
    |B   |[2022-08-01 12:02:00, 2022-08-01 12:07:00]|1  |1  |2022-08-01 12:02:00|2022-08-01 12:07:00|
    +----+------------------------------------------+---+---+-------------------+-------------------+
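
The cnt column above can be checked by hand. The following sketch, written in plain Python rather than Spark, walks over candidate window starts one minute apart and counts A's clicks inside each half-open 5-minute window. The scan range (11:50 to 12:10) and the variable names are choices made for this illustration.

```python
from datetime import datetime, timedelta

# A's click times from the sample data.
clicks_a = [
    datetime(2022, 8, 1, 11, 59, 59),
    datetime(2022, 8, 1, 12, 2, 0),
    datetime(2022, 8, 1, 12, 3, 59),
]
width = timedelta(minutes=5)
slide = timedelta(minutes=1)

counts = {}
start = datetime(2022, 8, 1, 11, 50)        # scan range chosen for illustration
while start <= datetime(2022, 8, 1, 12, 10):
    # Left-closed, right-open: start <= t < start + width.
    n = sum(1 for t in clicks_a if start <= t < start + width)
    if n:                                   # empty windows produce no row
        counts[start.strftime("%H:%M")] = n
    start += slide

for window_start, cnt in counts.items():
    print(window_start, cnt)
```

The counts add up to 15, which is 3 clicks times 5 overlapping windows per click. The dnt column is always 1 because every click value in the sample data is 1.
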
    

    5-minute window, tumbling

    A tumbling window is equivalent to a sliding window whose slide step equals the window width.

    df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time')))\
    .groupby(['name',F.window(timeColumn='click_timestamp', windowDuration='5 minute', slideDuration='5 minute', startTime=None)])\
    .agg(F.count('click').alias('cnt'),F.countDistinct('click').alias('dnt'))\
    .withColumn('window_start',F.col('window.start'))\
    .withColumn('window_end',F.col('window.end'))\
    .orderBy(F.col('name'),F.col('window_start').asc()).show(truncate=False)
    
    +----+------------------------------------------+---+---+-------------------+-------------------+
    |name|window                                    |cnt|dnt|window_start       |window_end         |
    +----+------------------------------------------+---+---+-------------------+-------------------+
    |A   |[2022-08-01 11:55:00, 2022-08-01 12:00:00]|1  |1  |2022-08-01 11:55:00|2022-08-01 12:00:00|
    |A   |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2  |1  |2022-08-01 12:00:00|2022-08-01 12:05:00|
    |B   |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2  |1  |2022-08-01 12:00:00|2022-08-01 12:05:00|
    +----+------------------------------------------+---+---+-------------------+-------------------+
    
    df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time')))\
    .groupby(['name',F.window(timeColumn='click_timestamp', windowDuration='5 minute', slideDuration=None, startTime=None)])\
    .agg(F.count('click').alias('cnt'),F.countDistinct('click').alias('dnt'))\
    .withColumn('window_start',F.col('window.start'))\
    .withColumn('window_end',F.col('window.end'))\
    .orderBy(F.col('name'),F.col('window_start').asc()).show(truncate=False)
    
    +----+------------------------------------------+---+---+-------------------+-------------------+
    |name|window                                    |cnt|dnt|window_start       |window_end         |
    +----+------------------------------------------+---+---+-------------------+-------------------+
    |A   |[2022-08-01 11:55:00, 2022-08-01 12:00:00]|1  |1  |2022-08-01 11:55:00|2022-08-01 12:00:00|
    |A   |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2  |1  |2022-08-01 12:00:00|2022-08-01 12:05:00|
    |B   |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2  |1  |2022-08-01 12:00:00|2022-08-01 12:05:00|
    +----+------------------------------------------+---+---+-------------------+-------------------+
    

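    Using the startTime offset

    By default the windows are aligned to minute 0, second 0. If you insist on starting at minute 1 instead, e.g. to get windows such as 1-6 and 6-11, pass a startTime offset: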

    df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time')))\
    .groupby(['name',F.window(timeColumn='click_timestamp', windowDuration='5 minute', slideDuration='5 minute', startTime='1 minutes')])\
    .agg(F.count('click').alias('cnt'),F.countDistinct('click').alias('dnt'))\
    .withColumn('window_start',F.col('window.start'))\
    .withColumn('window_end',F.col('window.end'))\
    .orderBy(F.col('name'),F.col('window_start').asc()).show(truncate=False)
    
    +----+------------------------------------------+---+---+-------------------+-------------------+
    |name|window                                    |cnt|dnt|window_start       |window_end         |
    +----+------------------------------------------+---+---+-------------------+-------------------+
    |A   |[2022-08-01 11:56:00, 2022-08-01 12:01:00]|1  |1  |2022-08-01 11:56:00|2022-08-01 12:01:00|
    |A   |[2022-08-01 12:01:00, 2022-08-01 12:06:00]|2  |1  |2022-08-01 12:01:00|2022-08-01 12:06:00|
    |B   |[2022-08-01 11:56:00, 2022-08-01 12:01:00]|1  |1  |2022-08-01 11:56:00|2022-08-01 12:01:00|
    |B   |[2022-08-01 12:01:00, 2022-08-01 12:06:00]|1  |1  |2022-08-01 12:01:00|2022-08-01 12:06:00|
    +----+------------------------------------------+---+---+-------------------+-------------------+
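
The effect of the offset can be reproduced with simple modular arithmetic. The sketch below is plain Python, not Spark code: the helper `tumbling_start` is a hypothetical name, and naive datetimes are assumed to be UTC. It shifts the window grid by one minute and counts the sample clicks per (name, window start).

```python
from collections import Counter
from datetime import datetime, timedelta

# Assumption: naive datetimes are interpreted as UTC in this sketch.
EPOCH = datetime(1970, 1, 1)


def tumbling_start(ts, width, offset=timedelta(0)):
    """Start of the tumbling window containing ts on a grid shifted by offset."""
    return ts - (ts - EPOCH - offset) % width


clicks = [
    ("A", datetime(2022, 8, 1, 11, 59, 59)),
    ("A", datetime(2022, 8, 1, 12, 2, 0)),
    ("A", datetime(2022, 8, 1, 12, 3, 59)),
    ("B", datetime(2022, 8, 1, 12, 0, 10)),
    ("B", datetime(2022, 8, 1, 12, 2, 10)),
]

width = timedelta(minutes=5)
offset = timedelta(minutes=1)   # plays the role of startTime='1 minutes'

cnt = Counter(
    (name, tumbling_start(ts, width, offset).strftime("%H:%M"))
    for name, ts in clicks
)
for key in sorted(cnt):
    print(key, cnt[key])
```

The grouped counts agree with the cnt column in the table above: the windows now start at 11:56 and 12:01 instead of 11:55 and 12:00.
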
    

    That concludes this brief introduction to the basic usage.

                                    2022-08-03, Jiulonghu, Jiangning District, Nanjing

  • Original article: https://blog.csdn.net/wendaomudong_l2d4/article/details/126145262