• Flink-水位线的设置以及传递


    6.2 水位线

    6.2.1 概述

    1. 分类
    • 有序流

    image.png

    • 无序流
      image.png
      判断的时间延迟
    1. 延迟时间判定

    6.2.2 水位线的设置

    1. 分析

    image.png
    DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本质还是个算子,传入的参数是WatermarkStrategy的生成策略

    image.png
    但是WatermarkStrategy是一个接口

    • 有序流

    image.png

    因此调用静态方法forMonotonousTimeStamps后new AscendingTimestampsWatermarks返回WatermarkGernerator
    image.png

    AscendingTimestampsWatermarks这个继承自BoundOutOfOrdernessWatermarks

    image.png
    image.png
    image.png

    BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口

    image.png

    然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy,参数是new SerializableTimestampAssigner的对象,重写extractTimestamp方法,这个方法作用是怎么样从数据里面提取时间戳

    image.png

    • 乱序流

    image.png
    因此调用静态方法forBoundedOutOfOrderness(参数为最大乱序程度,也就是延迟时间)后new BoundOutOfOrdernessWatermarks返回 WatermarkGernerator

    image.png

    BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口(跟上面一样了)

    image.png

    后面也跟有序一样,然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy

    • 关系图
      在这里插入图片描述
    1. 完整代码
    public class WatermarkTest {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            //1.输入
            SingleOutputStreamOperator<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Alice", "./prod?id=200", 3000L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Bob", "./prod?id=2", 3800L),
                    new Event("Bob", "./prod?id=3", 4200L))
                
    //                //有序流的watermark生成
    //                //forMonotonousTimestamps前指定泛型
    //                .assignTimestampsAndWatermarks(WatermarkStrategy
    //                        .forMonotonousTimestamps()//得到WatermarkGenerator
    //                        .withTimestampAssigner(new SerializableTimestampAssigner() {//返回WatermarkStrategy
    //                            @Override
    //                            //参数是当前传过来的数据element,另一个传出的recordTimestamp是时间戳
    //                            public long extractTimestamp(Event element, long recordTimestamp) {
    //                                return element.timestamp;
    //                            }
    //                        })
    //                )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //forMonotonousTimestamps前指定泛型
                        //forMonotonousTimestamps参数是最大乱序时间
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))//得到WatermarkGenerator
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    6.2.3 自定义水位线

    1. 分析

    image.png

    或者直接new 一个接口WatermarkStrategy重写createWatermarkGenerator的watermark生成器的方法(生成WatermarkGenerator)以及createTimeStampAssigner提取时间戳分配器的方法(生成TimeStampAssigner)创建watermark

    image.png

    image.png

    image.png

    image.png

    WatermarkGenerator是个接口,有两个方法分别是onEvent方法,主要目标是要发出一个WatermarkOutput,另一个是onperiodicEmit方法,表示周期性的生成,周期性生成时间默认是2秒,env调用getConfig后调用setAutoWatermarkInterval后可以更改周期性生成时间

    image.png
    image.png

    WatermarkOutput也是一个接口,调用emitWatermark就能发出一个watermark,

    image.png

    image.png

    除了WatermarkGenerator接口还有TimeStampAssigner也是个接口,里面只有一个方法叫做extractTimestamp,目的是从当前数据提取时间戳,同时也会作为WatermarkGenerator这个接口中onEvent方法中传入的参数eventTimestamp时间戳

    • 关系图
      在这里插入图片描述
      这图估计也就我自己能看的懂了。。。
    1. 代码
    • 正常水位线
    // 自定义水位线的产生
    public class CustomWatermarkTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.addSource(new ClickSource())
                    .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                    .print();
            env.execute();
        }
        //内部静态类
        public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
            @Override
            //createTimestampAssigner方法生成TimeStampAssigner
            public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return new SerializableTimestampAssigner<Event>() {
                    @Override
                    //extractTimestamp,目的是从当前数据提取时间戳
                    public long extractTimestamp(Event element, long recordTimestamp)
                    {
                        return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
                    }
                };
            }
            @Override
            //createWatermarkGenerator生成WatermarkGenerator
            public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new CustomPeriodicGenerator();
            }
        }
        //CustomPeriodicGenerator实现WatermarkGenerator接口,并重写方法
        public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
            private Long delayTime = 5000L; // 延迟时间
            private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
            @Override
            //更新当前时间戳,这边不发送水位线,目的是保存时间戳
            public void onEvent(Event event, long eventTimestamp, WatermarkOutput
                    output) {
                // 每来一条数据就调用一次
                maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
            }
            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // 发射水位线,默认 200ms 调用一次
                //-1毫秒都是为了贴切窗口闭合的时候左闭右开设计
                output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 断点水位线

    在onevent根据条件触发,onPeriodicEmit这个方法中就不用做了

        public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
            @Override
            public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
            // 只有在遇到特定的 itemId 时,才发出水位线
                if (r.user.equals("Mary")) {
                    output.emitWatermark(new Watermark(r.timestamp - 1));
                }
            }
            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 在自定义数据源中发送水位线

    使用 collectWithTimestamp 方法将数据发送出去,原来直接out.collect()的

    image.png

    参数是当前数据还有当前数据的时间戳,跟水位线生成中extractTimestamp(Event element, long recordTimestamp)这个类似,也是一个数据是什么,一个时间戳是啥

    然后发送水位线,用emitWatermark方法生成

    public class CustomWatermarkTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.addSource(new ClickSourceWithWatermark()).print();
            env.execute();
        }
        // 泛型是数据源中的类型
        public static class ClickSourceWithWatermark implements SourceFunction<Event>
        {
            private boolean running = true;
            @Override
            public void run(SourceFunction.SourceContext<Event> sourceContext) throws Exception {
                Random random = new Random();
                String[] userArr = {"Mary", "Bob", "Alice"};
                String[] urlArr = {"./home", "./cart", "./prod?id=1"};
                while (running) {
                    long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                    String username = userArr[random.nextInt(userArr.length)];
                    String url = urlArr[random.nextInt(urlArr.length)];
                    Event event = new Event(username, url, currTs);
                    // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
                    sourceContext.collectWithTimestamp(event, event.timestamp);
                    // 发送水位线
                    sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                    Thread.sleep(1000L);
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    6.2.4 水位线的传递

    针对多个分区,上游需要告诉下游水位线情况,采用的是广播的方式给所有下游子任务

    但是上游如果也是并行的,向下传输的水位线可能有多个,以上游发过来最小的时钟为准,并且下游会有一个分区专门保存上游发过来的水位线最小的数据

    image.png

    image.png

  • 相关阅读:
    Linux C/C++ 学习笔记(七):DNS协议与请求
    tiup dm template
    【详细】Java网络通信 TCP、UDP、InetAddress
    机器学习性能评估指标
    外包干了3个月,技术退步明显。。。。。
    从入门到一位合格的爬虫师,这几点很重要
    网课查题API接口(免费)
    《机械设计基础》题库
    NPOI组件下载、引用、基本使用
    我的企业证书是正常的但是下载应用app到手机提示无法安装“app名字”无法安装此app,因为无法验证其完整性解决方案
  • 原文地址:https://blog.csdn.net/m0_46507516/article/details/127940279