• Flink Watermark机制


    1. Watermark是什么?用来解决什么问题?

    Flink里涉及两个重要的时间,Processing Time(处理时间) 和 Event Time(事件时间),而支持事件时间的流处理器需要一种方法来度量事件时间的进度。例如,当事件时间超过一小时后,需要通知构建每小时窗口的窗口操作符,以便该操作符可以关闭正在运行的窗口。怎么确定一个窗口是否已经结束,这在流式数据处理系统中并非一个很容易解决的问题。如果窗口是基于处理时间的,那么问题确实容易解决,因为处理时间是完全基于本地时钟的;但是如果窗口基于事件时间,由于分布式系统中消息可能存在延迟、乱序到达的问题,即便系统已经接收到窗口边界以外的数据了,也不能确定前面的所有数据都已经到达了。水位线(Watermark)机制就是用于解决这个问题的。

    Watermark就是在使用事件时间时在内部度量进度的一种机制(本质就是个时间戳,详见org.apache.flink.streaming.api.watermark.Watermark extends StreamElement),换句话说,在处理使用事件时间属性的数据流时,Watermark 是系统测量数据处理进度的一种方法。Watermaker作为数据流的一部分携带着一个时间戳t。一个Watermark(t)表示在这个流中事件时间已经到达t,这意味着所有携带时间戳t'<=t的元素都已经到达,Watermark(t)后的元素的时间戳都应该 >t。即它定义了何时不再等待更早的数据。

    有了 Watermark,系统就可以确定使用事件时间的窗口是否已经完成。但是 Watermark 只是一种度量指标,系统借由它来评估当前的进度,并不能完全保证不会出现小于当前 Watermark 的消息。对于这种消息,即“迟到”的消息,需要进行特殊的处理。详见下面的部分。

    Watermark解决了什么?是为了解决数据到来的乱序以及延迟等问题。当基于事件时间的数据流进⾏计算时,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于设备故障、网络、背压等原因,导致乱序和延迟的产生。即乱序和延迟等问题导致我们无法判断当前的进度,而Watermark就代表了这个时间进度,从而让我们有了处理的基准,这也是窗口操作的基础。

    2. Watermark的生成和使用

    (时间戳和watermark都是自1970-01-01T00:00:00Z以来的毫秒数)

    2.1. Watermark Strategies介绍

    为了处理事件时间,Flink需要知道事件时间戳,这意味着需要为流中的每个元素分配事件时间戳。这通常通过使用TimestampAssigner从元素的某个字段中访问/提取时间戳来完成。

    时间戳分配与生成Watermark密切相关,Watermark告诉系统事件时间的进度。你可以通过指定一个WatermarkGenerator来对此进行配置。

    使用Flink Watermark API时期望一个包含TimestampAssigner和WatermarkGenerator的WatermarkStrategy。许多常用的策略都可以作为WatermarkStrategy上的静态方法使用,但用户也可以在需要时构建自己的策略。

    在Flink应用中有两个地方使用WatermarkStrategy:

    -》直接在数据源上
    -》数据源操作后(应该只在你不能直接在源上设置一个策略时使用)

    第一种选择是更好的,因为它允许源代码在做watermarking逻辑中利用关于shards/partitions/splits的信息。然后,源通常可以在更精细的水平上跟踪watermark,源产生的整体watermark将更加准确。直接在源上指定一个水印策略通常意味着你必须使用一个源特定的接口。这里可以参考WatermarkStrategy和Kafka连接器来了解它是如何在Kafka连接器上工作的,以及关于每个分区的watermark如何在那里工作的更多细节。

    使用WatermarkStrategy的示例:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<MyEvent> stream = env.readFile(
            myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
            FilePathFilter.createDefaultFilter(), typeInfo);

    DataStream<MyEvent> withTimestampsAndWatermarks = stream
            .filter( event -> event.severity() == WARNING )
            .assignTimestampsAndWatermarks(<watermark strategy>);

    withTimestampsAndWatermarks
            .keyBy( (event) -> event.getGroup() )
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .reduce( (a, b) -> a.add(b) )
            .addSink(...);

    上面的代码以这种方式使用WatermarkStrategy获取一个流,并产生一个带有时间戳元素和watermark的新流。如果原始流已经具有时间戳和/或watermark,则时间戳赋值器将覆盖它们。

    2.2. 编写WatermarkGenerators

    TimestampAssigner很简单,该接口里就只有一个方法extractTimestamp,这里主要介绍WatermarkGenerator接口。

    /**
     * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
     * fixed interval).
     *
     * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
     * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
     */
    @Public
    public interface WatermarkGenerator<T> {

        /**
         * Called for every event, allows the watermark generator to examine and remember the event
         * timestamps, or to emit a watermark based on the event itself.
         */
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);

        /**
         * Called periodically, and might emit a new watermark, or not.
         *
         * <p>The interval in which this method is called and Watermarks are generated depends on {@link
         * ExecutionConfig#getAutoWatermarkInterval()}.
         */
        void onPeriodicEmit(WatermarkOutput output);
    }

    Watermark的生成有两种不同的方式:periodic 和 punctuated。

    periodic生成器通常通过onEvent()观察传入的事件,然后在框架调用onPeriodicEmit()时发出水印。
    puncutated生成器将查看onEvent()中的事件,并等待流中携带水印信息的特殊标记事件或标点。当它看到这些事件之一,它立即发出水印。通常,puncutated生成器不会从onPeriodicEmit()中发出水印。
    即两者产生watermark的地方和方式有些不同。
    -》实现Periodic WatermarkGenerator

    periodic生成器观察流事件并周期性地生成watermark(可能取决于流元素,也可能纯粹基于处理时间)。

    生成watermark的间隔(每n毫秒)是通过ExecutionConfig.setAutoWatermarkInterval(…)来定义的。每次都会调用生成器的onPeriodicEmit()方法,如果返回的watermark非空且大于前一个watermark,则会触发一个新的watermark。下面是两个简单的示例:

    /**
     * This generator generates watermarks assuming that elements arrive out of order,
     * but only to a certain degree. The latest elements for a certain timestamp t will arrive
     * at most n milliseconds after the earliest elements for timestamp t.
     */
    public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // emit the watermark as current highest timestamp minus the out-of-orderness bound
            output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
        }

    }

    /**
     * This generator generates watermarks that are lagging behind processing time 
     * by a fixed amount. It assumes that elements arrive in Flink after a bounded delay.
     */
    public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

        private final long maxTimeLag = 5000; // 5 seconds

        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            // don't need to do anything because we work on processing time
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
        }
    }

    -》实现Punctuated WatermarkGenerator

    puncutated生成器将观察事件流,并在它看到携带水印信息的特殊元素时发出水印。

    注意:可以在每个事件上生成watermark。但是,由于每一个watermark都会引起下游的一些计算,所以过多的watermark会降低性能。

    示例如下:

    public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            if (event.hasWatermarkMarker()) {
                output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
            }
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // don't need to do anything because we emit in reaction to events above
        }
    }

    2.3. operator如何处理watermark

    一般来说,operator需要在将给定watermark转发到下游之前对其进行完整处理。例如,WindowOperator将首先计算所有应该触发的窗口,只有在生成watermark触发的所有输出后,watermark本身才会被发送到下游。换句话说,由于出现watermark而产生的所有元素将在watermark之前发出。
    同样的规则也适用于TwoInputStreamOperator。然而,在这种情况下,运算符的当前watermark被定义为其两个输入的最小值。
    细节详见:
    OneInputStreamOperator#processWatermark,TwoInputStreamOperator#processWatermark1,TwoInputStreamOperator#processWatermark2

     2.4. 迟到的数据处理机制

    1. 丢弃(默认)
    2. allowedLateness 指定允许数据延迟的时间
    在某些情况下,我们希望对迟到的数据再提供一个宽容的时间。Flink 提供了 allowedLateness 方法可以实现对迟到的数据设置一个延迟时间,在指定延迟时 间内到达的数据还是可以触发 window 执行的。调用 .allowedLateness(Time lateness)
    3. sideOutputLateData 收集迟到的数据
    通过 sideOutputLateData 可以把迟到的数据统一收集,统一存储,方便后期排查问题。该⽅法会将延迟的数据发送到给定 OutputTag 的 side output 中去,然后你可以通过 SingleOutputStreamOperator.getSideOutput(OutputTag)  来获取这些延迟的数据。

    2.5. API中Watermark使用

    示例1:使用Periodic Watermark

    public class TestPeriodicWatermark {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.getConfig().setAutoWatermarkInterval(1000);
            DataStream<String> dataSource = env.socketTextStream("manager-1", 10009);
            DataStream<Tuple2<String, Long>> mapData = dataSource.map(s ->
                    new Tuple2<>(s.split(",")[0], Long.parseLong(s.split(",")[1]))).returns(Types.TUPLE(Types.STRING, Types.LONG));
            mapData.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((SerializableTimestampAssigner<Tuple2<String, Long>>) (o, l) -> o.f1))
                    .keyBy((KeySelector<Tuple2<String, Long>, String>) tuple2 -> tuple2.f0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                    .process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {
                        @Override
                        public void process(String key, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector) throws Exception {
                            long sum = 0;
                            int index = 0;
                            Iterator<Tuple2<String, Long>> it = iterable.iterator();
                            Tuple2<String, Long> first = null, end = null;
                            while (it.hasNext()) {
                                Tuple2<String, Long> item = it.next();
                                if (index == 0) {
                                    first = item;
                                }
                                end = item;
                                sum += 1;
                                index++;
                            }
                            System.out.println("窗口开始时间: " + context.window().getStart());
                            System.out.println("窗口结束时间: " + context.window().getEnd());
                            System.out.println("窗口第一条数据: " + first);
                            System.out.println("窗口最后数据: " + end);
                            System.out.println("当前watermark: " + context.currentWatermark());
                            System.out.println();
                            collector.collect(new Tuple2<>(key, sum));
                        }
                    }).print();

            env.setRestartStrategy(RestartStrategies.noRestart());
            env.execute("TestPeriodicWatermark");
        }
    }

    2.6. SQL中Watermark使用

    在创建表的 DDL 中定义
    事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 Watermark 生成表达式,同时标记这个已有字段为时间属性字段。

    CREATE TABLE user_actions (
    user_name STRING,
     data STRING,
    user_action_time TIMESTAMP(3),
     -- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
    WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
    ) WITH (
    ...
    );
     
    SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
    FROM user_actions
    GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

    3. 一些注意点

    3.1. 多并行度数据流中的 Watermark

    在多并行度的情况下(多数据源流或单流多partition),Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel 中最小的 Watermark。

    3.2. window的触发和移除

    window的触发条件:

    1、watermark时间 >= window_end_time
    2、在[window_start_time,window_end_time)中有数据存在

    (注意在允许延时allowed lateness的场景下,假设是t,watermark的生成逻辑一般是 watermark=maxEventtime - t),比如:官方的BoundedOutOfOrdernessWatermarks的实现逻辑就是:

    public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
        }

    看看如何触发窗口,可以详见window的Trigger。

    假如我们设置10s的时间窗口(window),那么0~10s,10~20s都是一个窗口,以0~10s为例,0为start-time,10为end-time。假如有4个数据的event-time分别是8(A),12.5(B),9(C),13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒
     当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算
     当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算
     当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
     当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算
     触发计算的时候,会将A,C(因为他们都小于10)都计算进去,其中C是迟到的。
     在13.5s后0~10s的窗口就被移除。

    另外需要注意的是,比如某个算子10并发度,但是产生下来的watermark只有6个,则有4个无法接收到watermark,导致后续的窗口计算也就无法触发。所以注意并行度等问题。

    window的移除条件:

    时间(事件或处理时间)超过window_end_time加上allowed lateness

    3.3. 空闲数据源处理

    由于数据源有分区,比如kafka,有些分区没有数据,导致watermark无法产生,导致后续无法触发计算。这时就可以设置参数让其认为该分区是空闲的,可以暂且不用去管,等到有数据再说。

    WatermarkStrategy
            .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withIdleness(Duration.ofMinutes(1));

     参考官网:

    https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/

  • 相关阅读:
    无线电编码和记录和静音检测器 PlayOutONE LiveStream 5.0
    Python3中的“加和”函数
    SparkCore系列-9、共享变量
    Proxmox VE Install 7.2
    第五章ARM处理器的嵌入式硬件系统设计——课后习题
    2022-08-24 第六小组 瞒春 学习笔记
    搭建Atlas2.2.0 集成CDH6.3.2 生产环境+kerberos
    一起来领略JDK8中的流式编程的魅力
    GC垃圾回收算法
    高并发架构设计经验
  • 原文地址:https://blog.csdn.net/chanyue123/article/details/125148937