本文研究一下Flink中的BoundedOutOfOrdernessTimestampExtractor
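BoundedOutOfOrdernessTimestampExtractor的构造器接收maxOutOfOrderness参数,用于指定element允许滞后(t-t_w,t为element的eventTime,t_w为前一次watermark的时间)的最大时间,在计算窗口数据时,如果超过该值则会被忽略。
getCurrentWatermark先计算potentialWM = currentMaxTimestamp - maxOutOfOrderness,如果potentialWM >= lastEmittedWatermark(即currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness,这里表示lastEmittedWatermark太小了,差值已达到或超过maxOutOfOrderness),则调大lastEmittedWatermark,最后返回Watermark(lastEmittedWatermark)。
- public abstract class BoundedOutOfOrdernessTimestampExtractor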
implements AssignerWithPeriodicWatermarks { -
- private static final long serialVersionUID = 1L;
-
- /**
- * The current maximum timestamp seen so far. The constructor initialises it to
- * {@code Long.MIN_VALUE + maxOutOfOrderness}; {@code extractTimestamp(T, long)}
- * raises it whenever a larger timestamp is extracted.
- */
- // Tracks the current maximum timestamp.
- private long currentMaxTimestamp;
-
- /**
- * The timestamp of the last emitted watermark. Starts at {@code Long.MIN_VALUE}
- * and is only ever raised, by {@code getCurrentWatermark()}.
- */
- // Timestamp of the most recently emitted watermark.
- private long lastEmittedWatermark = Long.MIN_VALUE;
-
- /**
- * The (fixed) interval, in milliseconds, between the maximum timestamp seen in
- * the records and that of the watermark to be emitted.
- */
- private final long maxOutOfOrderness;
-
- public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
- if (maxOutOfOrderness.toMilliseconds() < 0) {
- throw new RuntimeException("Tried to set the maximum allowed " +
- "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
- }
- this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
- this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
- }
-
- public long getMaxOutOfOrdernessInMillis() {
- return maxOutOfOrderness;
- }
-
- /**
- * Extracts the timestamp from the given element.
- *
- * <p>Abstract: subclasses supply the extraction logic. It is invoked by
- * {@code extractTimestamp(T, long)}, which also uses the returned value to
- * advance {@code currentMaxTimestamp}.
- *
- * @param element The element that the timestamp is extracted from.
- * @return The new timestamp.
- */
- public abstract long extractTimestamp(T element);
-
- @Override
- public final Watermark getCurrentWatermark() {
- // this guarantees that the watermark never goes backwards.
- //这个句代码保证了生成的水印是单调递增的
- //当前最大的时间戳减去延时时间和上次最后提交的水印时间比较
- //保留最大的时间(减去延时时间)作为水印
- long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
- if (potentialWM >= lastEmittedWatermark) {
- lastEmittedWatermark = potentialWM;
- }
- return new Watermark(lastEmittedWatermark);
- }
-
- //提取数据中时间作为timestamp
- //如果timestamp 大于最大的currentMaxTimestamp 就把currentMaxTimestamp 置为 timestamp
- //返回当前提取到的timestamp
- @Override
- public final long extractTimestamp(T element, long previousElementTimestamp) {
- long timestamp = extractTimestamp(element);
- if (timestamp > currentMaxTimestamp) {
- currentMaxTimestamp = timestamp;
- }
- return timestamp;
- }
- }
-