• Flink 源码解读系列 DataStream 带 Watermark 生成的时间戳分配器


    传送门:Flink 系统性学习笔记


    Flink 1.10

    这篇文章主要从源码角度讲一下 Flink DataStream 中带 Watermark 生成的时间戳分配器。

    我们通常通过 DataStream 的 assignTimestampsAndWatermarks 方法分配时间戳并生成 Watermark。assignTimestampsAndWatermarks 方法可以传入两种时间戳分配器:

    • 周期性生成 Watermark 的时间戳分配器:AssignerWithPeriodicWatermarks
    • 断点式生成 Watermark 的时间戳分配器:AssignerWithPunctuatedWatermarks

    这两种时间戳分配器均是 TimestampAssigner 的子类,具体继承关系如下图所示。在为元素分配时间戳的基础之上增加了生成 Watermark 的逻辑,可以理解是一个实现 Watermark 生成逻辑的时间戳分配器。

    在这里插入图片描述

    1. TimestampAssigner

    时间戳分配器 TimestampAssigner 主要用来从元素中提取时间戳并为元素分配事件时间时间戳。TimestampAssigner 接口比较简单,只有一个 extractTimestamp 方法,最终会交由用户来实现提取时间戳的逻辑:

    public interface TimestampAssigner<T> extends Function {
       
      // 提取时间戳
    	long extractTimestamp(T element, long previousElementTimestamp);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    用户可能会比较疑惑数据里不是已经有时间戳了吗,为什么还要创建时间戳分配器来分配时间戳呢?这是因为原始的时间戳只是写入日志数据的一个字段,如果不提取出来并明确告诉 Flink,Flink 是无法知道数据真正产生的时间。当然,有些时候数据源本身就提供了时间戳信息,比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳,而不需要单独提取字段分配了。

    2. AssignerWithXXXWatermarks

    根据 Watermark 生成形式不同,分为周期性生成 Watermark 和断点式生成 Watermark 两种形式。周期性生成 Watermark 需要实现 AssignerWithPeriodicWatermarks 接口,而断点式生成 Watermark 则需要实现 AssignerWithPunctuatedWatermarks 接口。

    2.1 AssignerWithPeriodicWatermarks

    AssignerWithPeriodicWatermarks 接口是周期性生成 Watermark 的时间戳分配器通用实现接口。实现该接口的时间分配器会根据设定的时间间隔周期性生成 Watermark。可以通过如下方式设置生成 Watermark 的时间间隔周期,默认 200 毫秒:

    // 设置每 100ms 生成 Watermark
    env.getConfig().setAutoWatermarkInterval(100);
    
    • 1
    • 2

    AssignerWithPeriodicWatermarks 接口在 TimestampAssigner 基础之上增加了生成 Watermark 的方法 getCurrentWatermark:

    public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
       
    	@Nullable
    	Watermark getCurrentWatermark();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    根据设定的时间间隔系统会周期性的调用 getCurrentWatermark 方法,如果返回的 Watermark 非空并且大于前一个 Watermark 的时间戳,才会输出一个新的 Watermark。如果当前 Waternark 与前一个 Watermark 相同,那表示自上一次调用此方法以来,在事件时间上没有任何进展。如果返回一个空值,或者返回的 Watermark 的时间戳小于上次发出的时间戳,就不会生成新的 Watermark。

    在 Flink 中已经内置实现了两种周期性生成 Watermark 的时间戳分配器,分别是 AscendingTimestampExtractor 和 BoundedOutOfOrdernessTimestampExtractor。

    2.1.1 AscendingTimestampExtractor

    AscendingTimestampExtractor 是一个实现 AssignerWithPeriodicWatermarks 接口的抽象类,周期性生成 Watermark。这种时间分配器比较适合于事件按顺序生成,没有乱序的情况。

    AscendingTimestampExtractor 时间分配器不仅需要实现 AssignerWithPeriodicWatermarks 接口中的 getCurrentWatermark 方法来生成 Watermark,还需实现 TimestampAssigner 中的 extractTimestamp 方法来实现为元素分配时间戳:

    public abstract class AscendingTimestampExtract
    • 相关阅读:
      Java8-Java16部分重要新特性汇总
      01.jvm介绍
      AD教程 (十八)导入常见报错解决办法(unkonw pin及绿色报错等)
      hive中的join操作及其数据倾斜
      线性代数---第五章特征值和特征向量
      美团二面算法 之 串联所有单词的子串[困难]
      leetcode经典例题——滑动窗口最大值
      实战PyQt5: 140-QChart图表之烛台图
      python使用execjs利用jsdom来执行含有document的js代码方案(上)
      银行 Zabbix 监控架构分享
    • 原文地址:https://blog.csdn.net/SunnyYoona/article/details/126797894