目录
4.3、内置Watermark生成器 - 有序流水位线生成器
4.4、内置Watermark生成器 - 乱序流水位线生成器
6.1、测试用例 - 不设置 withIdleness 超时时间
6.2、测试用例 - 设置 withIdleness 超时时间
开发语言:java1.8
Flink版本:1.17
官网链接:官网链接
Flink中水位线是一条特殊的数据(long timestamp)
它会以时间戳的形式作为一条标识数据插入到数据流中
使用事件时间(EventTime)做流式计算任务时,需要根据事件时间生成水位线(Watermark)
通过水位线来触发窗口计算,水位线作为衡量事件时间(EventTime)进展的标识
设计水位线主要是为了解决实时流中数据乱序和迟到的问题
思考:什么原因造成了数据流的乱序呢?
如今数据采集、数据传输大多都在分布式系统中完成
各个机器节点因为网络和自身性能的原因 导致了数据的乱序和迟到
Flink中支持在 数据源和普通DataStream上添加水位线生成策略(WatermarkStrategy)
标记 Watermark 生成器特点:
每条数据到来后,都会为其生成一条 Watermark
适用场景:
数据量小且数据有序
代码示例:
Step1:自定义 标记水位线生成器 实现类
- // 自定义 标记水位线生成器 实现类
- public class PeriodWatermarkGenerator
implements WatermarkGenerator { -
- // 每进入一条数据,都会调用一次 onEvent 方法
- @Override
- /*
- * 参数说明:
- * @event : 进入到该方法的事件数据
- * @eventTimestamp : 时间戳提取器提取的时间戳
- * */
- public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
- //发射水位线
- output.emitWatermark(new Watermark(eventTimestamp));
- }
-
- // 不需要实现
- @Override
- public void onPeriodicEmit(WatermarkOutput output) {
- }
- }
Step2:自定义 标记性水位线生成策略 实现类
- // TODO 自定义 标记性水位线生成策略
- public class PeriodWatermarkStrategy implements WatermarkStrategy
> { - // TODO 实例化一个 事件时间提取器
- @Override
- public TimestampAssigner
> createTimestampAssigner(TimestampAssignerSupplier.Context context) { - TimestampAssigner
> timestampAssigner = new TimestampAssigner>() { -
- @Override
- public long extractTimestamp(Tuple2
element, long recordTimestamp) { - return element.f1;
- }
- };
- return timestampAssigner;
- }
-
- // TODO 实例化一个 watermark 生成器
- @Override
- public WatermarkGenerator
> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { - return new PeriodWatermarkGenerator<>();
- }
- }
Step3:使用 标记性水位线生成策略
- // TODO 使用 自定义标记 Watermark 生成器
- public class UserPeriodWatermarkStrategy {
- public static void main(String[] args) throws Exception {
- // 1.获取执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
- SingleOutputStreamOperator
> sourceDataStream = env.socketTextStream("localhost", 9999) - .map(new MapFunction
>() { - @Override
- public Tuple2 map(String value) throws Exception {
- return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
- }
- }
- );
-
- // 3.为 DataStream 添加水位线生成策略 (使用 自定义WatermarkStrategy 实现类)
- SingleOutputStreamOperator
> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy()); -
- // 4.通过 processFunction实例 查看生成的水位线
- SingleOutputStreamOperator
process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction()); - process.print();
-
- // 5.触发程序执行
- env.execute();
- }
- }
查看运行结果:
标记 Watermark 生成器特点:
基于处理时间,周期性生成 Watermark
适用场景:
数据量大且可能存在一定程度数据延迟(乱序)
代码示例:
Step1:自定义 周期性水位线生成器 实现类
- // 自定义 周期性水位线生成器
- public class PunctuatedWatermarkGenerator
implements WatermarkGenerator { - // 设置变量,用来保存 当前最大的事件时间
- private long currentMaxTimestamp;
- // 设置变量,指定最大的乱序时间(等待时间)
- private final long maxOutOfOrderness = 0000; // 3 秒
-
- @Override
- public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
- // 只更新当前最大时间戳,不再发生水位线
- if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;
- }
-
- // 周期性 生成水位线
- // 每个 setAutoWatermarkInterval 时间,调用一次该方法
- @Override
- public void onPeriodicEmit(WatermarkOutput output) {
- // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
- output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
- }
- }
Setp2:自定义 周期性水位线生成策略 实现类
- // 自定义 周期性水位线生成策略
- public class PunctuatedWatermarkStrategy implements WatermarkStrategy
> { - // TODO 实例化一个 事件时间提取器
- @Override
- public TimestampAssigner
> createTimestampAssigner(TimestampAssignerSupplier.Context context) { - TimestampAssigner
> timestampAssigner = new TimestampAssigner>() { -
- @Override
- public long extractTimestamp(Tuple2
element, long recordTimestamp) { - return element.f1;
- }
- };
-
- return timestampAssigner;
- }
-
- // TODO 实例化一个 watermark 生成器
- @Override
- public WatermarkGenerator
> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { - return new PunctuatedWatermarkGenerator<>();
- }
-
- }
Step3:周期性水位线生成策略
- // TODO 使用 自定义周期性 Watermark 生成器
- public class UserPunctuatedWatermarkStrategy {
- public static void main(String[] args) throws Exception {
- // 1.获取执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
- env.getConfig().setAutoWatermarkInterval(3 * 1000L);
-
- // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
- SingleOutputStreamOperator
> ds = env.socketTextStream("localhost", 9999) - .map(new MapFunction
>() { - @Override
- public Tuple2 map(String value) throws Exception {
- return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
- }
- }
- );
-
- // TODO 获取 WatermarkStrategy实例 (方式1:通过 WatermarkStrategy实现类获取)
- PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();
-
- // TODO 获取 WatermarkStrategy实例 (方式2:通过 WatermarkStrategy工具类获取) 推荐
- WatermarkStrategy
> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.>forGenerator(context -> new PunctuatedWatermarkGenerator<>()) - .withTimestampAssigner((event, timestamp) -> event.f1);
-
- // 3.使用 自定义水位线策略实例 来提取时间戳&生成水位线
- SingleOutputStreamOperator
> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy); -
- // 4.通过 processFunction实例 查看生成的水位线
- SingleOutputStreamOperator
process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction()); - process.print();
-
- // 3.触发程序执行
- env.execute();
- }
- }
查看运行结果:
有序流水位线生成器特点:
基于处理时间,周期性生成 Watermark,最大乱序时间为0
适用场景:
大数量有序流
代码示例:
- // TODO 内置Watermark生成器 - 有序流水位线生成器
- public class UserForMonotonousTimestamps {
- public static void main(String[] args) throws Exception {
- // 1.获取执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
- env.getConfig().setAutoWatermarkInterval(3 * 1000L);
-
- // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
- SingleOutputStreamOperator
> sourceDataStream = env.socketTextStream("localhost", 9999) - .map(new MapFunction
>() { - @Override
- public Tuple2 map(String value) throws Exception {
- return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
- }
- }
- );
-
- // TODO 创建 内置水位线生成策略
- WatermarkStrategy
> watermarkStrategy = WatermarkStrategy.>forMonotonousTimestamps() - .withTimestampAssigner((element,recordTimestamp) -> element.f1);
-
- // 3.使用 内置水位线生成策略
- SingleOutputStreamOperator
> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy); -
- // 4.通过 processFunction实例 查看生成的水位线
- SingleOutputStreamOperator
process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction()); - process.print();
-
- // 3.触发程序执行
- env.execute();
- }
- }
查看运行结果:
乱序流水位线生成器特点:
基于处理时间,周期性生成 Watermark,可以这是最大乱序时间
适用场景:
大数量乱序流
代码示例:
- // TODO 内置Watermark生成器 - 乱序流水位线生成器
- public class UserForBoundedOutOfOrderness {
- public static void main(String[] args) throws Exception {
- // 1.获取执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- // TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)
- env.getConfig().setAutoWatermarkInterval(3 * 1000L);
-
- // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
- SingleOutputStreamOperator
> ds = env.socketTextStream("localhost", 9999) - .map(new MapFunction
>() { - @Override
- public Tuple2 map(String value) throws Exception {
- return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
- }
- }
- );
-
- // TODO 获取 WatermarkStrategy实例
- WatermarkStrategy
> watermarkStrategy = WatermarkStrategy - .
>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s - .withTimestampAssigner((element,recordTimestamp) -> element.f1);
-
- // 3.使用 内置水位线生成策略
- SingleOutputStreamOperator
> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy); -
- // 4.通过 processFunction实例 查看生成的水位线
- SingleOutputStreamOperator
process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction()); - process.print();
-
- // 3.触发程序执行
- env.execute();
- }
- }
查看运行结果:
- // 1.获取执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 2.创建 Source 对象
- Source source = DataGeneratorSource、KafkaSource...
-
- // 3.读取 source时添加水位线
- env
- .fromSource(source, WatermarkStrategy实例, "source name")
- .print()
- ;
-
- // 4.触发程序执行
- env.execute();
窗口什么时候创建?
当窗口内的第一条数据到达时
窗口什么时候触发计算?
当阈值水位线到达窗口时
下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值
测试代码:
- // TODO 测试水位线的传递
- public class TransmitWaterMark {
- public static void main(String[] args) throws Exception {
- // 1.获取执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
- DataStreamSource
source = env.socketTextStream("localhost", 9999); -
- source
- .partitionCustom(
- new Partitioner
() { - @Override
- public int partition(String key, int numPartitions) {
- if (key.equals("a")) {
- return 0;
- } else if (key.equals("b")) {
- return 1;
- } else {
- return 2;
- }
- }
- }, value -> value.split(",")[0]
- )
- .map(new MapFunction
>() { - @Override
- public Tuple2 map(String value) throws Exception {
- return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
- }
- })
- .assignTimestampsAndWatermarks(
- WatermarkStrategy
- //.
>forMonotonousTimestamps() - .
>forGenerator(new PeriodWatermarkStrategy()) - .withTimestampAssigner((element,recordTimestamp) -> element.f1)
- .withIdleness(Duration.ofSeconds(5)) //空闲等待5s
- )
- .process(new ShowProcessFunction()).setParallelism(1)
- .print();
-
- env.execute();
- }
- }
现象:如果上游某一个子任务一直没有数据更新,下游算子的水位线一直不会变化
现象:如果上游某一个子任务`在指定时间内`数据更新,下游算子的水位线将不受该子任务最小值的影响