目录
(1)按键分区(Keyed)和非按键分区(Non-Keyed)
*2)全窗口函数(full window functions)
2.按键分区处理函数(KeyedProcessFunction)
(1)定时器(Timer)和定时服务(TimerService)
(1)方法一:ProcessAllWindowFunction
在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。
Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。
Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。
到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。
*1)时间窗口
一定时间作为一个窗口
*2)计数窗口
达到多少数量作为一个窗口
*1)滚动窗口
以一个固定时间为窗口,第一个窗口结束的时间就是下一个窗口开始的时间。
*2)滑动窗口
窗口大小 + 步长。
如果步长 = 窗口大小,其实就是滚动窗口的情况。
步长 > 窗口大小,会有数据被漏掉。
步长 < 窗口大小,窗口会有重叠
*3)会话窗口
基于会话对数据分组
*4)全局窗口
全局有效,没有结束时间
定义窗口前,需要确认数据流是基于keyBy还是没有keyBy的。
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。
stream.keyBy(...).window(...)
窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。
对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。
stream.windowAll(...)
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)
.window()方法需要传入一个窗口分配器,它指明了窗口的类型。
.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。
窗口分配器指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。
- package com.atguigu.window;
-
- import com.atguigu.bean.WaterSensor;
- import com.atguigu.functions.WaterSensorMapFunction;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.datastream.WindowedStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class WindowApiDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
-
- SingleOutputStreamOperator<WaterSensor> sensorDS = env
- .socketTextStream("hadoop102", 7777)
- .map(new WaterSensorMapFunction());
-
-
- KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
-
-
-
-
- // TODO 1. 指定 窗口分配器: 指定 用 哪一种窗口 --- 时间 or 计数? 滚动、滑动、会话?
- // 1.1 没有keyby的窗口: 窗口内的 所有数据 进入同一个 子任务,并行度只能为1
- // sensorDS.windowAll()
- // 1.2 有keyby的窗口: 每个key上都定义了一组窗口,各自独立地进行统计计算
-
- // 基于时间的
- // sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口,窗口长度10s
- // sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))) // 滑动窗口,窗口长度10s,滑动步长2s
- // sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // 会话窗口,超时间隔5s
- // sensorKS.window(GlobalWindows.create()) // 全局窗口,计数窗口的底层就是用的这个,需要自定义的时候才会用
-
- // 基于计数的
- // sensorKS.countWindow(5) // 滚动窗口,窗口长度=5个元素
- // sensorKS.countWindow(5,2) // 滑动窗口,窗口长度=5个元素,滑动步长=2个元素
-
- // TODO 2. 指定 窗口函数 : 窗口内数据的 计算逻辑
- WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
-
- // 增量聚合: 来一条数据,计算一条数据,窗口触发的时候输出计算结果
- // sensorWS
- // .reduce()
- // .aggregate(, )
-
- // 全窗口函数:数据来了不计算,存起来,窗口触发的时候,计算并输出结果
- // sensorWS.process()
-
- env.execute();
- }
- }
- package com.atguigu.window;
-
- import com.atguigu.bean.WaterSensor;
- import com.atguigu.functions.WaterSensorMapFunction;
- import org.apache.flink.api.common.functions.ReduceFunction;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.Single