• flink重温笔记(九):Flink 高级 API 开发——flink 四大基石之WaterMark(Time为核心)


    Flink学习笔记

    前言:今天是学习 flink 的第 9 天啦!学习了 flink 四大基石之 Time的应用—> Watermark(水印,也称水位线),主要是解决数据由于网络延迟问题,出现数据乱序或者迟到数据现象,重点学习了水位线策略机制原理和应用,以及企业级的应用场景,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

    Tips:转码之路,溯洄从之,道阻且长!希望自己继续努力,学有所成,让华丽的分割线,成为闪耀明天的起跑线!


    三、Flink 高级 API 开发

    2. WaterMark

    2.1 为什么需要 WaterMark

    当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。
    在这里插入图片描述

    结论:

    • 只要使用 event time,就必须使用 watermark,在上游指定,比如:source、map算子后。

    • Watermark 的核心本质可以理解成一个延迟触发机制。

    • 因为前面提到,数据时间 >= 窗口结束时间,触发计算,这里想要延迟触发计算,所以水印时间一般比数据事件时间少几秒

    • 水印时间 = 事件时间 - 设置的水印长度

    • 水印的功能:在不影响按照事件时间判断数据属于哪个窗口的前提下,延迟某个窗口的关闭时间,让其等待一会儿延迟数据

    举例:

    窗口5秒,延迟(水印)3秒,按照事件时间计算
    
    来一条数据事件时间3, 落入窗口0-5.水印时间0
    来一条数据事件时间7, 落入窗口5-10,水印时间4
    来一条数据事件时间4,落入窗口0-5,水印时间1
    来一条数据事件时间8,落入窗口5-10,水印时间5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.2 多并行度与 WaterMark
    • 如果并行度是 n,那么watermark 就有 n 个
    • 触发条件以线程中最小的 watermark 为准

    在这里插入图片描述


    2.3 KeyBy 分流与 WaterMark
    • 一个程序中有多少个水印和并行度有关,和 keyby 无关

    举例:

    比如有单词hadoop spark
    按照keyby,会分成hadoop组 和spark组
    但是这两个组是共用1个水印的
    hadoop来的数据满足了触发条件,会将spark组的数据也触发
    
    • 1
    • 2
    • 3
    • 4

    2.4 水印的生成策略
    2.4.1 内置水印生成策略
    (1) 固定延迟生成水印

    简介:设置最大延迟时间

    例子:

    DataStream dataStream = ...... ;
    dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
    
    • 1
    • 2
    (2) 单调递增生成水印

    简介:当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小(网络延迟)。

    例子:

    DataStream dataStream = ...... ;
    dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
    
    • 1
    • 2
    2.4.2 自定义水印生成策略
    (1) 周期性 watermark 策略
    • 升序watermark:单调递增生成水印
    • 乱序watermark:固定延迟生成水印

    都是基于周期性生成,默认的周期是 200ms,一般不去改,保持在 ms 级别 onPeriodicEmit()

    (2) 间歇性 watermark 策略
    • 每一个事件时间都会产生一个watermark

    2.5 在非数据源之后使用水印 [重点]
    2.5.1 WaterMark 的四种使用情况
    (1) 本来有序的 Stream中的 Watermark

    例子:以 java bean 的数据输入作为有序事件时间

    package cn.itcast.day09.WaterMark;
    
    /**
     * @author lql
     * @time 2024-03-01 21:11:00
     * @description TODO
     */
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    /**
     * 使用单调递增水印,解决数据有序的场景(大多数情况都是乱序的数据,因此该场景比较少见)
     * 需求:从socket接受数据,进行转换操作,然后应用窗口每隔5秒生成一个窗口,使用水印时间触发窗口计算
     *
     * 使用水印的前提:
     * 1:数据必须要携带事件时间
     * 2:指定事件时间作为数据处理的时间
     * 3:指定并行度为1
     * 4:使用之前版本的api的时候,需要增加时间类型的代码
     *
     * 测试数据:
     * sensor_1,1547718199,35       -》2019-01-17 17:43:19
     * sensor_6,1547718201,15       -》2019-01-17 17:43:21
     * sensor_6,1547718205,15       -》2019-01-17 17:43:25
     * sensor_6,1547718210,15       -》2019-01-17 17:43:30
     *
     * todo 如果窗口销毁以后,有延迟数据的到达会被丢弃,无法再次触发窗口的计算了
     */
    public class MonotonousWaterMark {
        public static void main(String[] args) throws Exception {
            //todo 1)创建flink流处理的运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            //设置 Flink 程序中流数据时间语义为 EventTime。
            // 在处理数据时 Flink 程序会按照数据事件发生的时间进行处理,而不是按照数据到达 Flink 程序的时间进行处理。
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            //todo 2) 接入数据源
            SingleOutputStreamOperator<WaterSensor> lines = env.socketTextStream("node1", 9999)
                    .map(new MapFunction<String, WaterSensor>() {
                        @Override
                        public WaterSensor map(String value) throws Exception {
                            String[] data = value.split(",");
                            return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.parseInt(data[2]));
                        }
                    });
    
            //todo 3)添加水印处理
            //flink1.12之前版本的api编写(单调递增水印本质上还是周期性水印)
            SingleOutputStreamOperator<WaterSensor> waterMarkStream = lines.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<WaterSensor>() {
                @Override
                public long extractAscendingTimestamp(WaterSensor element) {
                    // 因为我们在转换时间戳,需要毫秒级别!
                    return element.getTs()*1000L;
                }
            });
    
            waterMarkStream.print("数据>>>");
    
            //todo 4)应用窗口操作,设置窗口长度为5秒
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = waterMarkStream.keyBy(sensor -> sensor.getId()).window(TumblingEventTimeWindows.of(Time.seconds(5)));
    
            //todo 5)定义窗口函数
            SingleOutputStreamOperator<String> result = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                    out.collect("key" + s + "\n" +
                            "数据为" + elements + "\n" +
                            "数据条数为" + elements.spliterator().estimateSize() + "\n" +
                            "窗口时间为" + context.window().getStart() + "->" + context.window().getEnd());
                }
            });
    
            //todo 6)输出测试
            result.print();
    
            //todo 启动运行
            env.execute();
        }
    
        /**
         * 水位传感器,用来接受水位数据
         */
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        private static class WaterSensor {
            private String id;  //传感器id
            private long ts;    //时间
            private Integer vc; //水位
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106

    注意:flink 1.12 版本之后的有序流添加周期水印

    //注意:下面的代码使用的是Flink1.12中新的API
    SingleOutputStreamOperator<WaterSensor> sensorDS = lines
    	//TODO 有序流中的watermark
    	.assignTimestampsAndWatermarks(
        //指定watermark生成(单调递增)
    	WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()
        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                //指定如何从数据提取时间戳
                return element.getTs() * 1000L;
    }));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    结果:

    情况一:一种类别输入
    sensor_6,1547718201,15       -2019-01-17 17:43:21
    sensor_6,1547718205,15       -2019-01-17 17:43:25
    sensor_6,1547718210,15       -2019-01-17 17:43:30
    
    输出:
    数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)
    数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)
    keysensor_6
    数据为[MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)]
    数据条数为12019-01-17 17:43:20 - > 2019-01-17 17:43:25)
    
    数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718210, vc=15)
    keysensor_6
    数据为[MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)]
    数据条数为1
    窗口时间为1547718205000->15477182100002019-01-17 17:43:25 - > 2019-01-17 17:43:30
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    情况二:两种类别输入
    sensor_1,1547718199,35       -2019-01-17 17:43:19
    sensor_6,1547718201,15       -2019-01-17 17:43:21
    sensor_6,1547718205,15       -2019-01-17 17:43:25
    sensor_6,1547718210,15       -2019-01-17 17:43:30
    
    输出:
    数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_1, ts=1547718199, vc=35)
    数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)
    数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)
    keysensor_1
    数据为[MonotonousWaterMark.WaterSensor(id=sensor_1, ts=1547718199, vc=35)]
    数据条数为1
    窗口时间为1547718195000->15477182000002019-01-17 17:43:15 - > 2019-01-17 17:43:20)
        
    keysensor_6
    数据为[MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)]
    数据条数为1
    窗口时间为1547718200000->15477182050002019-01-17 17:43:20 - > 2019-01-17 17:43:25)
    
    数据>>>> MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718210, vc=15)
    keysensor_6
    数据为[MonotonousWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)]
    数据条数为1
    窗口时间为1547718205000->15477182100002019-01-17 17:43:25 - > 2019-01-17 17:43:30
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    总结:

    • 1- 体现窗口左闭右开思想(即右端时间重合的数据不参与计算)
    • 2- 有序数据的水印窗口标准开始时间 :时间戳(秒级)// 窗口长度 * 窗口长度 * 1000 (这里的整除可以去掉余数
    // 如果是秒级,而不是时间戳:
    1)start = timestamp - (timestamp - offset + windowSize) % windowSize; ​
    
    事件时间 - (事件时间 - 0 + 窗口大小)%窗口大小 ​​​​​​​​​
    
    时间戳按照窗口长度 取整数倍(以1970110点为起点 => 伦敦时间) ​
    
    2)end = start + size ​​​​​​​​ 开始时间 + 窗口长度
    
    3)左闭右开: 属于本窗口的最大时间戳 = end -1ms , 所以时间为 end这条数据,不属于本窗口,所以是开区间
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 3- 有序数据的水印窗口标准结束时间 :标准开始时间 + 窗口长度

    • 4- 此时水位线的变化和事件时间保持一致(因为是有序时间,就不需设置延迟,那么 t 就是 0。

      ​ watermark = maxtime - 0 = maxtime)

    • 5- 环境并行度设置为 1,方便观察现象

    • 6- flink 1.12 之前版本,需要指定事件时间,env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    • 7- 转换时间戳时需要毫秒级别

    • 8- window().getStart() 获取窗口标准开始时间,window().getEnd()获取窗口标准结束时间

    • 9- spliterator().estimateSize() 获取窗口内数据条数

    • 10- api版本区别:

      • flink1.12之前:调用 assignTimestampAndwatermarks,new 一个 AscendingTimestampExtractor,重写方法获取时间戳
      • flink1.12之后:调用 assignTimestampAndWatermarks,有序流回调本质周期水印策略
        • WatermarkStrategy.forMonotonousTimestamps.withTimestampAssigner
        • new 一个序列化 SerializableTimestampAssigner,重写方法获取时间戳
    • 11- 应用场景:周期水印解决数据有序场景


    (2) 乱序事件中的Watermark

    例子:以 java bean 的数据输入作为乱序事件时间

    package cn.itcast.day09.WaterMark;
    
    /**
     * @author lql
     * @time 2024-03-02 15:20:38
     * @description TODO:
     */
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    /**
     * 使用固定延迟水印,解决数据乱序的场景(大多数情况都是乱序的数据,使用比较多)
     * 需求:从socket接受数据,进行转换操作,然后应用窗口每隔5秒生成一个窗口,使用水印时间触发窗口计算
     *
     * 使用水印的前提:
     * 1:数据必须要携带事件时间
     * 2:指定事件时间作为数据处理的时间
     * 3:指定并行度为1
     * 4:使用之前版本的api的时候,需要增加时间类型的代码
     *
     * 测试数据:
     * sensor_1,1547718199,35       -》2019-01-17 17:43:19
     * sensor_6,1547718201,15       -》2019-01-17 17:43:21
     * sensor_6,1547718205,15       -》2019-01-17 17:43:25
     * sensor_6,1547718210,15       -》2019-01-17 17:43:30
     *
     * todo 固定延迟触发,根据事件时间-最大乱序时间-1得到水印,根据水印时间作为触发窗口的条件
     * 触发窗口计算的两个条件:
     * 1:时间达到窗口的endtime
     * 2:窗口中存在数据
     */
    public class OutOfOrdernessWaterMark {
        public static void main(String[] args) throws Exception {
            // todo 1) 设置流处理环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // todo 2) 数据源
            SingleOutputStreamOperator<WaterSensor> lines = env.socketTextStream("node1", 9999).map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String value) throws Exception {
                    String[] data = value.split(",");
                    return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.parseInt(data[2]));
                }
            });
    
            // todo 3) 设置水印
            //flink1.12之前版本的api编写
            SingleOutputStreamOperator<WaterSensor> waterMarkStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WaterSensor>(Time.seconds(3)) {
                @Override
                public long extractTimestamp(WaterSensor element) {
                    return element.getTs() * 1000L;
                }
            });
    
            waterMarkStream.print("数据>>>");
            //todo 4)应用窗口操作
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = waterMarkStream.keyBy(t -> t.getId()).window(TumblingEventTimeWindows.of(Time.seconds(5)));
    
            //todo 5) 自定义窗口
            SingleOutputStreamOperator<String> result = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> collector) throws Exception {
                    collector.collect("key: " + s + "\n" +
                     "数据为: " + elements + "\n" +
                     "条数为:" + elements.spliterator().estimateSize() + "\n" +
                     "时间窗口为:" + context.window().getStart() + "->" + context.window().getEnd() + "\n");
                }
            });
    
            //todo 6) 打印操作
            result.print();
    
            //todo 7) 启动程序
            env.execute();
        }
    
        /**
         * 水位传感器,用来接受水位数据
         */
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        private static class WaterSensor {
            private String id;  //传感器id
            private long ts;    //时间
            private Integer vc; //水位
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104

    注意:flink 1.12 版本之后的无序流添加固定延迟水印

    SingleOutputStreamOperator<WaterSensor> waterMarkStream = lines
    	.assignTimestampsAndWatermarks(
        // 固定延迟水印,是 Duration 类型
    		WatermarkStrategy<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
    			.withTimestampAssigner(
    				new SerializableTimestampAssigner<WaterSensor>() {
    					@Override
    						public long extractTimestamp(WaterSensor waterSensor, long l) {
    							return waterSensor.getTs() * 1000L;
                				}}));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    结果:

    情况一:一种类别输入
    sensor_6,1547718201,15       -2019-01-17 17:43:21
    sensor_6,1547718205,15       -2019-01-17 17:43:25
    sensor_6,1547718210,15       -2019-01-17 17:43:30
    
    输出:
    数据>>>> OutOfOrdernessWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)
    数据>>>> OutOfOrdernessWaterMark.WaterSensor(id=sensor_6, ts=1547718205, vc=15)
    数据>>>> OutOfOrdernessWaterMark.WaterSensor(id=sensor_6, ts=1547718210, vc=15)
    key: sensor_6
    数据为: [OutOfOrdernessWaterMark.WaterSensor(id=sensor_6, ts=1547718201, vc=15)]
    条数为:1
    时间窗口为:1547718200000->15477182050002019-01-17 17:43:20 -> 2019-01-17 17:43:25
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    总结:

    • 1- 如果是有 key 类别的差异,触发窗口计算往往在 key 变化时,而不需要两个一样的 key 作为对照

    • 2- 因为设置了延迟,在触发窗口范围的时候,事件时间 - 延迟时间 = 水印时间

      • (例子中打印了 3 条数据,即第 3 条数据触发计算,第3条数据的水印时间的秒级:30 - 3 = 27 >= 窗口的 endTime)
      • 窗口触发两个条件,一是水印时间达到窗口 endTime,二是窗口内有数据
    • 3- api版本区别:

      • flink1.12之前:调用 assignTimestampAndWatermarks,new 一个 BoundedOutofOrdernessTimestampExtractor

        注意设置 延迟时间,重写方法获取事件时间

      • flink1.12之后:调用 assignTimestampAndWatermarks,有序流回调固定延迟水印策略

        • WatermarkStrategy.forBoundedOutOfOrderness(Duration 类型延迟时间).withTimestampAssigner
        • new 一个序列化 SerializableTimestampAssigner,重写方法获取时间戳
    • 4- 应用场景:固定延迟水印解决数据乱序场景


    (3) 并行数据流中的Watermark

    对齐机制会取所有 Channel 中最小的 Watermark,即:

    每个并行度中必须都有数据,且都满足触发窗口条件,才会有对齐机制

    例子:将并行度设置为2,带有线程号

    package cn.itcast.day09.WaterMark;
    
    /**
     * @author lql
     * @time 2024-03-02 19:27:53
     * @description TODO:多并行度下的水印操作演示
     */
    
    import org.apache.flink.api.common.eventtime.*;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.stringtemplate.v4.ST;
    
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Date;
    import java.util.Iterator;
    
    /**
     * 测试数据:
     * 并行度设置为2测试:
     * hadoop,1626934802000 ->2021-07-22 14:20:02
     * hadoop,1626934805000 ->2021-07-22 14:20:05
     * hadoop,1626934806000 ->2021-07-22 14:20:06
     */
    public class Watermark_Parallelism {
        //定义打印数据的日期格式
        final private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
        public static void main(String[] args) throws Exception {
            // todo 1) 流式环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
    
            // todo 2) 数据源
            SingleOutputStreamOperator<Tuple2<String, Long>> tupleDataStream = env.socketTextStream("node1", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String line) throws Exception {
                    try {
                        String[] array = line.split(",");
                        return Tuple2.of(array[0], Long.parseLong(array[1]));
                    } catch (NumberFormatException e) {
                        System.out.println("输入的数据格式不正确:" + line);
                        return Tuple2.of("null", 0L);
                    }
                }
            }).filter(new FilterFunction<Tuple2<String, Long>>() {
                @Override
                public boolean filter(Tuple2<String, Long> tuple) throws Exception {
                    if (!tuple.f0.equals("null") && tuple.f1 != 0L) {
                        return true;
                    }
                    return false;
                }
            });
    
            // todo 3) 水印操作
            SingleOutputStreamOperator<Tuple2<String, Long>> waterMarkDataStream = tupleDataStream.assignTimestampsAndWatermarks(
                    //TODO 自定义watermark生成器
                    WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier<Tuple2<String, Long>>() {
                        @Override
                        public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(Context context) {
                            return new MyWatermarkGenerator<>();
                        }
                    }).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                        @Override
                        public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                            // 获取数据中的 eventTime
                            Long timestamp = element.f1;
    
                            // 定义字符串并打印
                            System.out.println("键值:" + element.f0 + ",线程号:" + Thread.currentThread().getId() + "," +
                                    "事件时间:【" + sdf.format(timestamp) + "】");
                            return timestamp;
                        }
                    })
            );
    
            // todo 4) 分流和窗口
            SingleOutputStreamOperator<String> result = waterMarkDataStream
                    .keyBy(0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                            //定义该窗口所有时间字段的集合对象
                            ArrayList<Long> timeArr = new ArrayList<>();
                            // 首先获取了输入数据流(input)的迭代器
                            Iterator<Tuple2<String, Long>> iterator = input.iterator();
                            while (iterator.hasNext()) {
                                Tuple2<String, Long> tuple2 = iterator.next();
                                timeArr.add(tuple2.f1);
                            }
                            //对保存到集合列表的时间进行排序
                            Collections.sort(timeArr);
                            //打印输出该窗口触发的所有数据
                            String outputData = "" +
                                    "\n 键值:【" + tuple + "】" +
                                    "\n     触发窗口数据的个数:【" + timeArr.size() + "】" +
                                    "\n     触发窗口的数据:" + sdf.format(new Date(timeArr.get(timeArr.size() - 1))) +
                                    "\n     窗口计算的开始时间和结束时间:" + sdf.format(new Date(window.getStart())) + "----->" +
                                    sdf.format(new Date(window.getEnd()));
                            out.collect(outputData);
                        }
                    });
    
            //TODO 6)打印测试
            result.printToErr("触发窗口计算结果>>>");
    
            //TODO 7)启动作业
            env.execute();
        }
    
        public static class MyWatermarkGenerator<T> implements WatermarkGenerator<T>{
            //定义变量,存储当前窗口中最大的时间戳
            private long maxTimestamp = -1L;
            /**
             * 每条数据都会调用该方法
             * @param event
             * @param eventTimestamp
             * @param output
             */
            @Override
            public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                //System.out.println("on Event...");
                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            }
    
            /***
             * 周期性的执行,默认是200ms调用一次
             * @param output
             */
            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                //System.out.println("on Periodic..."+System.currentTimeMillis());
                //发射watermark
                output.emitWatermark(new Watermark(maxTimestamp -1L));
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150

    结果:

    输入:
     * hadoop,1626934802000 ->2021-07-22 14:20:02
     * hadoop,1626934805000 ->2021-07-22 14:20:05
     * hadoop,1626934806000 ->2021-07-22 14:20:06
     
    输出:
    键值:hadoop,线程号:68,事件时间:【2021-07-22 14:20:02.000】
    键值:hadoop,线程号:69,事件时间:【2021-07-22 14:20:05.000】
    键值:hadoop,线程号:68,事件时间:【2021-07-22 14:20:06.000】
    触发窗口计算结果>>>:2> 
    键值:【(hadoop)】
         触发窗口数据的个数:【1】
         触发窗口的数据:2021-07-22 14:20:02.000
         窗口计算的开始时间和结束时间:2021-07-22 14:20:00.000----->2021-07-22 14:20:05.000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    总结:

    • 1- 获取线程号:Thread.currentThread().getId()
    • 2- 自定义日期格式:new SimpleDateFormat()
    • 3- 看到 input 是 Iterate 类型,需要调用 iterator()方法转化为迭代对象,运用 while 循环结合 hashNext()边迭代边加入列表
    • 4- Collections.sort(列表),可以对列表进行排序

    (4) 自定义 Watermark

    A. 周期性水印

    package cn.itcast.day09.WaterMark;
    
    /**
     * @author lql
     * @time 2024-03-02 17:07:17
     * @description TODO
     */
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.eventtime.*;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    /**
     * 自定义周期性水印,内置水印:固定延迟水印和单调递增水印都是基于周期性水印开发的,默认是200ms生成一次watermark
     */
    public class WaterMark_Periodic {
        public static void main(String[] args) throws Exception {
            // todo 1) 设置流处理环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // todo 2) 数据源
            SingleOutputStreamOperator<WaterSensor> lines = env.socketTextStream("node1", 9999).map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String value) throws Exception {
                    String[] data = value.split(",");
                    return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.parseInt(data[2]));
                }
            });
    
            // todo 3) 设置水印操作
            SingleOutputStreamOperator<WaterSensor> sensorDS  = lines.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(
                    new WatermarkGeneratorSupplier<WaterSensor>() {
                        @Override
                        public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
                            return new MyWatermarkGenerator<>();
                        }
                    }).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                @Override
                public long extractTimestamp(WaterSensor waterSensor, long l) {
                    return waterSensor.getTs() * 1000L;
                }
            }));
    
            // todo 4) 分组
            KeyedStream<WaterSensor, String> sensorKS  = sensorDS.keyBy(t -> t.getId());
    
            // todo 5) 开窗
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS  = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
    
            // todo 6) 自定义窗口
            SingleOutputStreamOperator<String> result = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context, Iterable<WaterSensor> iterable, Collector<String> out) throws Exception {
                    out.collect("key: " + s + "\n" +
                     "数据为: " + iterable + "\n" +
                     "条数为:" + iterable.spliterator().estimateSize() + "\n" +
                     "时间窗口为:" + context.window().getStart() + "->" + context.window().getEnd() + "\n");
                }
            });
    
            // todo 7) 打印和启动
            result.print();
            env.execute();
        }
    
        /**
         * 水位传感器,用来接受水位数据
         */
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        private static class WaterSensor {
            private String id;  //传感器id
            private long ts;    //时间
            private Integer vc; //水位
        }
    
    
        private static class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {
            private long maxTimestamp = -1L;
    
            /**
             * 每条数据执行一次
             * @param event
             * @param eventTimestamp
             * @param watermarkOutput
             */
            @Override
            public void onEvent(T event, long eventTimestamp, WatermarkOutput watermarkOutput) {
                System.out.println("onEvent……");
                maxTimestamp = Math.max(eventTimestamp, maxTimestamp);
            }
    
            /**
             * 周期性执行一次
             * @param watermarkOutput
             */
            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                System.out.println("onPeriodicEmit……"+ +System.currentTimeMillis());
                // 发生水印
                watermarkOutput.emitWatermark(new Watermark(maxTimestamp));
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116

    结果:

    onPeriodicEmit……1709376044007
    onPeriodicEmit……1709376044214
    onPeriodicEmit……1709376044415
    onPeriodicEmit……1709376044631
    onPeriodicEmit……1709376044834
    
    • 1
    • 2
    • 3
    • 4
    • 5

    总结:

    • 1- 自定义水印:WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier
      • 重写方法,返回新的 Class<>()
      • 继承 WatermarkGenerator ,重写两个方法,一个每条数据执行一次,一个周期执行一次(默认是200ms)
    • 2- 更改执行周期:env.getConfig().setAutoWatermarkInterval(2000)
    • 3- 调用易出错:forGenerate 有 withTimestampAssigner 方法

    B. 间歇性水印:

    • 在上述自定义周期性水印方法的 onEvent 中实现 onPeriodicEmit 中的生成水印代码即可实现
    watermarkOutput.emitWatermark(new Watermark(maxTimestamp));
    
    • 1
    2.6 在数据源之后使用水印 (Kafka) [重点]
    2.6.1 kafka 向指定分区写入数据
    package cn.itcast.day09.watermark.kafka;
    
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.internals.Topic;
    
    import java.util.Properties;
    
    /**
     * kafka生产者工具类,模拟数据的生成,将数据写入到指定的分区中
     *
     * 第一个分区写入:1000,hadoop、7000,hadoop-》没有触发窗口计算
     * 第二个分区写入:7000,flink              -》触发了窗口计算
     */
    public class KafkaMock {
        private final KafkaProducer<String, String> producer;
        public final static String TOPIC = "test3";
    
        private KafkaMock(){
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            producer = new KafkaProducer<String, String>(props);
        }
        public void producer(){
            long timestamp = 1000;
            String value = "hadoop";
            String key = String.valueOf(value);
            String data = String.format("%s,%s", timestamp, value);
            producer.send(new ProducerRecord<String, String>(TOPIC, 1, key, data));
            producer.close();
        }
        public static void main(String[] args) {
            new KafkaMock().producer();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    2.6.2 水印机制消费 kafak 数据
    package cn.itcast.day09.watermark.kafka;
    
    import org.apache.flink.api.common.eventtime.TimestampAssigner;
    import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import scala.collection.convert.Wrappers;
    
    import java.time.Duration;
    import java.util.Iterator;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 使用水印消费kafka里面的数据
     */
    public class WatermarkTest {
        public static void main(String[] args) throws Exception {
            //todo 1)初始化flink流处理环境
            Configuration configuration = new Configuration();
            configuration.setInteger("rest.port", 8081);//设置webui的端口号
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
    
            env.setParallelism(2);
            env.enableCheckpointing(5000);
            //todo 2)接入数据源
            //指定topic的名称
            String topicName = "test3";
            //实例化kafkaConsumer对象
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
            props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况
    
            FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), props);
            kafkaSource.setCommitOffsetsOnCheckpoints(true);//todo 在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是true
    
            DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
    
            //在数据源上添加水印
            SingleOutputStreamOperator<String> watermarkStream = kafkaDS.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                            .withTimestampAssigner(new TimestampAssignerSupplier<String>() {
                                @Override
                                public TimestampAssigner<String> createTimestampAssigner(Context context) {
                                    return new TimestampAssigner<String>() {
                                        @Override
                                        public long extractTimestamp(String element, long recordTimestamp) {
                                            return Long.parseLong(element.split(",")[0]);
                                        }
                                    };
                                }
                            }).withIdleness(Duration.ofSeconds(60))
            );
    
            //todo 3)单词计数操作
            SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = watermarkStream.map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    return new Tuple2<String, Long>(value.split(",")[1], 1L);
                }
            });
            //todo 4)单词分组操作
            wordAndOne.keyBy(x-> x.f0).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).process(
                    new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                        @Override
                        public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                            long sum = 0L;
                            Iterator<Tuple2<String, Long>> iterator = elements.iterator();
                            while (iterator.hasNext()){
                                Tuple2<String, Long> tuple2 = iterator.next();
                                System.out.println(tuple2.f0);
                                sum += tuple2.f1;
                            }
                            out.collect(s + ","+sum);
                        }
                    }
            ).print();
    
            env.execute();
    
            //todo 6)启动作业
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104

    结果1:没加 withIdleness

    输入:
     * 第一个分区写入:1000,hadoop、7000,hadoop-》没有触发窗口计算
     * 第二个分区写入:7000,flink              -》触发了窗口计算
    
    • 1
    • 2
    • 3

    结果2:加上 withIdleness

    输入:
     * 第一个分区写入:1000,hadoop、7000,hadoop-30s 后触发窗口计算
    
    • 1
    • 2

    结论:

    • 1- 当某一个分区的触发机制达到的时候,其他的分区触发机制迟迟未触发的时候,无法触发机制
    • 2- withIdleness(Duration.ofSeconds(30)),允许 30s 等待其他分区触发计算,如果还没有触发,直接计算该分区
    • 3- 工作中一般设置 1 - 10分钟
    • 4- kafka 数据源添加水印,withTimestampAssigner 需要 new 一个 TimestampAssignerSupplier (第一次出现

    2.7 Flink 对严重迟到数据的处理

    例子:延迟数据处理机制设计

    package cn.itcast.day09.WaterMark;
    
    /**
     * @author lql
     * @time 2024-03-03 13:11:44
     * @description TODO
     */
    
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import java.time.Duration;
    
    /**
     * flink默认情况下会将迟到的数据丢弃,但是对于绝大多数的业务中是不允许删除迟到数据的,因此可以使用flink的延迟数据处理机制进行数据的获取并处理
     */
    public class LatenessDataDemo {
        public static void main(String[] args) throws Exception {
            // 设置环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 数据源
            DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
    
            SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] data = value.split(",");
                    return new Tuple2<String, Long>(data[0], Long.parseLong(data[1]));
                }
            });
    
            // 水印操作 -> 水印3秒
            SingleOutputStreamOperator<Tuple2<String, Long>> watermarkStream = wordAndOne.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                    // 报错地方:因为我们的数据源已经是毫秒级别了,就不需要转换 *1000L哦!
                                    return element.f1;
                                }
                            })
            );
    
            // 窗口操作 -> 5秒窗口
            // todo 1. 设置允许延迟的时间是通过allowedLateness(lateness: Time)设置
            WindowedStream<Tuple2<String, Long>, String, TimeWindow> windowStream = watermarkStream
                    .keyBy(t -> t.f0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .allowedLateness(Time.seconds(2));
    
            // todo 2.初始化延迟到达的数据对象
            OutputTag<Tuple2<String,Long>> outputTag = new OutputTag<>(
                    "side output",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
            );
    
            // todo 3.保存延迟到达的数据
            WindowedStream<Tuple2<String, Long>, String, TimeWindow> sideOutputLateData = windowStream.sideOutputLateData(outputTag);
    
            // 数据聚合
            SingleOutputStreamOperator<Tuple2<String, Long>> result = sideOutputLateData.apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {
                    String key = null;
                    Long counter = 0L;
                    for (Tuple2<String, Long> element : input) {
                        key = element.f0;
                        counter += 1;
                    }
                    out.collect(Tuple2.of(key, counter));
                }
            });
            result.print("正常到达的数据>>>");
    
            // todo 4.获取延迟到达的数据
            DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
    
            sideOutput.printToErr("延迟到达的数据>>>");
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    结果:

     /*
     * 每5s一个窗口,水印:3s,延迟等待:2s
     * 测试数据:
     * hadoop,1626936202000  -> 2021-07-22 14:43:22 第一个窗口的数据
     * hadoop,1626936207000  -> 2021-07-22 14:43:27 因为设置了水印,所以不会触发窗口计算
     * hadoop,1626936202000  -> 2021-07-22 14:43:22 第一个窗口的数据
     * hadoop,1626936203000  -> 2021-07-22 14:43:23 第一个窗口的数据
     * hadoop,1626936208000  -> 2021-07-22 14:43:28 触发了窗口计算(hadoop,3),水印时间满足窗口endtime
     *
     * ====================事件时间 28 秒 -> 水印时间 25 秒 刚好临界 endtime =======================
     * ===============延迟 2s 等待机制:延迟到事件时间 30s 即 水印时间 27s 关闭第一个窗口===============
     *
     * 第一个窗口时间 2021-07-22 14:43:20 -> 2021-07-22 14:43:25
     *
     * hadoop,1626936202000  -> 2021-07-22 14:43:22 已经触发过计算的窗口再次有新数据到达,(hadoop,4)(数据重复计算)
     * hadoop,1626936203000  -> 2021-07-22 14:43:23 已经触发过计算的窗口再次有新数据到达,(hadoop,5)
     * hadoop,1626936209000  -> 2021-07-22 14:43:29 虽然 水印时间达到endtime,但是窗口里面没有新数据,不触发计算
     * hadoop,1626936202000  -> 2020-07-22 14:43:22 已经触发过计算的窗口再次有新数据到达,(hadoop,6)
     * hadoop,1626936210000  -> 2021-07-22 14:43:30 满足了窗口销毁的条件,开始专注于第二个新窗口
     *
     * 第二个窗口时间 2021-07-22 14:43:25 -> 2021-07-22 14:43:30
     * 
     * ====================事件时间 33 秒 -> 水印时间 30 秒 刚好临界 endtime ====================================
     *  * ===============延迟 2s 等待机制:延迟到事件时间 35s 即 水印时间 32s 关闭第二个窗口===============
     *
     * hadoop,1626936202000  -> 2021-07-22 14:43:22 打印迟到数据,(hadoop,1626936202000)
     * hadoop,1626936215000  -> 2021-07-22 14:43:35 达到水印时间触发窗口计算:(hadoop,3),之前27,28,29秒的数据
     */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    总结:

    • 1- 设计允许迟到数据时间:

      在水印策略后面加上:allowedLateness(Times.seconds())

    • 2- 初始化迟到的数据对象:

      new OutputTag<>(id名字,TypeInformation.of(new TypeHint<迟到数据类型>(){ }))

    • 3- 保存延迟到达迟到数据:

      窗口流.sideOutputLateData(初始化对象)

    • 4- 获取延迟到达迟到数据:

      结果流.getSideOutput(初始化对象)

    • 5- 测输出流是之前 Window Function API 中的重要算子

      OutputTag(注意复习!)

    思考:

    • allowedLateness(Times.seconds) 设计允许迟到时间和

      withIdleness(Duration.ofSeconds(30)) 设计允许等待触发时间有什么不同呢?

    回答:

    • (1) 从概念上看,allowedLateness 是延迟窗口关闭,不影响触发时间,而 withIdleness 等待分区一段时间,等不到就触发
    • (2) 从应用来看,allowedLateness 适用于车联网入隧道一段时间没上报数据等待数据,而 withIdleness 适用于分区木桶原理等待数据,等不到数据就单独分区触发计算。

  • 相关阅读:
    tcp_v4_connect函数的解析
    如何搭建APP分发平台分发平台搭建教程
    政务服务一网通办云平台及智慧政务大数据资源中心建设方案
    debian和ubuntu
    nacos使用达梦数据库
    正则表达式
    C语言:strlen() --- 计算字符串长度
    力扣刷题day46|1143最长公共子序列、1035不相交的线、53最大子序和
    746. 使用最小花费爬楼梯 (Swift版本)
    uniapp APP下载流文件execl 并用WPS打开
  • 原文地址:https://blog.csdn.net/m0_60732994/article/details/136431811