• Flink Java 之 Watermark(触发原理, 使用)


    示例代码

    package com.daidai.watermarks;
    
    import com.daidai.source.mocksource.domain.Order;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    import java.time.Duration;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    public class WaterMarksTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<Order> dataStreamSource = env.addSource(new SourceFunction<Order>() {
                private boolean flag = true;
    
                @Override
                public void run(SourceContext<Order> sourceContext) throws Exception {
                    Random random = new Random();
                    while (flag) {
                        Thread.sleep(1000);
                        String id = UUID.randomUUID().toString();
                        int userId = random.nextInt(3);
                        int money = random.nextInt(101);
                        //模拟乱序时间
                        long createTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                        sourceContext.collect(new Order(id, userId, money, createTime));
                        //模拟数据延迟
                        TimeUnit.MILLISECONDS.sleep(1000);
                    }
                }
    
                @Override
                public void cancel() {
                    flag = false;
                }
            });
    
            SingleOutputStreamOperator<Order> assignTimestampsAndWatermarks = dataStreamSource
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                    .withTimestampAssigner(
                                            ((event, timestamp) ->
                                            event.getCreateTime())));
    
            SingleOutputStreamOperator<Order> money = assignTimestampsAndWatermarks.keyBy(Order::getUserId)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .sum("money");
    
            money.print();
    
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    解析

    1. 什么是Watermaker?

    Watermaker就是给数据再额外的加的一个时间列,也就是Watermaker是个时间戳。

    2. 如何计算Watermaker?

    Watermaker = 数据的事件时间 - 最大允许的延迟时间或乱序时间
    准确来说:
    Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
    这样可以保证Watermaker水位线会一直上升(变大),不会下降。

    3. Watermaker有什么用?

    之前的窗口都是按照系统时间来触发计算的,如: [10:00:00 ~ 10:00:10) 的窗口,一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失!
    那么现在有了Watermaker,窗口就可以按照Watermaker来触发计算! 也就是说Watermaker是用来触发窗口计算的!

    4. Watermaker如何触发窗口计算的?

    窗口计算的触发条件为:

    1. 窗口中有数据
    2. Watermaker >= 窗口的结束时间

    因为前面说到Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间也就是说只要不断有数据来,就可以保证Watermaker水位线是会一直上升/变大的,不会下降/减小的所以最终一定是会触发窗口计算的。

    注意:
    上面的触发公式进行如下变形:

    Watermaker >= 窗口的结束时间
    Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
    当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 >= 窗口的结束时间
    当前窗口的最大的事件时间 >= 窗口的结束时间 + 最大允许的延迟时间或乱序时间

    5. 图解Watermaker

    在这里插入图片描述
    在这里插入图片描述

    6. 内置 Watermark 生成器

    6.1 单调递增时间戳分配器

    周期性 watermark 生成方式的一个最简单特例就是你给定的数据源中数据的时间戳升序出现。在这种情况下,当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。

    注意:在 Flink 应用程序中,如果是并行数据源,则只要求并行数据源中的每个单分区数据源任务时间戳递增。例如,设置每一个并行数据源实例都只读取一个 Kafka 分区,则时间戳只需在每个 Kafka 分区内递增即可。Flink 的 watermark 合并机制会在并行数据流进行分发(shuffle)、联合(union)、连接(connect)或合并(merge)时生成正确的 watermark。

    WatermarkStrategy.forMonotonousTimestamps();
    
    • 1

    6.2 数据之间存在最大固定延迟的时间戳分配器

    另一个周期性 watermark 生成的典型例子是,watermark 滞后于数据流中最大(事件时间)时间戳一个固定的时间量。该示例可以覆盖的场景是你预先知道数据流中的数据可能遇到的最大延迟,例如,在测试场景下创建了一个自定义数据源,并且这个数据源的产生的数据的时间戳在一个固定范围之内。Flink 针对上述场景提供了 boundedOutfordernessWatermarks 生成器,该生成器将 maxOutOfOrderness 作为参数,该参数代表在计算给定窗口的结果时,允许元素被忽略计算之前延迟到达的最长时间。其中延迟时长就等于 t - t_w ,其中 t 代表元素的(事件时间)时间戳,t_w 代表前一个 watermark 对应的(事件时间)时间戳。如果 lateness > 0,则认为该元素迟到了,并且在计算相应窗口的结果时默认会被忽略。有关使用延迟元素的详细内容,请参阅有关允许延迟的文档

    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
    
    • 1

    7. Watermark 策略与 Kafka 连接器

    当使用 Apache Kafka 连接器作为数据源时,每个 Kafka 分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)。然而,当使用 Kafka 数据源时,多个分区常常并行使用,因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式(这是 Kafka 消费客户端所固有的)。

    在这种情况下,你可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。

    例如,如果每个 Kafka 分区中的事件时间戳严格递增,则使用单调递增时间戳分配器按分区生成的 watermark 将生成完美的全局 watermark。注意,我们在示例中未使用 TimestampAssigner,而是使用了 Kafka 记录自身的时间戳。

    下图展示了如何使用单 kafka 分区 watermark 生成机制,以及在这种情况下 watermark 如何通过 dataflow 传播。

    FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
    kafkaSource.assignTimestampsAndWatermarks(
            WatermarkStrategy
                    .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
    
    DataStream<MyType> stream = env.addSource(kafkaSource);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    8. 可以弃用 AssignerWithPeriodicWatermarks 和AssignerWithPunctuatedWatermarks 了

    在 Flink 新的 WatermarkStrategy,TimestampAssigner 和 WatermarkGenerator 的抽象接口之前,Flink 使用的是 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks。你仍可以在 API 中看到它们,但建议使用新接口,因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式。

  • 相关阅读:
    Unreal Engine(虚幻引擎)渲染 – 正确使用方法
    Spring配置类为什么要分Full和Lite模式
    工业相机基本知识理解:帧率、带宽(数据接口)、图像数据格式
    【Redis】专栏合集,从入门到高级业务场景实战
    每个后端都应该了解的OpenResty入门以及网关安全实战
    mysql语句locate与substring联合使用方法
    Prompt Tuning训练过程
    metinfo_5.0.4 EXP Python脚本编写
    2019-2021年上市公司润灵ESG评分评级数据
    python处理csv文件
  • 原文地址:https://blog.csdn.net/weixin_46376562/article/details/125604937