• A look at Flink's BoundedOutOfOrdernessTimestampExtractor


    This article examines Flink's BoundedOutOfOrdernessTimestampExtractor.

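    • The abstract class BoundedOutOfOrdernessTimestampExtractor implements the extractTimestamp and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface (both are declared final), and declares the abstract method extractTimestamp(T element) for subclasses to implement.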
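    • The constructor takes a maxOutOfOrderness parameter: the fixed interval by which the emitted watermark trails the largest event time seen so far. An element whose event time lags the newest element seen by less than this interval is guaranteed to arrive ahead of the watermark; one that lags by more may arrive after the watermark has passed the end of its window, in which case an event-time window operator (with the default allowed lateness of 0) drops it instead of including it in the window's result.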
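    • The final extractTimestamp(T element, long previousElementTimestamp) calls the subclass's abstract extractTimestamp(T element) to obtain the event time and, if that time is greater than currentMaxTimestamp, updates currentMaxTimestamp. getCurrentWatermark first computes potentialWM = currentMaxTimestamp - maxOutOfOrderness; if potentialWM >= lastEmittedWatermark, it updates lastEmittedWatermark. That condition is equivalent to currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness: the last watermark has fallen at least maxOutOfOrderness behind the newest timestamp, so it is advanced. Finally it returns new Watermark(lastEmittedWatermark), which therefore never decreases.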
```java
package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        // Start at Long.MIN_VALUE + maxOutOfOrderness so that the first
        // currentMaxTimestamp - maxOutOfOrderness cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        // Candidate: the largest timestamp seen so far minus the allowed delay.
        // Keep the larger of the candidate and the last emitted watermark,
        // so emitted watermarks are monotonically non-decreasing.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    // Extract the element's event time via the subclass, raise currentMaxTimestamp
    // if this timestamp is newer, and return the extracted timestamp.
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}
```
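To see how currentMaxTimestamp and lastEmittedWatermark evolve, here is a minimal, Flink-free sketch that copies the bookkeeping of the class above into plain Java. The class BoundedOutOfOrdernessSketch and its methods onElement and currentWatermark are hypothetical stand-ins for extractTimestamp(element, previousElementTimestamp) and getCurrentWatermark. It also assumes the watermark is polled after every element, whereas Flink polls a periodic assigner at the configured auto-watermark interval.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free copy of the watermark bookkeeping in
 * BoundedOutOfOrdernessTimestampExtractor (not a Flink API).
 */
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Same initial value as the Flink class, so the first subtraction cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Stand-in for extractTimestamp(element, previousElementTimestamp): tracks the max timestamp. */
    public long onElement(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Stand-in for getCurrentWatermark(): max timestamp minus the bound, never moving backwards. */
    public long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch extractor = new BoundedOutOfOrdernessSketch(3000);
        // Event times in ms, arriving out of order; the watermark is polled after each element.
        long[] eventTimes = {1000, 5000, 3000, 9000, 4000};
        List<Long> watermarks = new ArrayList<>();
        for (long t : eventTimes) {
            extractor.onElement(t);
            watermarks.add(extractor.currentWatermark());
        }
        System.out.println(watermarks); // prints [-2000, 2000, 2000, 6000, 6000]
    }
}
```

With maxOutOfOrderness = 3000 ms, the element at 3000 arrives while the watermark is 2000, so it is still on time. The element at 4000 arrives after the watermark has already reached 6000: it lags the newest element (9000) by more than 3000 ms and is behind the watermark.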
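The point that elements exceeding the bound are ignored in window computation can be made concrete. The following is a hedged, Flink-free sketch of the lateness rule for an event-time tumbling window with allowed lateness 0, assuming the rule Flink's window operator applies: an element is dropped when the last timestamp of its window (window end minus 1 ms) is less than or equal to the current watermark. The class LatenessCheckSketch and its methods are hypothetical, and the window alignment assumes offset 0 and non-negative timestamps.

```java
/**
 * Hypothetical, Flink-free sketch of when an event-time tumbling window
 * treats an element as late (allowed lateness assumed to be 0).
 */
public class LatenessCheckSketch {

    /** Start of the tumbling window containing timestamp; assumes offset 0 and timestamp >= 0. */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /**
     * The window [start, start + size) has already fired once the watermark
     * reaches its last millisecond, start + size - 1; an element that only
     * belongs to such a window would be dropped.
     */
    static boolean isLate(long timestamp, long windowSize, long currentWatermark) {
        long windowMaxTimestamp = windowStart(timestamp, windowSize) + windowSize - 1;
        return windowMaxTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        long windowSize = 5000;       // 5 s tumbling windows
        long currentWatermark = 6000; // e.g. max seen timestamp 9000 minus maxOutOfOrderness 3000
        System.out.println(isLate(4000, windowSize, currentWatermark)); // window [0, 5000): true
        System.out.println(isLate(5500, windowSize, currentWatermark)); // window [5000, 10000): false
    }
}
```

The element at 4000 is dropped because its window [0, 5000) has already been evaluated, while the element at 5500, although also behind the watermark, still lands in the open window [5000, 10000). So falling behind the watermark makes an element late, but whether it is actually discarded depends on whether its window has already fired (and on any allowed lateness configured on the window).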

  • Original article: https://blog.csdn.net/m0_57320261/article/details/126097387