• 7.2、如何理解Flink中的水位线(Watermark)


    目录

    0、版本说明

    1、什么是水位线?

    2、水位线使用场景?

    3、设计水位线主要为了解决什么问题?

    4、怎样在flink中生成水位线?

    4.1、自定义标记 Watermark 生成器

    4.2、自定义周期性 Watermark 生成器

    4.3、内置Watermark生成器 - 有序流水位线生成器

    4.4、内置Watermark生成器 - 乱序流水位线生成器

    4.5、在 读取数据源时 添加水位线

    5、水位线和窗口的关系?

    6、水位线在各个算子间的传递

    6.1、测试用例 - 不设置 withIdleness 超时时间

    6.2、测试用例 - 设置 withIdleness 超时时间


    0、版本说明

            开发语言:java1.8

            Flink版本:1.17

            官网链接:官网链接

    1、什么是水位线?

            Flink中水位线是一条特殊的数据(long timestamp)

            它会以时间戳的形式作为一条标识数据插入到数据流中


    2、水位线使用场景?

            使用事件时间(EventTime)做流式计算任务时,需要根据事件时间生成水位线(Watermark)

            通过水位线来触发窗口计算,水位线作为衡量事件时间(EventTime)进展的标识


    3、设计水位线主要为了解决什么问题?

            设计水位线主要是为了解决实时流中数据乱序和迟到的问题

            思考:什么原因造成了数据流的乱序呢?

                    如今数据采集、数据传输大多都在分布式系统中完成

                    各个机器节点因为网络和自身性能的原因 导致了数据的乱序和迟到


    4、怎样在flink中生成水位线?

            Flink中支持在 数据源和普通DataStream上添加水位线生成策略(WatermarkStrategy)

    4.1、自定义标记 Watermark 生成器

    标记 Watermark 生成器特点:

            每条数据到来后,都会为其生成一条 Watermark

    适用场景:

            数据量小且数据有序

    代码示例:        

    Step1:自定义 标记水位线生成器 实现类

    1. // 自定义 标记水位线生成器 实现类
    2. public class PeriodWatermarkGenerator implements WatermarkGenerator {
    3. // 每进入一条数据,都会调用一次 onEvent 方法
    4. @Override
    5. /*
    6. * 参数说明:
    7. * @event : 进入到该方法的事件数据
    8. * @eventTimestamp : 时间戳提取器提取的时间戳
    9. * */
    10. public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
    11. //发射水位线
    12. output.emitWatermark(new Watermark(eventTimestamp));
    13. }
    14. // 不需要实现
    15. @Override
    16. public void onPeriodicEmit(WatermarkOutput output) {
    17. }
    18. }

    Step2:自定义 标记性水位线生成策略 实现类

    1. // TODO 自定义 标记性水位线生成策略
    2. public class PeriodWatermarkStrategy implements WatermarkStrategy> {
    3. // TODO 实例化一个 事件时间提取器
    4. @Override
    5. public TimestampAssigner> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
    6. TimestampAssigner> timestampAssigner = new TimestampAssigner>() {
    7. @Override
    8. public long extractTimestamp(Tuple2 element, long recordTimestamp) {
    9. return element.f1;
    10. }
    11. };
    12. return timestampAssigner;
    13. }
    14. // TODO 实例化一个 watermark 生成器
    15. @Override
    16. public WatermarkGenerator> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
    17. return new PeriodWatermarkGenerator<>();
    18. }
    19. }

    Step3:使用 标记性水位线生成策略

    1. // TODO 使用 自定义标记 Watermark 生成器
    2. public class UserPeriodWatermarkStrategy {
    3. public static void main(String[] args) throws Exception {
    4. // 1.获取执行环境
    5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    6. env.setParallelism(1);
    7. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    8. SingleOutputStreamOperator> sourceDataStream = env.socketTextStream("localhost", 9999)
    9. .map(new MapFunction>() {
    10. @Override
    11. public Tuple2 map(String value) throws Exception {
    12. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    13. }
    14. }
    15. );
    16. // 3.为 DataStream 添加水位线生成策略 (使用 自定义WatermarkStrategy 实现类)
    17. SingleOutputStreamOperator> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy());
    18. // 4.通过 processFunction实例 查看生成的水位线
    19. SingleOutputStreamOperator process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
    20. process.print();
    21. // 5.触发程序执行
    22. env.execute();
    23. }
    24. }

    查看运行结果:


    4.2、自定义周期性 Watermark 生成器

    标记 Watermark 生成器特点:

            基于处理时间,周期性生成 Watermark

    适用场景:

            数据量大且可能存在一定程度数据延迟(乱序)

    代码示例:        

    Step1:自定义 周期性水位线生成器 实现类

    1. // 自定义 周期性水位线生成器
    2. public class PunctuatedWatermarkGenerator implements WatermarkGenerator {
    3. // 设置变量,用来保存 当前最大的事件时间
    4. private long currentMaxTimestamp;
    5. // 设置变量,指定最大的乱序时间(等待时间)
    6. private final long maxOutOfOrderness = 0000; // 3 秒
    7. @Override
    8. public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
    9. // 只更新当前最大时间戳,不再发生水位线
    10. if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;
    11. }
    12. // 周期性 生成水位线
    13. // 每个 setAutoWatermarkInterval 时间,调用一次该方法
    14. @Override
    15. public void onPeriodicEmit(WatermarkOutput output) {
    16. // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
    17. output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
    18. }
    19. }

    Setp2:自定义 周期性水位线生成策略 实现类

    1. // 自定义 周期性水位线生成策略
    2. public class PunctuatedWatermarkStrategy implements WatermarkStrategy> {
    3. // TODO 实例化一个 事件时间提取器
    4. @Override
    5. public TimestampAssigner> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
    6. TimestampAssigner> timestampAssigner = new TimestampAssigner>() {
    7. @Override
    8. public long extractTimestamp(Tuple2 element, long recordTimestamp) {
    9. return element.f1;
    10. }
    11. };
    12. return timestampAssigner;
    13. }
    14. // TODO 实例化一个 watermark 生成器
    15. @Override
    16. public WatermarkGenerator> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
    17. return new PunctuatedWatermarkGenerator<>();
    18. }
    19. }

    Step3:周期性水位线生成策略

    1. // TODO 使用 自定义周期性 Watermark 生成器
    2. public class UserPunctuatedWatermarkStrategy {
    3. public static void main(String[] args) throws Exception {
    4. // 1.获取执行环境
    5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    6. env.setParallelism(1);
    7. // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
    8. env.getConfig().setAutoWatermarkInterval(3 * 1000L);
    9. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    10. SingleOutputStreamOperator> ds = env.socketTextStream("localhost", 9999)
    11. .map(new MapFunction>() {
    12. @Override
    13. public Tuple2 map(String value) throws Exception {
    14. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    15. }
    16. }
    17. );
    18. // TODO 获取 WatermarkStrategy实例 (方式1:通过 WatermarkStrategy实现类获取)
    19. PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();
    20. // TODO 获取 WatermarkStrategy实例 (方式2:通过 WatermarkStrategy工具类获取) 推荐
    21. WatermarkStrategy> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.>forGenerator(context -> new PunctuatedWatermarkGenerator<>())
    22. .withTimestampAssigner((event, timestamp) -> event.f1);
    23. // 3.使用 自定义水位线策略实例 来提取时间戳&生成水位线
    24. SingleOutputStreamOperator> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy);
    25. // 4.通过 processFunction实例 查看生成的水位线
    26. SingleOutputStreamOperator process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
    27. process.print();
    28. // 3.触发程序执行
    29. env.execute();
    30. }
    31. }

    查看运行结果:


    4.3、内置Watermark生成器 - 有序流水位线生成器

    有序流水位线生成器特点:

            基于处理时间,周期性生成 Watermark,最大乱序时间为0

    适用场景:

            大数量有序流

    代码示例:

    1. // TODO 内置Watermark生成器 - 有序流水位线生成器
    2. public class UserForMonotonousTimestamps {
    3. public static void main(String[] args) throws Exception {
    4. // 1.获取执行环境
    5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    6. env.setParallelism(1);
    7. // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
    8. env.getConfig().setAutoWatermarkInterval(3 * 1000L);
    9. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    10. SingleOutputStreamOperator> sourceDataStream = env.socketTextStream("localhost", 9999)
    11. .map(new MapFunction>() {
    12. @Override
    13. public Tuple2 map(String value) throws Exception {
    14. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    15. }
    16. }
    17. );
    18. // TODO 创建 内置水位线生成策略
    19. WatermarkStrategy> watermarkStrategy = WatermarkStrategy.>forMonotonousTimestamps()
    20. .withTimestampAssigner((element,recordTimestamp) -> element.f1);
    21. // 3.使用 内置水位线生成策略
    22. SingleOutputStreamOperator> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy);
    23. // 4.通过 processFunction实例 查看生成的水位线
    24. SingleOutputStreamOperator process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
    25. process.print();
    26. // 3.触发程序执行
    27. env.execute();
    28. }
    29. }

    查看运行结果:


    4.4、内置Watermark生成器 - 乱序流水位线生成器

    乱序流水位线生成器特点:

            基于处理时间,周期性生成 Watermark,可以这是最大乱序时间

    适用场景:

            大数量乱序流

    代码示例:

    1. // TODO 内置Watermark生成器 - 乱序流水位线生成器
    2. public class UserForBoundedOutOfOrderness {
    3. public static void main(String[] args) throws Exception {
    4. // 1.获取执行环境
    5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    6. env.setParallelism(1);
    7. // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
    8. env.getConfig().setAutoWatermarkInterval(3 * 1000L);
    9. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    10. SingleOutputStreamOperator> ds = env.socketTextStream("localhost", 9999)
    11. .map(new MapFunction>() {
    12. @Override
    13. public Tuple2 map(String value) throws Exception {
    14. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    15. }
    16. }
    17. );
    18. // TODO 获取 WatermarkStrategy实例
    19. WatermarkStrategy> watermarkStrategy = WatermarkStrategy
    20. .>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s
    21. .withTimestampAssigner((element,recordTimestamp) -> element.f1);
    22. // 3.使用 内置水位线生成策略
    23. SingleOutputStreamOperator> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy);
    24. // 4.通过 processFunction实例 查看生成的水位线
    25. SingleOutputStreamOperator process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
    26. process.print();
    27. // 3.触发程序执行
    28. env.execute();
    29. }
    30. }

    查看运行结果:


    4.5、在 读取数据源时 添加水位线

    1. // 1.获取执行环境
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. // 2.创建 Source 对象
    4. Source source = DataGeneratorSource、KafkaSource...
    5. // 3.读取 source时添加水位线
    6. env
    7. .fromSource(source, WatermarkStrategy实例, "source name")
    8. .print()
    9. ;
    10. // 4.触发程序执行
    11. env.execute();

    5、水位线和窗口的关系?

    窗口什么时候创建?

            当窗口内的第一条数据到达时

    窗口什么时候触发计算?

            当阈值水位线到达窗口时


    6、水位线在各个算子间的传递

            下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值

    测试代码:

    1. // TODO 测试水位线的传递
    2. public class TransmitWaterMark {
    3. public static void main(String[] args) throws Exception {
    4. // 1.获取执行环境
    5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    6. env.setParallelism(3);
    7. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    8. DataStreamSource source = env.socketTextStream("localhost", 9999);
    9. source
    10. .partitionCustom(
    11. new Partitioner() {
    12. @Override
    13. public int partition(String key, int numPartitions) {
    14. if (key.equals("a")) {
    15. return 0;
    16. } else if (key.equals("b")) {
    17. return 1;
    18. } else {
    19. return 2;
    20. }
    21. }
    22. }, value -> value.split(",")[0]
    23. )
    24. .map(new MapFunction>() {
    25. @Override
    26. public Tuple2 map(String value) throws Exception {
    27. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    28. }
    29. })
    30. .assignTimestampsAndWatermarks(
    31. WatermarkStrategy
    32. //.>forMonotonousTimestamps()
    33. .>forGenerator(new PeriodWatermarkStrategy())
    34. .withTimestampAssigner((element,recordTimestamp) -> element.f1)
    35. .withIdleness(Duration.ofSeconds(5)) //空闲等待5s
    36. )
    37. .process(new ShowProcessFunction()).setParallelism(1)
    38. .print();
    39. env.execute();
    40. }
    41. }

    6.1、测试用例 - 不设置 withIdleness 超时时间

    现象:如果上游某一个子任务一直没有数据更新,下游算子的水位线一直不会变化


    6.2、测试用例 - 设置 withIdleness 超时时间

    现象:如果上游某一个子任务`在指定时间内`数据更新,下游算子的水位线将不受该子任务最小值的影响

  • 相关阅读:
    AWVS漏洞扫描使用基础与介绍
    打卡记录编程成长/CSDN编程竞赛(第6期)
    Oracle数据库安装及配置
    工控安全PLC固件逆向三
    Nginx搭载负载均衡及前端项目部署
    node.js-模块化
    【HMS】地图标记聚合HWMarkerCluster支持设置聚合计算时网格的像素大小、最大的聚合级别
    最全HTTP/HTTPS面试题整理(三)
    VSCode-常用快捷键
    day04-前台首页、导出项目依赖
  • 原文地址:https://blog.csdn.net/weixin_42845827/article/details/133080491