在前一篇文章 Flink 源码解读系列 DataStream 窗口 Window 实现 中,我们了解到 Flink 窗口 Window 有两种具体实现,一个是 TimeWindow,一个是 GlobalWindow。有了窗口之后,我们如何将元素分配给窗口呢?在这篇文章中我们重点了解一下窗口分配器 WindowAssigner 是如何将输入流中的元素划分给窗口的。
在了解窗口分配器 WindowAssigner 内部实现之前,我们先看一下如何为窗口算子指定窗口分配器。Flink 为我们提供了几种指定窗口分配器的方式,具体取决于输入流是不是 KeyedStream。如果是在 KeyedStream 上使用窗口,可以使用如下三个方法指定窗口分配器:
如果是在 DataStream 上使用窗口,可以使用如下三个方法指定窗口分配器:
在 KeyedStream 和 DataStream 上使用窗口的方式基本一致。
在 KeyedStream 上可以通过 window 方法指定窗口分配器,而对于 DataStream 则需要使用 windowAll 方法指定:
// KeyedStream 上使用
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
return new WindowedStream<>(this, assigner);
}
// DataStream 上使用
public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
return new AllWindowedStream<>(this, assigner);
}
Flink 1.12 版本中标记为弃用,推荐使用 window 方法
除了使用 window(或者 windowAll) 方法来指定窗口分配器之外,也可以使用 timeWindow(或者 timeWindowAll) 来指定窗口分配器。这种方式需要与时间特性配合使用,具体是基于事件时间的窗口还是基于处理时间的窗口,取决于你设置的 TimeCharacteristic:
// KeyedStream 滚动时间窗口
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
// KeyedStream 滑动时间窗口
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
// DataStream 滚动时间窗口
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return windowAll(TumblingProcessingTimeWindows.of(size));
} else {
return windowAll(TumblingEventTimeWindows.of(size));
}
}
// DataStream 滑动时间窗口
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return windowAll(SlidingProcessingTimeWindows.of(size, slide));
} else {
return windowAll(SlidingEventTimeWindows.of(size, slide));
}
}
从上面代码中可以看到 timeWindow 函数只是对 window 函数的一次封装,封装之后我们不用关心到底是使用滚动事件时间窗口分配器 TumblingEventTimeWindows、滚动处理时间窗口分配器 TumblingProcessingTimeWindows、滑动事件时间窗口分配器 SlidingEventTimeWindows 还是滑动处理时间窗口分配器 SlidingProcessingTimeWindows。timeWindow 会根据你设置的时间特性 TimeCharacteristic 以及是否有滑动步长来自选选择对应的窗口分配器。例如时间特性为事件时间 EventTime,只有窗口大小没有滑动步长,timeWindow 会你提供滚动事件时间窗口分配器 TumblingEventTimeWindows。这样方式更简洁一些,出错的可能性也更低,不需要记住各种不同的窗口分配器。
需要注意的是在 Flink 1.12 版本中,DataStream API 中的 timeWindow() 方法已经标注为 @Deprecated。Flink 社区推荐使用带 TumblingEventTimeWindows、SlidingEventTimeWindows、TumblingProcessingTimeWindows 或 SlidingProcessingTimeWindows 的 window(WindowAssigner) 方法。主要原因是在这个版本中弃用了 DataStream API 中的时间特性,从而导致无法继续基于时间特性来判断是基于处理时间的窗口还是基于事件时间的窗口。
对于使用全局窗口 GlobalWindow 实现计数的话,Flink 提供了便捷方法 countWindow 实现:
// KeyedStream
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create())