• 21.flink 水位线,彻底站起来


    摘要

    flink从1.12版本开始水位线分配做了不小的改动,以前的老代码需要改变,今天通过阅读源码,正好做个整理。
    首先现在的版本默认情况下使用event time。如果你想用process time请用:
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

    目前的版本WatermarkStrategy接口用来分配事件时间,和水位线,基本上是水位线和事件时间的入口类了,这个接口内置了很多已经被实现的策略,甚至我们拿来直接用,基本不用自己实现,当然了你也可以自己实现这个接口,去实现createTimestampAssigner,createWatermarkGenerator这两个方法。

    这篇文章有几个目的:1.让你彻底明白水位线 2.学会用最新的api来生成水位线
    本文章基于1.13版本

    一:flink三种时间

    窗口的分配基于时间,flink的时间有以下三种

    1. event time:流数据元素本身携带了时间戳
    2. processing time:指的是流数据使用算子的时间,也就是被算子处理的时间
    3. ingestion time:值得是事件到达 flink source的事件

    1.1 event time 优点缺点

    event time:使用数据元素携带的时间,首先数据有可能乱序到达,也就是存在时间延迟的可能性,因此如想结果准确,那么必然要增加延迟时间,如此一来就会增加处理延迟,这是最大的缺点。优点语义清晰更接近业务层实际场景。使用过程中要考虑对延迟到来的元素如何处理。 常见的处理方式:a.直接丢弃 2.siding 分流后续自己处理。c.给窗口设置一个延迟时间(治标不治本)一般生产环境会采用2和3联合处理

    1.2 processing time 优点缺点

    使用算子处理的时候该元素所在计算机的时间, 也就是说只考虑数据被真实处理的事件,不考虑数据什么时候产生的,只要该数据在某个事件段内到达了算子就会被flink自动划分窗口, 因此这是flink根据系统事件自动化分的,也不存在数据延迟的问题,更不存在水位线,这是一种高效率的处理方式。而且不需要设置水位线策略,代码也更加简洁。只需要在window指定即可:window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

    1.3 Ingestion time 优点缺点

    指的是事件到达flink source的时间,这是一个折中方案,效率低于processing time 高于event tion。 其水位线设置代码也更简洁一些。

    1.4 水位线是干嘛的

    流处理中,时间就是一个基石,我们已经知道时间分为事件时间,处理时间和接入时间这三类。那么水位线和时间有什么关系呢?思考一个问题,在流处理中我们一般是按照时间窗口来进行计算的。那么时间窗口是怎么触发的呢? 你有没有想过这个问题。而水位线就是为了触发计算而存在的。水位线其实就是携带自增时间的流数据,它和真正的数据(指事件)掺杂在一起随着数据流而流动。

    二.水位线

    2.1水位线的特点

    1. 水位线和事件的时间息息相关
      因为水位线本身就是为了衡量事件是否已经到来的, 也就意味着当水位线t到达算子的时候,那就意味着 事件时间小于t的事件都已经到了, 为什么这么说呢? 那是因为水位线的产生并不是随意而为的,而是基于事件时间的,你可以理解为基于事件的时间做了一些计算进而产生了水位线, 我们假设一个时间为t-01的事件,基于t-01此时产生了一个时间为t的水位线,则必然 t>=t-01 的关系也就意味着数据流中事件先传送, 基于这个事件生成的水位线后传送。水位所以t>=t-01, 因此当算子收到t的水位线的时候,正常来说t-01已经到达算子了。 这一点请仔细理解。
    2. 水位线一定是递增的,
      因为它衡量的是整个流数据的时间进度条, 所以生成水位线务必保证递增。否则会导致水位线失效,为什么这么说,那是因为partition watermark会取比上次值大的水位线进行自我更新。
      如果水位线不是递增的,会导致算子时钟无法更新,也就是窗口无法触发。如果读不懂没关系,下面会介绍partition watermark和算子时钟。
    3. 迟到的乱序数据
      当算子接收到水位线 t的时候意味着t之前的数据已经到来,但是因为一些其他原因可能会存在晚来的数据,有可能其事件事件小于 t但是此时却没有到来。 这被称为乱序数据,也被称为迟到数据,flink提供了针对迟到数据的处理策略,后面会说。
    4. 水位线的紧凑和宽松
      因为水位线的生成和事件时间息息相关,且水位线衡量了流数据的进度。
      a.水位线越紧凑:
      优点:应用的延迟很低,水位线紧凑那就触发的快。
      缺点:意味着迟到数据越多,结果越不准确, 数据流越大。
      b.水位线宽松:
      优点:迟到数据较少,计算结果比较准确,数据流就越小。
      缺点:应用延迟较高,因为flink可能会等待更多的时间才能进行计算。

    2.2水位线的传播

    对初学者来说恐怕很难很难理解水位线是怎么传播的,以及怎么工作的。这一小节我们来谈谈水位线的传播。 我们来看一句代码:ds.flatMap().keyBy(),假设ds是携带水位线的数据流,且ds有两个分区,且携带三个不同的key,在keyBy之后会出现三个分区。此时数据流动如下:

    2.21数据流动

    在这里插入图片描述

    上图介绍的是数据传输,放心我没有跑题,请确保明白了上图所示。我们接着再来说水位线。通过上图我们知道了,上下游算子数据传输其实就是上下游task线程数据的传输 , 我们说过水位线是和数据掺杂在一起的,这就意味着水位线传输也是通过socket传输的。task其实就是真正进行数据处理的线程 , 所谓的窗口触发,其实就是说task开始从上游往下游依次执行, 所以说一个窗口数据的处理,从上游到下游会经过很多的task。 那么task是怎么知道该执行了?抛开flink, 你仔细想想如果你写一个五分钟后执行的线程你会怎么写?答案很简单啊,首先定义一个五分钟之后的时间戳,然后判断当前时间是否大于你定义的时间,是的话就启动线程。flink也是这样啊,相对而言flink 中task的执行一定也要有一个触发时间,问题是触发时间是哪里来的呢?带着这个问题请往下看。

    2.22 水位线如何触发

    上面已经说了 keyBytask-01接收的数据和水位线分别来自maptask-01和maptask-02, 接下来研究的就是keyBytask-01的触发时间怎么来的。 首先肯定和水位线有关啊, 我们看到keyBytask-01拿到了maptask-01和maptask-02流数据中的水位线,然后取出最小的那个水位线时间去更新自己的触发时间。数据一直流动,触发时间一直更新。 这个触发时间flink称为 算子时钟,这个时钟其实就是task线程级别的时钟,由每个task线程维护,本质上就是task线程的一个时间变量。 当我们调用window划分时间的时候,flink会基于自然时间划分窗口,每个窗口都有start_time 和end_time.
    然后根据流数据中的event_time为 当前数据分配所属窗口。
    当算子时钟 >end_time的时候,并且窗口中的数据数量大于零的时候触发计算。
    你看懂了上面的基本明白了七七八八,甚至应付面试已经不成问题了。但是我还想多说点,上面说到了算子时钟,算子时钟存在于每一个task线程中, 且会随着下游socket链接传递给下游的task,也就是说对于下游的数据来说,算子时钟就是真正的水位线了。对于我们生成的水位线其实在第一个算子处理之后就不存在了(严格来说是被转换成算子时钟), 在第一个算子中,该算子对应的每个task都有一个时钟,该时钟被水位线更新,并被传递到下游算子最为下游数据的水位线。

    2.23 什么是分区水位线partition watermark

    这个部分按理说应该在 2.22 之前讲解的,但是为了方便理解整体逻辑,所以放在了这里。上面已经说了 keyBytask-01接收的数据和水位线分别来自maptask-01和maptask-02。 maptask-01 和maptask-o2其实对应的就是数据的两个分区。 对于keyBytask-01来收其消费的数据其实来自 maptask-01 和maptask-o2对应的两个分区(虽然本质上是maptask处理后的数据分区数据), keyBytask-01会维护一个列表,这个列表对应的就是maptask-o1,maptask-o2当前的水位线,这就是分区水位线partition watermark。 keyBytask-01遍历该列表取出最小的水位下那作为算子时钟。 列表中的值其实就是maptask遇到的水位线, 每个task对应一个水位线,在本例子中该列表就有两个水位线,一个是maptask-01对应的值,另一个是maptask-02对应的值,该值会随着数据的流动被更新,每次遇到新的水位线的时候都会和当前值比较,如果新遇到的水位下那比当前值大的话就更新列表中的水位线。 说白了, keyBytask-01维护的partition watermark 就对应着maptask-01和maptask-02 的处理进度。 假设这两个水位线的值分别为 {t1 , t2 }, 那就说明对maptask-01来说 所有事件时间小于t1的事件都已经被maptask-01接收了,同理对maptask-02来说 所有事件时间小于t2的事件都已经被maptask-02接收了. 我们再假设 t1 < t2, 那么keyBytask-01会遍历partition watermark 取出最小的值 t1作为算子时钟,然后将t1最为新的水位线转发给keyBy下游的算子, 此时对keyBy算子来水意味着所有事件时间小于t1的数据都被keyBy接收了。

    2.3什么时候需要水位线

    其实,正常来说,processing time和ingestion time 都可以不设置水位线,而如果要使用event time 一般都要设置水位线的。
    我个人觉得你一定要理解啊, 水位线是为了触发窗口计算而存在的,如果你根本不涉及到窗口计算,那么设置水位线屁用没有。

    三:设置水位线的必要两步骤

    本小节不涉及代码,因为不同版本的水位线设置代码略有不同,这里讲一下思路。
    1.提取时间
    不是说数据里已经有时间戳了吗,为什么这里还要“提取”呢?这是因为原始的时间戳只是流数据数据对象的一个字段,如果不提取出来并明确把它分配给数据,Flink 是无法知道数据真正产生的时间的,也就是说你得让flink知道你用的哪个时间。
    2.指定一个水位线生成策略
    水位线的生成很好理解,就是将生成的水位线发送到数据流中,使得水位线随着数据流一起流动。水位线的生成和事件中的时间息息相关。

    四. 设置水位线的时机

    1.直接在source端生成(只在event time模式下可用)
    此方式一般用于自定义source的时候,在获取数据的方法类用context直接提取时间,以及生成水位线。 下面伪代码,真正有用的是:collectWithTimestamp 和emitWatermark

    public class Device{
    public String deviceId;
    public long deviceTime;
    }
    public class MySource extends RichSourceFunction{
    @Override
    public void run(SourceContext ctx) throws Exception{
    while(某条件){
    Device device = getDevice();
    ctx.collectWithTimestamp(device,device.deviceTime);//提取时间 
    ctx.emitWatermark(new Watermark(device.deviceTime));//生成水位线
    
    }
    }
     } 
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.在接收到source之后统一分配

     KeyedStream, String> ds =...;
    ds .assignTimestampsAndWatermarks(
    WatermarkStrategy
    .forGenerator(WatermarkGeneratorSupplier generatorSupplier)//生成水位线
    .withTimestampAssigner(SerializableTimestampAssigner timestampAssigner)//提取时间 
    );
    forGenerator中接受一个水位线生成器具,你自己实现即可。withTimestampAssigner就很简单了,
    可以直接用lambda表达式提取时间即可
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    上面是部分理解到核心逻辑即可,下面会讲解代码的,不要急。
    所以核心逻辑:1.提取时间 2.生成水位线

    五.source后生成水位线 的{提取时间接口}和{生成水位西安接口}介绍

    5.1 总览

     KeyedStream, String> ds =...;
        ds .assignTimestampsAndWatermarks(
    	WatermarkStrategy
    		.forGenerator(WatermarkGeneratorSupplier generatorSupplier)//生成水位线
    		.withTimestampAssigner(SerializableTimestampAssigner timestampAssigner)//提取时间 
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    5.2. 提取时间(TimestampAssigner接口)

    withTimestampAssigner接受一个时间提取器SerializableTimestampAssigner,这个很简单,比如

    
    class MySerializableTimestampAssigner implements SerializableTimestampAssigner >{
    
        @Override
        public long extractTimestamp(Tuple2 element, long recordTimestamp) {
            return element.f1;//提取时间
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    5.3.指定水位线生成策略(WatermarkGenerator接口)

    forGenerator参数是一个WatermarkGeneratorSupplier,实现如下:

    class MyWatermarkGenerator implements WatermarkGeneratorSupplier>{
    
        @Override
        public WatermarkGenerator> createWatermarkGenerator(Context context) {
           return new WatermarkGenerator>() {
                @Override
                public void onEvent(Tuple2 event, long eventTimestamp, WatermarkOutput output) {
    										//每个事件调用一次
                }
    
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
    											//间隔一定周期调用一次
                }
            } ;
    
        }
    }
    这个接口对于新手其实也不好理解。在这里我想多说几句。
    flink的水位线也是一种流数据,它和流数据一起移动,
    只不过水位线数据不参与逻辑计算
    只是作为触发标志而存在。 
    所以说水位线是不参与逻辑计算的流数据,
    它由flink调用WatermarkGenerator去生成并塞入流数据中,
    这个行为对我们来说是不可见的。 那么这就很好理解了。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    1.onEvent:这个方法的意思是我们可以根据event的特性再加上一些逻辑去生成我们想要的水位线,此时根本不需要onPeriodicEmit方法了,在这种情况下水位线的生成可以说由我们完全控制了,比如当遇到某些特殊的event的时候去生成水位线,那么就可以在这个方法中去分析event,判断是否需要发出水位线。 再看参数eventTimestamp,这个参数其实就是TimestampAssigner分配的事件的时间,根据这个时间我们可以自定义一些生成水位线的方式。此时需要注意,因为生成水位线和eventTimestamp有关,而且水位线又是必须递增的,所以你要判断下。
    2.onPeriodicEmit
    这个方法很好理解啊,就是周期生成递增的水位线,你甚至可以字节用系统时间简单发出水位线,顾名思义此方法被周期性调用,而onEvent是每来一个元素都会调用的。
    再来看参数WatermarkOutput ,这个参数可以发出水位线,就是一个普通对象,包含一个发送方法,方法接受一个水位线对象而已。 我们观察到无论是
    onEvent还是onPeriodicEmit都有这个参数也就是说它们都可以发出水位线。
    这两个方法很灵活,你可以用enEvent发出水位线也可以用onPeriodicEmit发出水位线那。但是记住enEvent这是每个元素都会调用一次的方法,而onPeriodicEmit是被周期性调用的方法,所以主意啊enEvent可能会发出更多的水位线从而影响性能。 更多时候二者是结合使用的,比如onEvent只是做一些时间处理,存为类变量,而 onPeriodicEmit用于发出水位线。
    我们来看一个flink提供的内部的自增的周期性水位线的例子:

    public class BoundedOutOfOrdernessWatermarks implements WatermarkGenerator {
    
    
        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }
    
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这个例子展示了结合onEvent和onPeriodicEmit这两个方法生成水位线的方法。
    onPeriodicEmit调用的周期可以再evn中设置:env.getConfig().setAutoWatermarkInterval(10)

    5.4.一个自定义的完整的例子

      
    import org.apache.flink.api.common.eventtime.*;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    ;
    //生成水位线有两处位置1.在source内部生成  2.在source到来之后生成,下面展示的是在source后面生成
    
    public class waterMarkDemo {
    
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setAutoWatermarkInterval(1000);//水位线周期,毫秒,即onPeriodicEmit调用的周期
            KeyedStream, String> ds = env.fromElements(
                    Tuple2.of("a", 1660819010000L),
                    Tuple2.of("b", 1660819020000L),
                    Tuple2.of("a", 1660819030000L),
                    Tuple2.of("b", 1660819040000L),
                    Tuple2.of("c", 1660819050000L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.>forGenerator(new MyWatermarkGenerator())
                    .withTimestampAssigner(
                            new MySerializableTimestampAssigner()
                    )
            ).keyBy(new KeySelector, String>() {
                @Override
                public String getKey(Tuple2 value) throws Exception {
                    return value.f0;
                }
            });
            ds.print();
    
            env.execute();
        }
    }
    
    class MyWatermarkGenerator implements WatermarkGeneratorSupplier> {
        private long maxTimestamp;
        private final long outOfOrdernessMillis = 60*1000;//一分钟 毫秒
    
        @Override
        public WatermarkGenerator> createWatermarkGenerator(Context context) {
            return new WatermarkGenerator>() {
                @Override
                public void onEvent(Tuple2 event, long eventTimestamp, WatermarkOutput output) {
                    maxTimestamp = Math.max(maxTimestamp, eventTimestamp);//eventTimestamp是 提取的flink事件时间
                }
    
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis));
                }
            };
    
        }
    }
    
    class MySerializableTimestampAssigner implements SerializableTimestampAssigner> {
    
        @Override
        public long extractTimestamp(Tuple2 element, long recordTimestamp) {
            return element.f1;//提取时间
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    5.5.来看WatermarkStrategy的源码

    
    @Public
    public interface WatermarkStrategy
            extends TimestampAssignerSupplier, WatermarkGeneratorSupplier {
    
      
        @Override
        WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
    
       
        @Override
        default TimestampAssigner createTimestampAssigner(
                TimestampAssignerSupplier.Context context) {
            // By default, this is {@link RecordTimestampAssigner},
            // for cases where records come out of a source with valid timestamps, for example from
            // Kafka.
            return new RecordTimestampAssigner<>();
        }
    
        
        default WatermarkStrategy withTimestampAssigner(
                TimestampAssignerSupplier timestampAssigner) {
            checkNotNull(timestampAssigner, "timestampAssigner");
            return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
        }
    
        default WatermarkStrategy withTimestampAssigner(
                SerializableTimestampAssigner timestampAssigner) {
            checkNotNull(timestampAssigner, "timestampAssigner");
            return new WatermarkStrategyWithTimestampAssigner<>(
                    this, TimestampAssignerSupplier.of(timestampAssigner));
        }
    
       
        default WatermarkStrategy withIdleness(Duration idleTimeout) {
            checkNotNull(idleTimeout, "idleTimeout");
            checkArgument(
                    !(idleTimeout.isZero() || idleTimeout.isNegative()),
                    "idleTimeout must be greater than zero");
            return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
        }
    
       
        static  WatermarkStrategy forMonotonousTimestamps() {
            return (ctx) -> new AscendingTimestampsWatermarks<>();
        }
    
      
        static  WatermarkStrategy forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
            return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
        }
    
       
        static  WatermarkStrategy forGenerator(WatermarkGeneratorSupplier generatorSupplier) {
            return generatorSupplier::createWatermarkGenerator;
        }
    
        
        static  WatermarkStrategy noWatermarks() {
            return (ctx) -> new NoWatermarksGenerator<>();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    六.总结

    基本上讲解的差不多了,上面都搞明白的话应付开发完全不成问题,另外仔细看WatermarkStrategy源码,发现forMonotonousTimestamps和forBoundedOutOfOrderness这两个水位线可以拿来直接用,就像下面这样:

    ds.assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps()
                    .withTimestampAssigner(new SerializableTimestampAssigner>() {
                        @Override
                        public long extractTimestamp(Tuple2 element, long recordTimestamp) {
                            return element.f1;
                        }
                    })
            )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    forMonotonousTimestamps和forBoundedOutOfOrderness都是自增周期性水位线,有兴趣的可以点开源码,你会发现其实现方式基本雷同,甚至和我上面自定义的水位线很相似。forBoundedOutOfOrderness和forMonotonousTimestamps唯一的区别就是forBoundedOutOfOrderness允许一个窗口闭合的延迟时间,简单说就是数据最大允许迟到多久。

  • 相关阅读:
    vue部署,chunk文件有部分404,解决方案
    ChatGPT研究报告:AIGC带来新一轮范式转移
    猿创征文|vue中使用Axios最佳实践
    redis 缓存击穿问题(互斥锁,逻辑过期)
    关键词生成原创文章软件-原创文章生成软件
    createNodeIterator认识
    2022UUCTF-web
    Fiddler抓手机包
    生产者消费者模型
    【统计分析】(task1) 假设检验1:方法论与一元数值检验
  • 原文地址:https://blog.csdn.net/qq_36066039/article/details/126370741