目录
2.8 KeyedBroadcastProcessFunction
3.1 定时器(Timer)和定时服务(TimerService)
它是底层提炼的一个可以自定义处理逻辑的操作,被叫作“处理函数”(process function)。
在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。
内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。
- public abstract class ProcessFunction extends AbstractRichFunction {
-
- ...
- public abstract void processElement(I value, Context ctx, Collector
out) throws Exception; -
- public void onTimer(long timestamp, OnTimerContext ctx, Collector
out) throws Exception {} - ...
-
- }
用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器out来定义的。
通过几个参数的分析不难发现,ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能;而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。
定时方法.onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。
注意:在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作。
最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。
开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。
合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。
广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。
按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。
只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。
定时服务与当前运行的环境有关。ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口,包含以下六个方法:
- // 获取当前的处理时间
- long currentProcessingTime();
-
- // 获取当前的水位线(事件时间)
- long currentWatermark();
-
- // 注册处理时间定时器,当处理时间超过time时触发
- void registerProcessingTimeTimer(long time);
-
- // 注册事件时间定时器,当水位线超过time时触发
- void registerEventTimeTimer(long time);
-
- // 删除触发时间为time的处理时间定时器
- void deleteProcessingTimeTimer(long time);
-
- // 删除触发时间为time的处理时间定时器
- void deleteEventTimeTimer(long time);
TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。
- public class KeyedProcessTimerDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
-
- SingleOutputStreamOperator
sensorDS = env - .socketTextStream("hadoop102", 7777)
- .map(new WaterSensorMapFunction())
- .assignTimestampsAndWatermarks(
- WatermarkStrategy
- .
forBoundedOutOfOrderness(Duration.ofSeconds(3)) - .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
- );
-
-
- KeyedStream
sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); -
- // TODO Process:keyed
- SingleOutputStreamOperator
process = sensorKS.process( - new KeyedProcessFunction
() { - /**
- * 来一条数据调用一次
- * @param value
- * @param ctx
- * @param out
- * @throws Exception
- */
- @Override
- public void processElement(WaterSensor value, Context ctx, Collector
out) throws Exception { - //获取当前数据的key
- String currentKey = ctx.getCurrentKey();
-
- // TODO 1.定时器注册
- TimerService timerService = ctx.timerService();
-
- // 1、事件时间的案例
- Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间
- timerService.registerEventTimeTimer(5000L);
- System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");
-
- // 2、处理时间的案例
- // long currentTs = timerService.currentProcessingTime();
- // timerService.registerProcessingTimeTimer(currentTs + 5000L);
- // System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");
-
-
- // 3、获取 process的 当前watermark
- // long currentWatermark = timerService.currentWatermark();
- // System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);
-
-
-
- // 注册定时器: 处理时间、事件时间
- // timerService.registerProcessingTimeTimer();
- // timerService.registerEventTimeTimer();
- // 删除定时器: 处理时间、事件时间
- // timerService.deleteEventTimeTimer();
- // timerService.deleteProcessingTimeTimer();
-
- // 获取当前时间进展: 处理时间-当前系统时间, 事件时间-当前watermark
- // long currentTs = timerService.currentProcessingTime();
- // long wm = timerService.currentWatermark();
- }
-
-
- /**
- * TODO 2.时间进展到定时器注册的时间,调用该方法
- * @param timestamp 当前时间进展,就是定时器被触发时的时间
- * @param ctx 上下文
- * @param out 采集器
- * @throws Exception
- */
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector
out) throws Exception { - super.onTimer(timestamp, ctx, out);
- String currentKey = ctx.getCurrentKey();
-
- System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");
- }
- }
- );
-
- process.print();
-
- env.execute();
- }
- }
除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。
- stream.keyBy( t -> t.f0 )
- .window( TumblingEventTimeWindows.of(Time.seconds(10)) )
- .process(new MyProcessWindowFunction())
- public abstract class ProcessWindowFunction
extends Window> extends AbstractRichFunction { - ...
-
- public abstract void process(
- KEY key, Context context, Iterable
elements, Collector out) throws Exception; -
- public void clear(Context context) throws Exception {}
-
- public abstract class Context implements java.io.Serializable {...}
- }
ProcessWindowFunction依然是一个继承了AbstractRichFunction的抽象类,它有四个类型参数:
ProcessWindowFunction里面处理数据的核心方法.process()。方法包含四个参数。
可以明显看出,这里的参数不再是一个输入数据,而是窗口中所有数据的集合。而上下文context所包含的内容也跟其他处理函数有所差别:
- public abstract class Context implements java.io.Serializable {
-
- public abstract W window();
-
- public abstract long currentProcessingTime();
- public abstract long currentWatermark();
-
- public abstract KeyedStateStore windowState();
- public abstract KeyedStateStore globalState();
- public abstract
void output(OutputTag outputTag, X value) ; -
- }
除了可以通过.output()方法定义侧输出流不变外,其他部分都有所变化。这里不再持有TimerService对象,只能通过currentProcessingTime()和currentWatermark()来获取当前时间,所以失去了设置定时器的功能;另外由于当前不是只处理一个数据,所以也不再提供.timestamp()方法。与此同时,也增加了一些获取其他信息的方法:比如可以通过.window()直接获取到当前的窗口对象,也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。注意这里的“窗口状态”是自定义的,不包括窗口本身已经有的状态,针对当前key、当前窗口有效;而“全局状态”同样是自定义的状态,针对当前key的所有窗口有效。
所以我们会发现,ProcessWindowFunction中除了.process()方法外,并没有.onTimer()方法,而是多出了一个.clear()方法。从名字就可以看出,这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出。
至于另一种窗口处理函数ProcessAllWindowFunction,它的用法非常类似。区别在于它基于的是AllWindowedStream,相当于对没有keyBy的数据流直接开窗并调用.process()方法:
- stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
- .process(new MyProcessAllWindowFunction())