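In Flink, the watermark marks how far the data has progressed in event time, triggers window computation, and, through a configurable delay, tolerates a certain amount of out-of-order data. For the detailed definition, see: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/
So how exactly is the watermark computed, and how does it deal with out-of-order data? This post walks through the source code to answer both questions.
Note: the Flink source code quoted below is from release-1.15, and Flink SQL is used for the examples.
First, taking event time as an example, a watermark is defined like this: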
```sql
CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
```
The watermark strategy for this table is the largest timestamp observed so far minus 5 seconds. The documentation describes it as follows:
Emits watermarks, which are the maximum observed timestamp minus the specified delay, e.g., `WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND` is a 5 seconds delayed watermark strategy.
Let's start with how watermarks are generated, using BoundedOutOfOrderTimestamps as an example:
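```java
// track the largest timestamp seen so far
@Override
public void nextTimestamp(long timestamp) {
    // only move forward: a smaller (out-of-order) timestamp leaves maxTimestamp unchanged
    if (timestamp > maxTimestamp) {
        maxTimestamp = timestamp;
    }
}

// the emitted watermark lags the largest timestamp by the configured delay
@Override
public Watermark getWatermark() {
    return new Watermark(maxTimestamp - delay);
}
```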
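The next sketch re-implements the same "largest timestamp minus delay" rule in plain Java. It needs no Flink dependency, so you can observe the rule directly. `BoundedDelayWatermark` and its `replay` helper are hypothetical names, not Flink classes. The sketch also simplifies one detail: it reads the watermark after every record, whereas Flink reads it on a periodic schedule.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free sketch of a bounded-delay watermark:
 * watermark = largest timestamp seen so far - delay. Not a Flink class.
 */
final class BoundedDelayWatermark {
    private final long delay;
    private long maxTimestamp;

    BoundedDelayWatermark(long delayMillis) {
        this.delay = delayMillis;
        // start low enough that maxTimestamp - delay cannot underflow (delay assumed >= 0)
        this.maxTimestamp = Long.MIN_VALUE + delayMillis;
    }

    void nextTimestamp(long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
    }

    long getWatermark() {
        return maxTimestamp - delay;
    }

    /** Feeds timestamps in arrival order and records the watermark after each one. */
    static List<Long> replay(long delayMillis, long... timestamps) {
        BoundedDelayWatermark wm = new BoundedDelayWatermark(delayMillis);
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            wm.nextTimestamp(ts);
            watermarks.add(wm.getWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // event times in ms; 12000 arrives after 15000, i.e. out of order
        System.out.println(replay(5_000L, 10_000L, 15_000L, 12_000L, 20_000L));
    }
}
```

Running `main` prints `[5000, 10000, 10000, 15000]`. The out-of-order timestamp 12000 does not pull the watermark back, because only `maxTimestamp` feeds it.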
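Once watermarks are available, window operations can be performed.
Taking the commonly used tumbling (TUMBLE) window as an example, when does the watermark trigger the window computation? The check can be found in the trigger's onElement method: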
```java
@Override
public boolean onElement(Object element, long timestamp, W window) throws Exception {
    if (triggerTime(window) <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return true;
    } else {
        // otherwise fire later, when the watermark reaches the window's trigger time
        ctx.registerEventTimeTimer(triggerTime(window));
        return false;
    }
}
```
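The code shows two outcomes when an element arrives:
- If the watermark has already passed the window's trigger time, the window fires immediately.
- Otherwise, an event-time timer is registered for that trigger time, and the window fires once the watermark reaches it.

We configured an out-of-order tolerance, i.e. the delay. The window therefore fires only once the largest event timestamp seen so far, minus the delay, reaches the end of the window.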
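A small simulation makes the interaction between the delay and the trigger check concrete. `TumbleFiringSketch` is a simplified, hypothetical model, not Flink code. It works as follows:
- It assigns each record to a tumbling window.
- It recomputes the watermark after every record (Flink SQL emits watermarks periodically instead).
- It fires a window once the window's last timestamp (`end - 1`) is at or below the watermark. This mirrors `triggerTime(window) <= ctx.getCurrentWatermark()`.

The example input deliberately contains no records for windows that have already fired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical simulation (not Flink code): tumbling event-time windows fired by a
 * "max timestamp minus delay" watermark. Simplifications: the watermark is recomputed
 * after every record, and the input holds no records for windows that already fired.
 */
final class TumbleFiringSketch {

    /** Returns one line per fired window, in firing order. */
    static List<String> simulate(long size, long delay, long... eventTimes) {
        TreeMap<Long, Integer> pending = new TreeMap<>(); // window start -> record count
        List<String> fired = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + delay;
        for (long ts : eventTimes) {
            long start = ts - Math.floorMod(ts, size); // tumbling window assignment
            pending.merge(start, 1, Integer::sum);
            maxTimestamp = Math.max(maxTimestamp, ts);
            long watermark = maxTimestamp - delay;
            // fire every window whose last timestamp (end - 1) the watermark has reached
            while (!pending.isEmpty() && pending.firstKey() + size - 1 <= watermark) {
                Map.Entry<Long, Integer> w = pending.pollFirstEntry();
                long end = w.getKey() + size;
                fired.add("[" + w.getKey() + "," + end + ") n=" + w.getValue() + " at ts=" + ts);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // 10 s windows, 5 s delay; 9500 arrives after 12000 (out of order)
        simulate(10_000L, 5_000L, 1_000L, 8_000L, 12_000L, 9_500L, 14_000L, 16_000L, 25_000L)
                .forEach(System.out::println);
    }
}
```

This run uses a 10 s window and a 5 s delay:
- The record at 14000 does not close `[0,10000)`, because the watermark is only 9000.
- The record at 16000 does close it, because the watermark reaches 11000.
- The out-of-order record at 9500 arrives before that point, so it is still counted.

Each window therefore fires with three records: `[0,10000) n=3 at ts=16000` and `[10000,20000) n=3 at ts=25000`.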
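Events that arrive late (i.e., events whose timestamp is smaller than one already seen) fall into one of two cases:
1. Their window has not fired yet, because the watermark has not reached the window's trigger time. The delay absorbs the disorder, and the event is still assigned to its window and included in the result.
2. Their window has already fired. The event is treated as late data and, by default, dropped.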
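Both cases come down to a single check against the current watermark. The sketch below is hypothetical and assumes tumbling windows with no allowed lateness configured. `LateEventSketch.classify` is an illustrative name, not a Flink API.

```java
/**
 * Hypothetical sketch: decides what happens to a record that arrives after the
 * watermark has advanced, for tumbling event-time windows of the given size.
 * Assumes no allowed lateness, so a record whose window has already fired is dropped.
 */
final class LateEventSketch {

    enum Outcome { ACCEPTED, DROPPED }

    static Outcome classify(long timestamp, long currentWatermark, long windowSize) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        long windowMaxTimestamp = windowStart + windowSize - 1; // last timestamp in the window
        // the window fired once the watermark reached its last timestamp
        return windowMaxTimestamp <= currentWatermark ? Outcome.DROPPED : Outcome.ACCEPTED;
    }

    public static void main(String[] args) {
        long watermark = 11_000L; // e.g. max timestamp 16000 minus a 5000 ms delay
        System.out.println(classify(9_000L, watermark, 10_000L));  // window [0,10000) already fired
        System.out.println(classify(10_500L, watermark, 10_000L)); // window [10000,20000) still open
    }
}
```

Running `main` prints `DROPPED` and then `ACCEPTED`. Both records are out of order relative to the maximum timestamp 16000, but only the first one falls into a window that has already fired.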
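In a real Flink job, operators run with multiple parallel instances, and each instance can have several upstream inputs and downstream outputs. Each operator therefore merges the watermarks arriving on its different inputs. It then forwards the resulting watermark to all downstream operators, i.e., it broadcasts the watermark.
The class in Flink that actually handles the relationship between records and windows is WindowOperator, which ultimately extends AbstractStreamOperator. AbstractStreamOperator uses IndexedCombinedWatermarkStatus to manage the watermarks produced by multiple inputs. The merge itself happens in updateCombinedWatermark, which takes the minimum over all non-idle inputs: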
```java
// AbstractStreamOperator: called when input `index` delivers a watermark
private void processWatermark(Watermark mark, int index) throws Exception {
    // forward a new watermark only if the combined watermark actually advanced
    if (combinedWatermark.updateWatermark(index, mark.getTimestamp())) {
        processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
    }
}

// CombinedWatermarkStatus (used inside IndexedCombinedWatermarkStatus); here combinedWatermark is a long
public boolean updateCombinedWatermark() {
    long minimumOverAllOutputs = Long.MAX_VALUE;

    // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
    // at its initial Long.MAX_VALUE state and we must not emit that
    if (partialWatermarks.isEmpty()) {
        return false;
    }

    boolean allIdle = true;
    for (PartialWatermark partialWatermark : partialWatermarks) {
        // idle inputs are skipped so they cannot hold the watermark back
        if (!partialWatermark.isIdle()) {
            minimumOverAllOutputs =
                    Math.min(minimumOverAllOutputs, partialWatermark.getWatermark());
            allIdle = false;
        }
    }

    this.idle = allIdle;

    // the combined watermark only moves forward, and never while all inputs are idle
    if (!allIdle && minimumOverAllOutputs > combinedWatermark) {
        combinedWatermark = minimumOverAllOutputs;
        return true;
    }

    return false;
}
```
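A small standalone class can reproduce the merging rule: take the minimum over non-idle inputs, and only ever move the combined value forward. `CombinedWatermarkSketch` is a hypothetical simplification, not the Flink implementation. It differs from Flink in two ways:
- It stores per-input state in plain arrays.
- It marks inputs idle through an explicit `setIdle` call, whereas Flink uses watermark status events.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of combining per-input watermarks: the combined watermark is the
 * minimum over all non-idle inputs and is only updated when that minimum increases.
 */
final class CombinedWatermarkSketch {
    private final long[] partialWatermarks; // latest watermark per input
    private final boolean[] idle;           // idleness flag per input
    private long combinedWatermark = Long.MIN_VALUE;

    CombinedWatermarkSketch(int numInputs) {
        partialWatermarks = new long[numInputs];
        Arrays.fill(partialWatermarks, Long.MIN_VALUE); // no watermark received yet
        idle = new boolean[numInputs];
    }

    /** Records a watermark from one input; returns true if the combined watermark advanced. */
    boolean updateWatermark(int index, long timestamp) {
        partialWatermarks[index] = timestamp; // assumes each input sends non-decreasing watermarks
        return updateCombinedWatermark();
    }

    /** Marks an input idle or active; returns true if the combined watermark advanced. */
    boolean setIdle(int index, boolean isIdle) {
        idle[index] = isIdle;
        return updateCombinedWatermark();
    }

    long getCombinedWatermark() {
        return combinedWatermark;
    }

    private boolean updateCombinedWatermark() {
        long minimum = Long.MAX_VALUE;
        boolean allIdle = true;
        for (int i = 0; i < partialWatermarks.length; i++) {
            if (!idle[i]) {
                minimum = Math.min(minimum, partialWatermarks[i]);
                allIdle = false;
            }
        }
        // never emit when every input is idle, and never move backwards
        if (!allIdle && minimum > combinedWatermark) {
            combinedWatermark = minimum;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch c = new CombinedWatermarkSketch(2);
        System.out.println(c.updateWatermark(0, 10_000L)); // false: input 1 has not reported yet
        System.out.println(c.updateWatermark(1, 7_000L));  // true: combined = 7000
        System.out.println(c.updateWatermark(0, 12_000L)); // false: input 1 still holds it at 7000
        System.out.println(c.setIdle(1, true));             // true: only input 0 counts now
        System.out.println(c.getCombinedWatermark());       // 12000
    }
}
```

The third call shows why a slow input holds back the whole operator. The fourth call shows why idleness matters: once input 1 is marked idle, the combined watermark can jump to 12000, and downstream windows can fire.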
From the analysis above, the following concepts become clear: