• Flink入门系列05-时间语义


    时间语义

    时间语义,是 flink 中用于时间推进和时间判断的机制,以什么为判断标准,就产生了两种不同的时间语义:

    • processiong time 处理时间语义
      指数据被 Operator 处理时所在机器的系统时间。遵循客观世界中时间特性:单调递增、恒定速度、永不停滞。
    • event time 事件时间语义
      指数据本身的业务时间(如用户行为日志时间),时间的推进完全由流入 flink 系统的数据来驱动。但事件事件可能停滞,速度可能不稳定。

    时间语义API

    flink 1.12及以后,flink 以event time 作为默认时间语义。在需要指定时间语义的相关操作(如时间窗口)时,可以通过显式的api来使用特定的时间语义。

    keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
    
    keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
    
    keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)))
    
    keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    如果需要禁用event time,则可以通过设置 watermark 生成频率间隔来实现:

    // 如果设置为0,则禁用了 watermark 的生成,从而失去了 event time 语义
    ExcutionConfig.setAutoWatermarkInterval(long);
    
    • 1
    • 2

    watermark

    数据在接收处理过程中,数据时间可能存在乱序,所以引入 watermark,就是在事件时间语义中,用于单调递增向前推进时间的一种标记。
    watermark的核心是在数据中周期性地插入一种时间戳单调递增的特殊数据(watermark),是 flink 内部自动产生并插入到数据流的,来不可逆转地在整个数据流中进行时间的推进。

    // watermark 的生命周期(默认值为200ms)
    env.getConfig().setAutoWatermarkInterval(200)
    
    • 1
    • 2

    watermark 生成策略

    策略1WatermarkStrategy.noWatermarks()  不生成 watermark,禁用了事件时间的推进机制
    策略2WatermarkStrategy.forMonotonousTimestamps()  紧跟最大事件时间
    策略3WatermarkStrategy.forBoundedOutOfOrderness()  允许乱序的 watermark生成策略
    策略4WatermarkStrategy.forGenerator()  自定义watermark生成算法
    
    • 1
    • 2
    • 3
    • 4

    例 一
    从最源头算子开始,生成watermark

    // 1、构造一个watermark的生成策略对象(算法策略,及事件时间的抽取方法)
    WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
            .<String>forBoundedOutOfOrderness(Duration.ofMillis(0))  // 允许乱序的算法策略
            .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element.split(",")[2]));  // 时间戳抽取方法
    // 2、将构造好的 watermark策略对象,分配给流(source算子)
    s1.assignTimestampsAndWatermarks(watermarkStrategy);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    例 二
    不从最源头算子开始生成watermark,而是从中间环节的某个算子开始生成watermark。
    注意:如果在源头就已经生成了watermark, 就不要在下游再次产生watermark

    SingleOutputStreamOperator<EventBean> s2 = s1.map(s -> {
                String[] split = s.split(",");
                return new EventBean(Long.parseLong(split[0]), split[1], Long.parseLong(split[2]), split[3]);
            }).returns(EventBean.class)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<EventBean>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<EventBean>() {
                                @Override
                                public long extractTimestamp(EventBean eventBean, long recordTimestamp) {
                                    return eventBean.getTimeStamp();
                                }
                            })
            );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    每天五分钟机器学习:如何解决欠拟合问题
    《神经网络与深度学习》算法伪代码汇总
    在HBuilder X中ElementUI框架的搭建
    第一部分 Makefile介绍
    基于JSP实现的作业管理系统
    【STM32】电容触摸按键
    利用DMA的触发循环实现eTMR的PWM周期计数
    使用 JPA、Hibernate 和 Spring Data JPA 进行审计
    Python自行车租车系统设计与实现报告,基于Django+MySQL
    信息系统项目管理师Part11-软件设计
  • 原文地址:https://blog.csdn.net/qq_17310871/article/details/126539467