
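观察 Flink 内置水位线生成器 BoundedOutOfOrdernessWatermarks 的源码(WatermarkStrategy.forBoundedOutOfOrderness 内部使用的就是它):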
- public class BoundedOutOfOrdernessWatermarks
implements WatermarkGenerator { -
- /** The maximum timestamp encountered so far. */
- private long maxTimestamp;
-
- /** The maximum out-of-orderness that this watermark generator assumes. */
- private final long outOfOrdernessMillis;
-
- /**
- * Creates a new watermark generator with the given out-of-orderness bound.
- *
- * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
- */
- public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
- checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
- checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
-
- this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
-
- // start so that our lowest watermark would be Long.MIN_VALUE.
- this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
- // 视当前的事件时间戳,更新(或不更新)maxTimestamp
- maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
- }
-
- @Override
- public void onPeriodicEmit(WatermarkOutput output) {
- // 以当前eventTimeStamp-乱序延迟数-1,作为生成的watermark值
- output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
- }
- }

构造两条单并行度的流,将它们合并(union)成一个单并行度的流,以此观察 watermark 的生成,并重点观察“接收多个上游分区”的算子的 watermark 推进规律:
- package com.blok2;
-
- import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.util.Collector;
-
- import java.time.Duration;
-
- /**
- * @Date: 22.12.4
- * @Author: Hang.Nian.YY
- * @qq: 598196583
- * @Tips: 学大数据 ,到多易教育
- * @Description:
- */
- public class _03_测试水位线生成 {
-
- public static void main(String[] args) throws Exception {
-
- Configuration conf = new Configuration();
- conf.setInteger("rest.port", 8888);
- StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- see.setParallelism(1);
- // 两个数据流 , 并行度都是1
- DataStreamSource
ds1 = see.socketTextStream("linux01", 8888); - DataStreamSource
ds2 = see.socketTextStream("linux01", 9999); - // 为两个 源数据流 分配WM
- WatermarkStrategy
wm = WatermarkStrategy - .
forBoundedOutOfOrderness(Duration.ofSeconds(1)) - .withTimestampAssigner(
- new SerializableTimestampAssigner
() { - @Override
- public long extractTimestamp(String element, long recordTimestamp) {
- return Long.parseLong(element.split(",")[1]);
- }
- }
- //自动推进水位线更新 ,避免长时间没有数据 ,造成水位线无法向前推进
- ).withIdleness(Duration.ofSeconds(1));
-
- SingleOutputStreamOperator
sso1 = ds1.assignTimestampsAndWatermarks(wm); - SingleOutputStreamOperator
sso2 = ds2.assignTimestampsAndWatermarks(wm); -
-
- DataStream
res = sso1.union(sso2); - res.process(new ProcessFunction
() { - @Override
- public void processElement(String value, ProcessFunction
.Context ctx, Collector out) throws Exception { - long wmTime = ctx.timerService().currentWatermark();
-
- Long currentTime = ctx.timestamp();
-
- System.out.println(value + " waterMaker: " + wmTime + " currentTime: " + currentTime);
- out.collect(value.toUpperCase());
- }
- }).print();
-
- see.execute("测试水位线策略");
-
-
- }
- }
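生成的 watermark = 当前最大事件时间 - 允许的乱序时间 - 1(本例中即 maxTimestamp - 1000 - 1)。

下面是运行输出。每条记录打印的 waterMaker 是该记录到达时下游算子已持有的 watermark,所以第一条记录看到的是初始值 Long.MIN_VALUE。注意:由于设置了 withIdleness(1s),暂时没有数据的输入会被标记为空闲,不参与下游取最小 watermark 的计算,因此 union 之后算子的 watermark 会跟随正在发送数据的那条流推进。最后一条 a,10000 已经迟到(10000 < 28999),但 ProcessFunction 并不会丢弃迟到数据,所以它仍被输出: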
a,10000 waterMaker: -9223372036854775808 currentTime: 10000
A,10000
1,20000 waterMaker: 8999 currentTime: 20000
1,20000
a,30000 waterMaker: 18999 currentTime: 30000
A,30000
a,10000 waterMaker: 28999 currentTime: 10000
A,10000