• 【API篇】十、生成Flink水位线


    1、水位线的生成原则

    水位线出现,即代表这个时间之前的数据已经全部到齐,之后不会再出现之前的数据了。参考前面的乱序流,可以得出:

    • 想要保证数据绝对正确,就得加足够大的延迟,但实时性就没保障了
    • 想要实时性强,就得把延迟设置小,但此时迟到数据可能遗漏,准确性降低

    水位线的定义,是对低延迟和结果准确性的一个权衡。Flink生成水位线的方法是.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间

    DataStream<Event> stream = env.addSource(xxx);
    
    DataStream withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(WatermarkStrategy对象);
    
    
    • 1
    • 2
    • 3
    • 4

    WatermarkStrategy是一个接口,包含了一个时间戳分配器TimestampAssigner和一个水位线生成WatermarkGenerator:

    public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{
    
        // 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
        @Override
        TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
    
        // 主要负责按照既定的方式,基于时间戳生成水位线
        @Override
        WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2、有序流内置水位线

    有序流的时间戳全部单调递增,没有迟到数据,直接WatermarkStrategy.forMonotonousTimestamps()就可以拿到WatermarkStrategy对象

    public class WatermarkMonoDemo {
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());
    
            // TODO 1.定义Watermark策略
            WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                    // 1.1 指定watermark生成:升序的watermark,没有等待时间
                    .<WaterSensor>forMonotonousTimestamps()
                    // 1.2 指定 时间戳分配器,从数据中提取
                    .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                        @Override
                        public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                            System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                            // 返回的时间戳,要毫秒,这里拿自定义对象的ts属性做为时间戳
                            return element.getTs() * 1000L;
                        }
                    });
    
            // TODO 2. 指定 watermark策略
            SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
    
    
            sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                    // TODO 3.使用事件时间语义的窗口,别再用处理时间TumblingProcessTime
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .process(
                            new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
    
                                @Override
                                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                    long startTs = context.window().getStart();
                                    long endTs = context.window().getEnd();
                                    String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                    String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                                    long count = elements.spliterator().estimateSize();
    
                                    out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                                }
                            }
                    )
                    .print();
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    执行下,输入10时,逻辑时钟被推到了10s,到达区间,触发窗口,执行全窗口函数的process,输出当前窗口的数据:

    在这里插入图片描述

    3、乱序流内置水位线

    调用WatermarkStrategy. forBoundedOutOfOrderness(),传入延迟时间:

    public class WatermarkOutOfOrdernessDemo {
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());
    
            // TODO 1.定义Watermark策略
            WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                    // 1.1 指定watermark生成:乱序的,等待3s
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    // 1.2 指定 时间戳分配器,从数据中提取
                    .withTimestampAssigner(
                            (element, recordTimestamp) -> {
                                // 返回的时间戳,要 毫秒
                                System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                                return element.getTs() * 1000L;
                            });
    
            // TODO 2. 指定 watermark策略
            SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
    
            sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                    // TODO 3.使用 事件时间语义 的窗口
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .process(
                            new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
    
                                @Override
                                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                    long startTs = context.window().getStart();
                                    long endTs = context.window().getEnd();
                                    String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                    String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                                    long count = elements.spliterator().estimateSize();
    
                                    out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                                }
                            }
                    )
                    .print();
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    执行:

    在这里插入图片描述

    简单分析下结果:

    • 第一条数据s1,1,1进来,创建窗口,水位线为1s-3s(延迟3s)
    • s1,10,10进来,水位线为10-3 =7s,还未到达10,窗口不触发(若是有序流,无等待下,此时窗口已被触发了)
    • 此时进来一条乱序数据,比如s1,6,6,6-3=3s,水位线保持上面的7不变,watermark不会推进,且6这条数据也会被统计在[0,10)的区间内
    • s1,11,11进来,11-3=8,也不会触发,但这条数据是属于[10,20)区间的那个桶的
    • s1,13,13进来,达到10,窗口触发

    4、自定义周期性水位线生成器

    上面只是定义了时间戳的提取逻辑,水位线的生成采用的默认内置策略。接下来自定义水位线生成器:周期性水位生成器。

    周期性生成器是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发射生成的水位线

    // 自定义水位线的产生
    public class CustomPeriodicWatermarkExample {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());
    
            // 定义Watermark策略
            WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                    // 1.1 指定watermark生成器
                    .<WaterSensor>forGenerator(context -> MyPeriodWatermarkGenerator<>(3000L))
                    // 1.2 指定时间戳分配器,从数据中提取
                    .withTimestampAssigner(
                            (element, recordTimestamp) -> {
                                // 返回的时间戳,要 毫秒
                                System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                                return element.getTs() * 1000L;
                            });
    
            // TODO 2. 指定 watermark策略
            SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
    
            sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                    // TODO 3.使用 事件时间语义 的窗口
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .process(
                            new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
    
                                @Override
                                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                    long startTs = context.window().getStart();
                                    long endTs = context.window().getEnd();
                                    String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                    String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                                    long count = elements.spliterator().estimateSize();
    
                                    out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                                }
                            }
                    )
                    .print();
    
            env.execute();
        }
    
       
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    模仿前面的内置生成器,定义自己的水位线生成器:

    public class MyPeroidWatermarkGenerator implements WatermarkGenerator<Event> {
    
         private Long delayTime = 5000L; // 延迟时间
         private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳
    	
    	//构造方法,传入延迟时间,构造水位线生成器对象
    	public MyPeroidWatermarkGenerator(long delayTime){
    		this.delayTime = delayTime;
    		this.maxTs = Long.MIN_VALUE + this.delayTime + 1;
    	}
    	
    	/**
    	* 每条数据进来都调用一次,用来提取最大的事件事件
    	*/
         @Override
         public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {
             // 每来一条数据就调用一次
             maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳
             System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp);
         }
    
    	/**
    	* 周期性调用,默认20ms
    	*/
         @Override
         public void onPeriodicEmit(WatermarkOutput output) {
             // 发射水位线,默认200ms调用一次
             output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
             System,out,println("调用了onPeriodicEmit方法,生成watermark==" + (maxTimestamp - delayTs - 1) );
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    核心部分,指定水位线生成器的Lamdba表达式展开就是:

    在这里插入图片描述

    运行:

    • 数据没进来前,每200ms调用一次发射水位线的方法,此时的水位线是构造方法里Long.MIN_VALUE那个
    • 进来一条数据,调用onEvent,最大时间戳被更新,到周期后再发射水位线maxTs-delayTs-1
    • 继续周期性调用onPeriodicEmit方法

    在这里插入图片描述

    onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了,这个方法由系统框架周期性地调用,默认200ms一次

    修改默认的周期,比如改为400ms:

    env.getConfig().setAutoWatermarkInterval(400L);
    
    • 1

    5、自定义断点式水位线生成器

    断点式生成器会不停地检测onEvent()中的事件,发现带有水位线信息的当事件时,就立即发出水位线。改下代码,定义水位线生成器:

    public class PointWatermarkGenerator implements WatermarkGenerator<Event> {
    
         private Long delayTime = 5000L; // 延迟时间
         private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳
    	
    	//构造方法,传入延迟时间,构造水位线生成器对象
    	public MyPeroidWatermarkGenerator(long delayTime){
    		this.delayTime = delayTime;
    		this.maxTs = Long.MIN_VALUE + this.delayTime + 1;
    	}
    	
    	/**
    	* 每条数据进来都调用一次,用来提取最大的事件事件
    	*/
         @Override
         public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {
             // 每来一条数据就调用一次
             maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳
             // 发射水位线
             output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
             System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp + ",生成watermark==" + (maxTimestamp - delayTs - 1));
         }
    
    	/**
    	* 周期性调用,默认20ms
    	*/
         @Override
         public void onPeriodicEmit(WatermarkOutput output) {
             
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    周期性代码改为:

    //...
    		// 定义Watermark策略
            WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                    // 1.1 指定watermark生成器
                    .<WaterSensor>forGenerator(context -> PointWatermarkGenerator<>(3000L))
                    // 1.2 指定时间戳分配器,从数据中提取
                    .withTimestampAssigner(
                            (element, recordTimestamp) -> {
                                // 返回的时间戳,要 毫秒
                                return element.getTs() * 1000L;
                            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    运行:此时不再周期性的发射水位线

    在这里插入图片描述

    6、从数据源中发送水位线

    在自定义的数据源中抽取事件时间,然后发送水位线:

    env.fromSource(
    kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource" 
    )
    
    //注意fromSorce方法的第二个传参,之前用的WatermarkStrategy.noWatermark()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    注意此时不用再assignTimestampsAndWatermarks了,在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一

  • 相关阅读:
    第18集丨不立志,天下无可成之事
    Jmeter之JSON提取器说明示例
    论文阅读——Detection Hub(cvpr2023)
    互联网摸鱼日报(2023-11-07)
    MYSQL--存储引擎和日志管理
    ps多种去水印方法与技巧-适合各种水印
    【SQL引擎 - analyze.cpp分析(二)】
    渗透测试-apt攻击与防御系列-利用WinRAR跨目录获取Net-NTLM Hash和DLL劫持
    ES-初识ES
    LastPass 开发者系统被黑客窃取源代码
  • 原文地址:https://blog.csdn.net/llg___/article/details/134021481