在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:

Event Time(事件时间):是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
Ingestion Time(摄入时间):是数据进入Flink的时间。
Processing Time(处理时间):是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。
例如电影《星球大战》的例子:

时间时间为上面的1-5,处理时间为下面的年份。
再例如,一条日志进入Flink的时间为2022-11-12 10:00:00.123,到达Window的系统时间为2022-11-12 10:00:01.234,日志的内容如下:
2022-11-02 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计1min内的故障日志个数,eventTime 时间是最有意义,因此要根据日志的生成时间进行统计。Flink 1.12 默认使用事件时间,无需设置。
在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:
但是使用事件时间会带来一个问题,我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的,如下图:
理想情况:希望12345依次按序到达。

实际情况:145先到达,23然后到达,这时如果一个0-5S的窗口,在接收到数据5时就不能够关闭窗口,因为在其后面还有数据。

乱序事件的影响:
那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。
使用水位线需要考虑以下问题:
下述文字抽象度较高,如果难以理解,请看关于水位线的测试,然后再回来理解这段文字。
有序流的Watermarker如下图所示(Watermark设置为0):

乱序流的Watermarker如下图所示(Watermark设置为2):
当Flink接收到数据时,会按照一定的规则去生成Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime - 最大延迟时长 -1ms,也就是说,Watermark是基于数据携带的时间戳生成的,一旦Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于event time是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是7s - 2s - 1ms = 4999ms,时间戳为12s的事件的Watermark是12s - 2s - 1ms = 9999ms,如果窗口1是0s~5s,窗口2是5s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。
Watermark 就是触发前一窗口的“关窗时间”,一旦触发关窗那么以当前时刻为准,在窗口范围内的所有数据都会收入窗中。
只要没有达到水位线,那么不管现实中的时间推进了多久都不会触发关窗,从这里可以看到,水位线只是在一定程度上解决了数据延迟问题,并不能全部解决乱序问题,那么Flink针对迟到数据也会进行处理,如何处理请见下文。
为每个用户的PV定义水位线:
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
// 输入的内容格式类似:'a 1'
.socketTextStream("localhost",9999