• Flink watermark与乱序消息处理机制


    flink中,watermark用于标识数据当前的进度、触发窗口计算、通过延迟设置容忍部分数据的乱序,详细定义可见:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/

    那么,watermark具体如何计算以及怎样对乱序数据起作用?特此通过代码加以解析。

    注:下文中所涉及的flink源码版本为 release-1.15,使用flink sql作为例子。

    基于timestamp计算watermark

    首先,以event time为例,定义watermark的方式为:

    1. CREATE TABLE Orders (
    2. `user` BIGINT,
    3. product STRING,
    4. order_time TIMESTAMP(3),
    5. WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    6. ) WITH ( . . . );

    该table的watermark策略为当前最大的时间戳 - 5 second,如下:

    Emits watermarks, which are the maximum observed timestamp minus the specified delay, e.g., WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND is a 5 seconds delayed watermark strategy.

    首先,我们来看watermark的生成机制,以BoundedOutOfOrderTimestamps为例:

    1. 从当前与历史输入中获取最大的timestamp
      1. @Override
      2. public void nextTimestamp(long timestamp) {
      3. if (timestamp > maxTimestamp) {
      4. maxTimestamp = timestamp;
      5. }
      6. }
    2.  依据step1中计算得到的timestamp与乱序容忍时间获取watermark
      1. @Override
      2. public Watermark getWatermark() {
      3. return new Watermark(maxTimestamp - delay);
      4. }

    乱序与window trigger

    有了watermark之后,就可以 进行窗口相关的操作了。

    以常用的tumble滚动窗口为例,什么时候会触发水位的计算呢,我们可以找到如下判断逻辑

    1. @Override
    2. public boolean onElement(Object element, long timestamp, W window) throws Exception {
    3. if (triggerTime(window) <= ctx.getCurrentWatermark()) {
    4. // if the watermark is already past the window fire immediately
    5. return true;
    6. } else {
    7. ctx.registerEventTimeTimer(triggerTime(window));
    8. return false;
    9. }
    10. }

    可以看到,当watermark超过窗口时,会触发计算。由于我们已经设置了乱序容忍即delay时间,因此,当所有event的最大timestamp超过窗口delay时间后,才会触发计算。

    对于那些延迟到达(即timestamp产生了回退的event)有两种可能:

    1. 在窗口计算被触发前到达,仍然会被计算,达到了容忍延迟的效果。
    2. 在窗口计算被触发后到达,直接被丢弃,不参与计算,因为已经超过了delay时间。

    watermark广播机制

    在实际的flink job中,我们会并行运行多个operator,同时每个operator有多个上下游input/output,此时,针对多个不同的输入watermark会进行合并处理,同时将计算得到的watermark下发至所有的下游operator(即所谓的广播)。

    flink中实际处理record与window关系的类是WindowOperator,其继承了AbstractStreamOperator,在AbstractStreamOperator中,通IndexedCombinedWatermarkStatus管理多个input产生的watermark:

    1. private void processWatermark(Watermark mark, int index) throws Exception {
    2. if (combinedWatermark.updateWatermark(index, mark.getTimestamp())) {
    3. processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
    4. }
    5. }
    1. 依据record所属的index,更新对应的partial的watermark
    2. 更新combined的watermark,轮询所有非idle的partial,取最小值为总体的watermark
    1. public boolean updateCombinedWatermark() {
    2. long minimumOverAllOutputs = Long.MAX_VALUE;
    3. // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
    4. // at its initial Long.MAX_VALUE state and we must not emit that
    5. if (partialWatermarks.isEmpty()) {
    6. return false;
    7. }
    8. boolean allIdle = true;
    9. for (PartialWatermark partialWatermark : partialWatermarks) {
    10. if (!partialWatermark.isIdle()) {
    11. minimumOverAllOutputs =
    12. Math.min(minimumOverAllOutputs, partialWatermark.getWatermark());
    13. allIdle = false;
    14. }
    15. }
    16. this.idle = allIdle;
    17. if (!allIdle && minimumOverAllOutputs > combinedWatermark) {
    18. combinedWatermark = minimumOverAllOutputs;
    19. return true;
    20. }
    21. return false;
    22. }

    从上述分析我们可以明晰以下概念:

    1. event的eventime或者processtime与delay一起决定了当前的watermark
    2. watermark用于触发窗口计算且永不回退
    3. 在窗口计算被触发前到达的乱序数据仍然参与计算,否则被丢弃
    4. 窗口一经触发即被丢弃,不会重复计算
    5. 多个非idle的input的watermark最小值确定了整体的watermark
  • 相关阅读:
    Mybatis-分页插件
    移动跨平台技术方案浅析
    喜爱拍拍宝宝照片的,一定要制作照片书方便保存
    Vue2-replace属性、编程式路由导航、缓存路由组件、两个新的生命周期钩子、路由守卫、路由器工作模式
    【每日一题】528. 按权重随机选择
    《深度学习之模型设计:核心算法与案例实践》知识记录
    用c语言编写出三底模型
    前端设计模式
    Brief. Bioinformatics2023 | 利用深度学习和分子动力学模拟设计抗菌肽
    深度学习入门-卷积神将网络(CNN)
  • 原文地址:https://blog.csdn.net/kakaweb/article/details/127927524