时间语义,是 flink 中用于时间推进和时间判断的机制,以什么为判断标准,就产生了两种不同的时间语义:
flink 1.12及以后,flink 以event time 作为默认时间语义。在需要指定时间语义的相关操作(如时间窗口)时,可以通过显式的api来使用特定的时间语义。
keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)))
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
如果需要禁用event time,则可以通过设置 watermark 生成频率间隔来实现:
// 如果设置为0,则禁用了 watermark 的生成,从而失去了 event time 语义
ExcutionConfig.setAutoWatermarkInterval(long);
数据在接收处理过程中,数据时间可能存在乱序,所以引入 watermark,就是在事件时间语义中,用于单调递增向前推进时间的一种标记。
watermark的核心是在数据中周期性地插入一种时间戳单调递增的特殊数据(watermark),是 flink 内部自动产生并插入到数据流的,来不可逆转地在整个数据流中进行时间的推进。
// watermark 的生命周期(默认值为200ms)
env.getConfig().setAutoWatermarkInterval(200)
策略1: WatermarkStrategy.noWatermarks() 不生成 watermark,禁用了事件时间的推进机制
策略2: WatermarkStrategy.forMonotonousTimestamps() 紧跟最大事件时间
策略3: WatermarkStrategy.forBoundedOutOfOrderness() 允许乱序的 watermark生成策略
策略4: WatermarkStrategy.forGenerator() 自定义watermark生成算法
例 一
从最源头算子开始,生成watermark
// 1、构造一个watermark的生成策略对象(算法策略,及事件时间的抽取方法)
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofMillis(0)) // 允许乱序的算法策略
.withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element.split(",")[2])); // 时间戳抽取方法
// 2、将构造好的 watermark策略对象,分配给流(source算子)
s1.assignTimestampsAndWatermarks(watermarkStrategy);
例 二
不从最源头算子开始生成watermark,而是从中间环节的某个算子开始生成watermark。
注意:如果在源头就已经生成了watermark, 就不要在下游再次产生watermark
SingleOutputStreamOperator<EventBean> s2 = s1.map(s -> {
String[] split = s.split(",");
return new EventBean(Long.parseLong(split[0]), split[1], Long.parseLong(split[2]), split[3]);
}).returns(EventBean.class)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<EventBean>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<EventBean>() {
@Override
public long extractTimestamp(EventBean eventBean, long recordTimestamp) {
return eventBean.getTimeStamp();
}
})
);