• 【API篇】十一、Flink水位线传递与迟到数据处理


    1、水位线传递

    上游task处理完水位线,时钟改变后,要把数据和当前水位线继续往下游算子的task发送。当一个任务接收到多个上游并行任务传递来的水位线时,以最小的那个作为当前任务的事件时钟。如图:上游算子并行度为4,:

    - 第一波的2.4.3.6传递到下游task,取2
    - 其中一个上游task的数据4到了,传递到下游,4.4.3.6,此时,水位线被更新为最小的3
    - 其中一个上游task的7到了,下游task为4.7.3.6,最小仍为3,不更新
    - 上游task的6到下游,下游为4.7.6.6,最小为4,水位线再更新
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    总结:

    • 接收到上游多个,取最小
    • 往下游多个发送,广播

    使用上篇的乱序流来查看水位线的传递,这次把并行度不能再是1,设置为2

    public class WatermarkOutOfOrdernessDemo {
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(2);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());
    
            // TODO 1.定义Watermark策略
            WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                    // 1.1 指定watermark生成:乱序的,等待3s
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    // 1.2 指定 时间戳分配器,从数据中提取
                    .withTimestampAssigner(
                            (element, recordTimestamp) -> {
                                // 返回的时间戳,要 毫秒
                                System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                                return element.getTs() * 1000L;
                            });
    
            // TODO 2. 指定 watermark策略
            SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
    
            sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                    // TODO 3.使用 事件时间语义 的窗口
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .process(
                            new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
    
                                @Override
                                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                    long startTs = context.window().getStart();
                                    long endTs = context.window().getEnd();
                                    String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                    String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                                    long count = elements.spliterator().estimateSize();
    
                                    out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                                }
                            }
                    )
                    .print();
    
            env.execute();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    执行:

    在这里插入图片描述
    画个示意图:

    在这里插入图片描述

    2、水位线设置空闲等待

    结合上图,上面是并行度为2,数据进来了会轮询到两个上游task,如果此时一个上游task一直没有数据进来,而当前Task是以最小的那个作为当前任务的事件时钟,就会导致下游接收的Task时钟一直为起始值而无法推进,进而导致窗口无法触发。

    public class WatermarkIdlenessDemo {
    
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(2);
    
            //  MyPartitioner是自定义分区器:数据%分区数,只输入奇数,都只会去往map的一个子任务,余数总为1,0.1两个map的task总去1
            SingleOutputStreamOperator<Integer> socketDS = env
                    .socketTextStream("hadoop102", 7777)
                    .partitionCustom(new MyPartitioner(), r -> r)
                    .map(r -> Integer.parseInt(r))
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<Integer>forMonotonousTimestamps()
                                    .withTimestampAssigner((r, ts) -> r * 1000L)
                                    
                    );
    
    
            // 分成两组: 奇数一组,偶数一组 , 开10s的事件时间滚动窗口
            socketDS
                    .keyBy(r -> r % 2)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                        @Override
                        public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                            long startTs = context.window().getStart();
                            long endTs = context.window().getEnd();
                            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                            String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                            long count = elements.spliterator().estimateSize();
    
                            out.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
    
                        }
                    })
                    .print();
    
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    运行:

    在这里插入图片描述

    分析:以上demo中,为了实现数据总流向一个子task,用了自定义分区器:

    在这里插入图片描述

    .partitionCustom(new MyPartitioner(), r -> r)
    
    • 1

    以输出数据为key,key除以并行度2区域为分区逻辑,如果我一直输入奇数,分区值就一直为1,就可以实现数据只流向其中一个子task。流向下游算子时,一个task始终没数据,导致取小的时候一直取到了没数据的原始time,时钟无法更新,窗口无法触发。此时就需要设置最大空闲时间,太久没数据来时,就不让它参与比较。

    .withIdleness(Duration.ofSeconds(5))  //空闲等待5s
    
    • 1

    在这里插入图片描述

    此时,输入到9时,已到5s时间,不再比较另一个没数据的task,11一进来,立马触发窗口

    在这里插入图片描述

    3、迟到数据处理:窗口允许迟到

    前面为了解决乱序流,提出了延迟的概念:

    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3));
    
    • 1

    以上,即窗口延迟触发3秒,即让水位线的推进值 = 当前值 - 3,以便争取为乱序数据更多的时间进入窗口。但当延迟完成,窗口触发计算和关闭后,再来的属于已关闭窗口的数据就不会被统计在内了,这些数据也成为迟到数据。(本来8.30上课,老师等等家远的学生,说8.40开始讲课,结果你却9.00才到,那就门口站着取,别听了,类比数据不会再被对应窗口统计)

    Flink窗口允许迟到数据,即触发窗口后,会先计算当前结果,但不关闭窗口(触发计算和关窗是两个动作)。 以后每来一条迟到数据,就触发一次这条数据所在窗口的增量计算。直到水位线被推进到了窗口结束时间 + 推迟时间。

    注意区分延迟和推迟,延迟是老师等你到8.40上课(触发计算时间延长了),推迟则是,8.40课开始上了(触发计算了),但教室门不关,你在开始上课后(开始上课类比触发计算)10分钟的铃声没响之前(类比推迟时间为10分钟),能到的话,你依旧可以进教室听课。如果过了推迟时间,你仍没有到,那就窗口关闭,教室关门,你去网吧游荡吧。总结就是:

    • 延迟时间,操作的是触发计算的时间,用来处理乱序问题
    • 推迟时间,操作的是触发关窗的时间,用来处理迟到数据
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))  //窗口10s
    .allowedLateness(Time.seconds(3))  //触发关窗延迟3秒
    
    
    • 1
    • 2
    • 3

    还是乱序流的例子,多一个allowedLateness

    public class WatermarkOutOfOrdernessDemo {
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());
    
            // TODO 1.定义Watermark策略
            WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                    // 1.1 指定watermark生成:乱序的,等待3s
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    // 1.2 指定 时间戳分配器,从数据中提取
                    .withTimestampAssigner(
                            (element, recordTimestamp) -> {
                                // 返回的时间戳,要 毫秒
                                return element.getTs() * 1000L;
                            });
    
            // TODO 2. 指定 watermark策略
            SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
    
            sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                    // TODO 3.使用 事件时间语义 的窗口
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .allowedLateness(Time.seconds(3))
                    .process(
                            new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
    
                                @Override
                                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                    long startTs = context.window().getStart();
                                    long endTs = context.window().getEnd();
                                    String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                    String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                                    long count = elements.spliterator().estimateSize();
    
                                    out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                                }
                            }
                    )
                    .print();
    
            env.execute();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    此时窗口为10s,延迟3s触发计算,窗口结束时间 + 推迟时间才触发关闭,即水位线到达10+3=13s时,才触发关窗。在水位线未被推到13前,对于迟到的数据,会再次触发计算,且是来一条,触发一次计算。关窗后,再来迟到数据就在不管了,不会触发计算。

    在这里插入图片描述

    这也和前面整理的窗口生命周期对上了:计算和关窗实际是两个动作,窗口销毁的时机(关窗)是在时间进展 >= 窗口最大时间戳(end-1ms) + 允许迟到时间(默认0)

    4、迟到数据处理:侧流输出

    在上面的延迟关窗与允许迟到的基础上,肯定还是不能囊括所有数据,因为乱序程度理论上可以无限大,如上的例子,对于等了10分钟才开课,且到了关教室门的时间还没到的学生,让去网吧游荡也不合理(类比流中直接丢弃这个数据),可以考虑把严重迟到的学生领到保安室,对应到Flink,那就是把乱序极大的数据使用侧流输出。关键代码:

    OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));  //侧流Tag对象
    
    • 1
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.seconds(3))
    .sideOutputLateData(lateTag)  //迟到数据侧流输出
    
    
    • 1
    • 2
    • 3
    • 4
    //主流
    process.print();
    // 从主流获取侧输出流,打印
    process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    完整demo:

    public class WatermarkLateDemo {
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());
    
            WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);
    
            SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
    
    
            OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));
    
            SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .allowedLateness(Time.seconds(2)) // 推迟2s关窗
                    .sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流
                    .process(
                            new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
    
                                @Override
                                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                    long startTs = context.window().getStart();
                                    long endTs = context.window().getEnd();
                                    String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                    String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                                    long count = elements.spliterator().estimateSize();
    
                                    out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                                }
                            }
                    );
    
    
            process.print();
            // 从主流获取侧输出流,打印
            process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    执行:

    在这里插入图片描述

    5、问

    如果watermark设置延时等待3s,窗口允许迟到2s,为什么不直接延时等待5s?

    答:
    
    • 1
    • 首先延时时间不能设置太大,因为这会导致计算延迟太大,失去结果的实时性
    • 其次,窗口允许迟到是对迟到数据的补偿处理,尽量让结果准确,修正结果的
    • 因此,一般延时时间不设置一个较大的值,常为秒级,而允许迟到时间则可以用来处理大部分迟到数据,极端迟到的数据,可使用侧流输出,获取后再做对应的处理
  • 相关阅读:
    树莓派开发笔记(十二):入手研华ADVANTECH工控树莓派UNO-220套件(一):介绍和运行系统
    使用Unity制作3D驾驶游戏
    JavaScript数据类型学习脑图
    mac在linux服务器上部署前端项目
    Ubuntu安装AndroidStudio
    线上问题排查实例分析|关于 Redis 内存泄漏
    Spring Security(3)
    薪资17K,在字节外包工作是一种什么体验...
    使用Plsql+oracle client 连接 Oracle数据库
    如何降低MCU系统功耗?
  • 原文地址:https://blog.csdn.net/llg___/article/details/134035187