• Structured Streaming系列-6、事件时间窗口分析


    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    传送门:大数据系列文章目录

    官方网址http://spark.apache.org/http://spark.apache.org/sql/

    在这里插入图片描述

    SparkStreaming中窗口统计分析: Window Operation(设置窗口大小WindowInterval和滑动大小SlideInterval),按照Streaming 流式应用接收数据的时间进行窗口设计的,其实是不符合实际应用场景的。

    例如,在物联网数据平台中,每个设备产生的数据,其中包含数据产生的时间,然而数据需要经过一系列采集传输才能被流式计算框架处理: SparkStreaming,此过程需要时间的,再按照处理时间来统计业务的时候,准确性降低,存在不合理性。

    在结构化流Structured Streaming中窗口数据统计时间是基于数据本身事件时间EventTime字段统计,更加合理性,官方文档:

    http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#window-operations-on-event-time

    时间概念

    在Streaming流式数据处理中,按照时间处理数据,其中时间有三种概念:

    1. 事件时间EventTime,表示数据本身产生的时间,该字段在数据本身中。
    2. 注入时间IngestionTime,表示数据到达流式系统时间,简而言之就是流式处理系统接收到数据的时间。
    3. 处理时间ProcessingTime,表示数据被流式系统真正开始计算操作的时间。

    在这里插入图片描述
    不同流式计算框架支持时间不一样, SparkStreaming框架仅仅支持处理时间ProcessTime,StructuredStreaming支持事件时间和处理时间, Flink框架支持三种时间数据操作,实际项目中往往针对【事件时间EventTime】进行数据处理操作,更加合理化。

    event-time 窗口分析

    基于事件时间窗口聚合操作:基于窗口的聚合(例如每分钟事件数) 只是事件时间列上特殊类型的分组和聚合,其中每个时间窗口都是一个组,并且每一行可以属于多个窗口/组

    事件时间EventTime是嵌入到数据本身中的时间,数据实际真实产生的时间。例如,如果希望获得每分钟由物联网设备生成的事件数,那么可能希望使用生成数据的时间(即数据中的事件时间event time),而不是Spark接收数据的时间(receive time/archive time)。

    这个事件时间很自然地用这个模型表示,设备中的每个事件(Event)都是表中的一行(Row),而事件时间(Event Time)是行中的一列值(Column Value)。

    因此,这种基于事件时间窗口的聚合查询既可以在静态数据集(例如,从收集的设备事件日志中)上定义,也可以在数据流上定义,从而使用户的使用更加容易。

    修改词频统计程序,数据流包含每行数据以及生成每行的时间。 希望在10分钟的窗口内对单词进行计数,每5分钟更新一次,如下图所示:

    在这里插入图片描述
    单词在10分钟窗口【12:00-12:10、 12:05-12:15、 12:10-12:20】等之间接收的单词中计数。注意,【12:00-12:10】表示处理数据的事件时间为12:00之后但12:10之前的数据。思考一下, 12:07的一条数据,应该增加对应于两个窗口12:00-12:10和12:05-12:15的计数

    基于事件时间窗口统计有两个参数索引: 分组键(如单词) 和窗口(事件时间字段) 。

    在这里插入图片描述
    为了演示案例,将上述案例中的每5分钟统计最近10分钟窗口改为每5秒统计最近10秒窗口数据,测试数据集:

    2012-10-20 09:00:02,cat dog
    2019-10-12 09:00:03,dog dog
    
    2019-10-12 09:00:07,owl cat
    
    2019-10-12 09:00:11,dog
    2019-10-12 09:00:13,owl
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    案例中三个时间范围,说明如下:

    1、触发时间间隔, trigger interval: 5(案例: 5分钟)
    2、事件时间窗口大小, window interval: 10秒(案例: 10分钟)
    3、滑动大小, slider interval: 5秒(案例: 5分钟)
    
    • 1
    • 2
    • 3

    官方案例演示代码如下:

    package window
    
    import java.sql.Timestamp
    
    import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * 基于Structured Streaming 模块读取TCP Socket读取数据,进行事件时间窗口统计词频WordCount,将结果打印到控制台
      *      TODO:每5秒钟统计最近10秒内的数据(词频:WordCount)
      *
      * EventTime即事件真正生成的时间:
      *  例如一个用户在10:06点击 了一个按钮,记录在系统中为10:06
      *  这条数据发送到Kafka,又到了Spark Streaming中处理,已经是10:08,这个处理的时间就是process Time。
      *
      */
    object StructuredWindow {
    
      def main(args: Array[String]): Unit = {
    
        // 1. 构建SparkSession实例对象,传递sparkConf参数
        val spark: SparkSession = SparkSession.builder()
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[2]")
          .config("spark.sql.shuffle.partitions", "2")
          .getOrCreate()
        // 导入隐式转换及函数库
        import org.apache.spark.sql.functions._
        import spark.implicits._
        spark.sparkContext.setLogLevel("WARN")
    
        // 2. 使用SparkSession从TCP Socket读取流式数据
        val inputStreamDF: DataFrame = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .load()
    
        // 3. 针对获取流式DStream进行词频统计
        val resultStreamDF = inputStreamDF
          // 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
          .as[String]
          // 过滤无效数据
          .filter(line => null != line && line.trim.length > 0)
          // 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
          .flatMap{line =>
          val arr = line.trim.split(",")
          arr(1).split("\\s+").map(word => (Timestamp.valueOf(arr(0)), word))
        }  /**
          *      2019-10-12 09:00:02    cat
          *      2019-10-12 09:00:02    dog
          */
          // 设置列的名称
          .toDF("insert_timestamp", "word")
          // TODO:设置基于事件时间(event time)窗口 -> insert_timestamp, 每5秒统计最近10秒内数据
          /*
                1. 先按照窗口分组
                2. 再对窗口中按照单词分组
                3. 最后使用聚合函数聚合
           */ // reduceByKeyAndWindows
          .groupBy(
            //按照数据中的事件时间构建窗口
            window($"insert_timestamp", "10 seconds", "5 seconds"),
            $"word"
          )
          .count()
          // 按照窗口字段降序排序
          .orderBy($"window")
    
        /*
            root
             |-- window: struct (nullable = true)
             |    |-- start: timestamp (nullable = true)
             |    |-- end: timestamp (nullable = true)
             |-- word: string (nullable = true)
             |-- count: long (nullable = false)
         */
        //resultStreamDF.printSchema()
    
        // 4. 将计算的结果输出,打印到控制台
        val query: StreamingQuery = resultStreamDF.writeStream
          .outputMode(OutputMode.Complete())
          .format("console")
          .option("numRows", "100")
          .option("truncate", "false")
          .trigger(Trigger.ProcessingTime("5 seconds"))
          .start()  // 流式DataFrame,需要启动
        // 查询器一直等待流式应用结束
        query.awaitTermination()
        query.stop()
      }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94

    运行上述基于事件时间Event Time窗口统计流式应用,演示效果图如下所示:
    在这里插入图片描述

    event-time 窗口生成

    Structured Streaming中如何依据EventTime事件时间生成窗口的呢?查看类TimeWindowing源码中生成窗口规则:

    org.apache.spark.sql.catalyst.analysis.TimeWindowing
    // 窗口个数
    /* 最大的窗口数 = 向下取整(窗口长度/滑动步长) */
    maxNumOverlapping <- ceil(windowDuration / slideDuration)
    for (i <- 0 until maxNumOverlapping)
    	/**
    		timestamp是event-time 传进的时间戳
    		startTime是window窗口参数,默认是0 second 从时间的0s
    		含义: event-time从1970年...有多少个滑动步长,如果说浮点数会向上取整
    	*/
    	windowId <- ceil((timestamp - startTime) / slideDuration)
    	/**
    		windowId * slideDuration 向上取能整除滑动步长的时间
    		(i - maxNumOverlapping) * slideDuration 每一个窗口开始时间相差一个步长
    	*/	
    	windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
    	windowEnd <- windowStart + windowDuration
    	return windowStart, windowEnd
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述

    在这里插入图片描述

    延迟数据处理

    Structed Streaming与Spark Streaming相比一大特性就是支持基于数据中的时间戳的数据处理。也就是在处理数据时,可以对记录中的eventTime事件时间字段进行考虑。因为eventTime更好的代表数据本身的信息,且可以借助eventTime处理比预期晚到达的数据,但是需要有一个限度(阈值),不能一直等,应该要设定最多等多久。

    延迟数据

    在很多流计算系统中,数据延迟到达(the events arrives late to the application)的情况很常见,并且很多时候是不可控的,因为很多时候是外围系统自身问题造成的。 Structured Streaming可以保证一条旧的数据进入到流上时,依然可以基于这些“迟到”的数据重新计算并更新计算结果。

    在这里插入图片描述
    上图中在12:04(即事件时间) 生成的单词可能在12:11被应用程序接收,此时,应用程序应使用时间12:04而不是12:11更新窗口12:00-12:10的旧计数。但是会出现如下两个问题:

    问题一:延迟数据计算是否有价值

    • 如果某些数据,延迟很长时间(如30分钟)才到达流式处理系统,数据还需要再次计算吗?计算的结果还有价值吗?原因在于流式处理系统处理数据关键核心在于实时性
    • 实践表明,流计算关注的是近期数据,更新一个很早之前的状态往往已经不再具有很大的业务价值;

    问题二:以前状态保存浪费资源

    • 实时统计来说,如果保存很久以前的数据状态,很多时候没有作用的,反而浪费大量资源

    Spark 2.1引入的watermarking允许用户指定延迟数据的阈值,也允许引擎清除掉旧的状态。即根据watermark机制来设置和判断消息的有效性,如可以获取消息本身的时间戳, 然后根据该时间戳来判断消息的到达是否延迟(乱序)以及延迟的时间是否在容忍的范围内(延迟的数据是否处理)。

    Watermarking 水位

    水位watermarking官方定义:

    lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly.
    
    • 1

    翻译:让Spark SQL引擎自动追踪数据中当前事件时间EventTime,依据规则清除旧的状态数据。

    通过指定event-time列(上一批次数据中EventTime最大值) 和预估事件的延迟时间上限(Threshold) 来定义一个查询的水位线watermark。

    Watermark = MaxEventTime - Threshod
    
    • 1
    • 第一点: 执行第一批次数据时, Watermarker为0,所以此批次中所有数据都参与计算;
    • 第二点: Watermarker值只能逐渐增加,不能减少;
    • 第三点:Watermark机制主要解决处理聚合延迟数据和减少内存中维护的聚合状态;
    • 第四点: 设置Watermark以后,输出模式OutputMode只能是Append和Update;

    如下方式设置阈值Threshold,计算每批次数据执行时的水位Watermark:
    在这里插入图片描述

    看一下官方案例:词频统计WordCount,设置阈值Threshold为10分钟,每5分钟触发执行一次。
    在这里插入图片描述

    • 延迟到达但没有超过watermark: (12:08, dog)

    在12:20触发执行窗口(12:10-12:20)数据中, (12:08, dog) 数据是延迟数据, 阈值Threshold设定为10分钟,此时水位线【Watermark = 12:14 - 10m = 12:04】,因为12:14是上个窗口(12:05-12:15)中接收到的最大的事件时间,代表目标系统最后时刻的状态,由于12:08在12:04之后,因此被视为“虽
    然迟到但尚且可以接收”的数据而被更新到了结果表中,也就是(12:00 -12:10, dog, 2)和(12:05 - 12:11,dog, 3)

    在这里插入图片描述

    • 超出watermark: (12:04, donkey)

    在12:25触发执行窗口(12:15-12:25)数据中, (12:04, donkey)数据是延迟数据,上个窗口中接收到最大的事件时间为12:21, 此时水位线【Watermark = 12:21 - 10m = 12:11】 ,而(12:04, donkey)比这个值还要早,说明它”太旧了”,所以不会被更新到结果表中了。

    在这里插入图片描述
    设置水位线Watermark以后,不同输出模式OutputMode,结果输出不一样:

    • Update模式:总是倾向于“尽可能早”的将处理结果更新到sink,当出现迟到数据时,早期的某个计算结果将会被更新;
    • Append模式:推迟计算结果的输出到一个相对较晚的时刻,确保结果是稳定的,不会再被更新,比如: 12:00 - 12:10窗口的处理结果会等到watermark更新到12: 11之后才会写入到sink。

    如果用于接收处理结果的sink不支持更新操作,则只能选择Append模式。

    官方案例演示

    编写代码,演示官方案例,如下几点注意:

    1、该outputMode为update模式,即只会输出那些有更新的数据!!
    2、该开窗窗口长度为10 min,步长5 min,水印为eventtime-10 min,(需理解开窗规则)
    3、官网案例trigger(Trigger.ProcessingTime("5 minutes")),但是测试的时候不建议使用这个
    4、未输出数据不代表已经在内存中被剔除,只是由于update模式的原因
    5、建议比对append理解水印
    
    • 1
    • 2
    • 3
    • 4
    • 5

    测试数据:

    dog,2019-10-10 12:00:07
    owl,2019-10-10 12:00:08
    
    dog,2019-10-10 12:00:14
    cat,2019-10-10 12:00:09
    
    cat,2019-10-10 12:00:15
    dog,2019-10-10 12:00:08
    owl,2019-10-10 12:00:13
    owl,2019-10-10 12:00:21
    
    owl,2019-10-10 12:00:17
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    具体案例代码如下:

    package window
    
    import java.sql.Timestamp
    
    import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * 基于Structured Streaming 读取TCP Socket读取数据,事件时间窗口统计词频,将结果打印到控制台
      *      TODO:每5秒钟统计最近10秒内的数据(词频:WordCount),设置水位Watermark时间为10秒
      */
    object StructuredWatermarkUpdate {
    
      def main(args: Array[String]): Unit = {
    
        // 1. 构建SparkSession实例对象,传递sparkConf参数
        val spark: SparkSession =  SparkSession.builder()
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[2]")
          .config("spark.sql.shuffle.partitions", "2")
          .getOrCreate()
        // b. 导入隐式转换及函数库
        import org.apache.spark.sql.functions._
        import spark.implicits._
        spark.sparkContext.setLogLevel("WARN")
    
        // 2. 使用SparkSession从TCP Socket读取流式数据
        val inputStreamDF: DataFrame = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .load()
    
        // 3. 针对获取流式DStream设置EventTime窗口及Watermark水位限制
        val resultStreamDF = inputStreamDF
          // 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
          .as[String]
          // 过滤无效数据
          .filter(line => null != line && line.trim.length > 0)
          // 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
          .map{line =>
          val arr = line.trim.split(",")
          (arr(0), Timestamp.valueOf(arr(1)))
        }
          // 设置列的名称
          .toDF("word", "time")
          // TODO:设置水位Watermark
          .withWatermark("time", "10 seconds")
          // TODO:设置基于事件时间(event time)窗口 -> time, 每5秒统计最近10秒内数据
          .groupBy(
            window($"time", "10 seconds", "5 seconds"),
            $"word"
        ).count()
    
        // 4. 将计算的结果输出,打印到控制台
        val query: StreamingQuery = resultStreamDF.writeStream
          .outputMode(OutputMode.Update())
          .format("console")
          .option("numRows", "100")
          .option("truncate", "false")
          .trigger(Trigger.ProcessingTime("5 seconds"))
          .start()  // 流式DataFrame,需要启动
        // 查询器一直等待流式应用结束
        query.awaitTermination()
        query.stop()
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
  • 相关阅读:
    Sovit3D数字孪生智慧海上风电场3D可视化管理平台
    论文总结-交通预测(未完成)
    git本地项目与远程仓库建立连接
    [Model.py 02] 地图按比例放大的实现
    莞中 2022暑假训练题04:树型DP
    安防监控视频AI智能分析网关:人流量统计算法的应用场景汇总
    从CPU100%高危故障到稳定在10%:一个月的优化之旅,成功上线!
    通过内网穿透实现文件共享,Python—行代码轻松实现公网访问
    Go Mac配置Air热加载
    TCP重传机制、滑动窗口、流量控制、拥塞控制
  • 原文地址:https://blog.csdn.net/l848168/article/details/127426590