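A window splits an unbounded data stream into a sequence of bounded segments according to some rule, and computation is then performed on each segment.
The usual reason for cutting the stream into bounded segments is aggregation.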

Key property of keyed windows: every window is bound to the key it belongs to, so data with different keys never ends up in the same window.
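The following plain-Java illustration (not Flink code) makes the per-key binding concrete. It buckets events by key and by the start of a tumbling window. The `Event` class, the city-like keys and the 2-second window size are hypothetical. The only point is that the same time range produces a separate bucket for each key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration, not the Flink API: events are bucketed by (key, window start),
// so two keys never share a window even when their timestamps are identical.
class KeyedWindowSketch {

    // Hypothetical event type used only for this illustration.
    static final class Event {
        final String key;
        final long timestamp;
        final int value;

        Event(String key, long timestamp, int value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Start of the tumbling window of length `size` (ms) that contains `ts`.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Sums values per (key, window); each bucket is named "key@windowStart".
    static Map<String, Integer> sumPerKeyAndWindow(List<Event> events, long size) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Event e : events) {
            String bucket = e.key + "@" + windowStart(e.timestamp, size);
            sums.merge(bucket, e.value, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>(List.of(
                new Event("beijing", 1_000L, 1),
                new Event("shanghai", 1_000L, 5),   // same time range, different key: separate bucket
                new Event("beijing", 1_500L, 2),
                new Event("beijing", 2_500L, 4)));  // same key, next 2 s window
        System.out.println(sumPerKeyAndWindow(events, 2_000L));
    }
}
```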
Tumbling windows
Fixed length and no overlap: each element belongs to exactly one window.
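Here is a minimal sketch of how a timestamp is mapped to its tumbling window. It is written in plain Java and only follows the idea behind Flink's `TimeWindow.getWindowStartWithOffset`. The class name and the use of `Math.floorMod` for the offset arithmetic are assumptions made for this illustration.

```java
// Plain-Java sketch of tumbling-window assignment; an illustration, not Flink code.
class TumblingWindowSketch {

    // Start of the window of length `size` that contains `ts`, with windows shifted by `offset`.
    static long windowStart(long ts, long offset, long size) {
        return ts - Math.floorMod(ts - offset, size);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, in milliseconds
        for (long ts : new long[] {0L, 4_999L, 5_000L, 7_500L}) {
            long start = windowStart(ts, 0L, size);
            // Each timestamp lands in exactly one half-open window [start, start + size).
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```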

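Sliding windows
A fixed window length plus a fixed slide (step). When the slide is smaller than the length, windows overlap and one element belongs to several windows.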
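With sliding windows, one timestamp maps to several windows. The sketch below is a plain-Java illustration, not Flink's implementation. It starts at the latest window start aligned to the slide and walks backwards while the window still covers the timestamp. With a size of 10 s and a slide of 2 s, every element lands in 10 / 2 = 5 windows. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding-window assignment; an illustration, not Flink code.
class SlidingWindowSketch {

    // Start of the `size`-aligned slot that contains `ts`, shifted by `offset`.
    static long alignedStart(long ts, long offset, long size) {
        return ts - Math.floorMod(ts - offset, size);
    }

    // Returns the start of every window [start, start + size) that contains ts.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = alignedStart(ts, 0L, slide); // latest window start at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size 10 s, slide 2 s, matching the parameters of SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))
        System.out.println(windowStarts(12_500L, 10_000L, 2_000L));
    }
}
```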

Session windows

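A session window has neither a fixed length nor a fixed slide. The stream is split according to whether the time interval between two consecutive events exceeds a threshold (the session gap).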
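The following simplified session-window sketch assumes that the timestamps of one key are already sorted. A new session begins whenever the distance to the previous event exceeds the gap. Flink reaches the same effect differently: each element starts as its own window [ts, ts + gap), and overlapping windows are merged. Treat this as an illustration of the rule, not of Flink's code. Boundary behaviour when the distance exactly equals the gap may differ from Flink's.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of session splitting by gap; an illustration, not Flink code.
class SessionWindowSketch {

    // Splits ascending timestamps into sessions: a new session starts whenever the
    // distance to the previous event is larger than `gap`.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long previous = null;
        for (long ts : sortedTimestamps) {
            if (previous != null && ts - previous > gap) {
                result.add(current);          // close the running session
                current = new ArrayList<>();
            }
            current.add(ts);
            previous = ts;
        }
        if (!current.isEmpty()) {
            result.add(current);              // the last session is still open at end of input
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> ts = List.of(1_000L, 2_000L, 9_000L, 9_500L, 20_000L);
        System.out.println(sessions(ts, 3_000L));
    }
}
```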
Keyed windows
stream
       .keyBy(...)                <-  keyed versus non-keyed windows
       .window(...)               <-  required: "assigner"
      [.trigger(...)]             <-  optional: "trigger" (else default trigger)
      [.evictor(...)]             <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]     <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)]  <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()  <-  required: "function"
      [.getSideOutput(...)]       <-  optional: "output tag"
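The required window function can be incremental. The sketch below uses a hypothetical `WindowAggregate` interface so that it runs without Flink on the classpath. That interface only mirrors the four methods of Flink's `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`). An average over a window then needs to keep just a running sum and count, not the raw elements.

```java
import java.util.List;

// Hypothetical stand-in that mirrors the method names of Flink's AggregateFunction<IN, ACC, OUT>;
// it is NOT the Flink interface itself.
interface WindowAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Incremental average: the accumulator is {sum, count}, so raw elements are never buffered.
class AverageAggregate implements WindowAggregate<Integer, long[], Double> {
    public long[] createAccumulator() { return new long[] {0L, 0L}; }

    public long[] add(Integer value, long[] acc) {
        acc[0] += value; // running sum
        acc[1]++;        // running count
        return acc;
    }

    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    // Combines two partial accumulators (e.g. when windows are merged).
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // Simulates what happens for one window: add each element, then read the result once.
    static double aggregateWindow(List<Integer> windowContents) {
        AverageAggregate agg = new AverageAggregate();
        long[] acc = agg.createAccumulator();
        for (int v : windowContents) {
            acc = agg.add(v, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(aggregateWindow(List.of(18, 20, 25)));
    }
}
```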
Non-keyed windows
stream
       .windowAll(...)            <-  required: "assigner"
      [.trigger(...)]             <-  optional: "trigger" (else default trigger)
      [.evictor(...)]             <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]     <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)]  <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()  <-  required: "function"
      [.getSideOutput(...)]       <-  optional: "output tag"
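`allowedLateness(...)` decides how long an event-time window keeps accepting elements after the watermark has passed its end. The plain-Java check below is modelled on Flink's rule that a window counts as late once `maxTimestamp() + allowedLateness` is at or behind the current watermark, where `maxTimestamp() = end - 1`. The class and method names are hypothetical. An element that fits no open window is dropped, or routed to the tag passed to `sideOutputLateData(...)` if one was set.

```java
// Plain-Java sketch of the event-time lateness rule; names are hypothetical, not Flink API.
class LatenessSketch {

    // A window [start, end) has maxTimestamp = end - 1 because end is exclusive.
    // Returns true when the window no longer accepts elements.
    static boolean isWindowLate(long windowEnd, long allowedLateness, long currentWatermark) {
        long cleanupTime = (windowEnd - 1) + allowedLateness;
        return cleanupTime <= currentWatermark;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // window [5000, 10000)
        System.out.println(isWindowLate(windowEnd, 0L, 9_999L));      // true: no lateness allowed
        System.out.println(isWindowLate(windowEnd, 2_000L, 9_999L));  // false: still within allowed lateness
        System.out.println(isWindowLate(windowEnd, 2_000L, 11_999L)); // true: allowed lateness used up
    }
}
```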
Code template example
package com.blok2;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @Date: 22.12.4
 * @Author: Hang.Nian.YY
 * @qq: 598196583
 * @Description:
 * Code template for the window-assigner API.
 * User is a POJO defined elsewhere in package com.blok2 (not shown here).
 */
public class _04_Windows {
    public static void main(String[] args) throws Exception {

        // Local environment with the web UI on port 9999 (requires flink-runtime-web on the classpath)
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 9999);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);
        // BATCH mode: the source below is a bounded text file
        see.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Load the data source
        DataStreamSource<String> ds = see.readTextFile("data/dayao.txt");

        // Parse each comma-separated line into a User
        SingleOutputStreamOperator<User> beans = ds.map(line -> {
            String[] arr = line.split(",");
            return new User(Integer.parseInt(arr[0]), arr[1], arr[2], Integer.parseInt(arr[3]));
        }).returns(User.class);

        // ============= Before keyBy: non-keyed windows (windowAll); the window operator runs with parallelism 1
        // Event-time windows need timestamps/watermarks (e.g. via assignTimestampsAndWatermarks),
        // which this template does not assign.
        // 1 Tumbling windows: event time, then processing time
        beans.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)));
        beans.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)));
        // 2 Sliding windows: processing time, then event time
        // Argument 1: window size, argument 2: slide (step)
        beans.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(1)));
        beans.windowAll(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)));
        // Tumbling count window (100 elements)
        beans.countWindowAll(100);
        // Sliding count window (size 100, slide 20)
        beans.countWindowAll(100, 20);

        // Key the stream by city
        KeyedStream<User, String> keyed = beans.keyBy(new KeySelector<User, String>() {
            @Override
            public String getKey(User user) throws Exception {
                return user.getCity();
            }
        });

        // ============= After keyBy: keyed windows (window / countWindow)
        keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(2)));                  // processing time, tumbling
        keyed.window(TumblingEventTimeWindows.of(Time.seconds(2)));                       // event time, tumbling
        keyed.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)));      // event time, sliding (size, slide)
        keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))); // processing time, sliding (size, slide)

        keyed.countWindow(10);    // tumbling count window
        keyed.countWindow(10, 2); // sliding count window: size 10, slide 2

        // After a window is assigned, call a window function (reduce / aggregate / apply / process)
        // to compute over each window's data; the calls above only return windowed streams
        // and perform no window computation by themselves.

        // beans.print();
        see.execute();
    }
}
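The count windows in the template (`countWindow(10)`, `countWindow(10, 2)`) can be simulated for a single key in plain Java. Flink builds a sliding count window from `GlobalWindows`, a `CountTrigger` that fires every `slide` elements and a `CountEvictor` that keeps the last `size` elements. A tumbling `countWindow(size)` uses a purging `CountTrigger` instead. The sketch reproduces that firing behaviour with a deque and sums the window contents on each firing. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java sketch of count-window semantics for one key; an illustration, not Flink code.
class CountWindowSketch {

    // Fires every `slide` elements and evaluates at most the last `size` elements.
    // Returns the sum produced at each firing; trailing elements that never trigger are not emitted.
    static List<Integer> slidingCountSums(List<Integer> input, int size, int slide) {
        List<Integer> fired = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (int v : input) {
            buffer.addLast(v);
            sinceLastFire++;
            if (sinceLastFire == slide) {
                // Evictor step: keep only the most recent `size` elements.
                while (buffer.size() > size) {
                    buffer.removeFirst();
                }
                int sum = 0;
                for (int x : buffer) {
                    sum += x;
                }
                fired.add(sum);
                sinceLastFire = 0; // trigger counter restarts after each firing
            }
        }
        return fired;
    }

    // Tumbling count window: slide == size gives the same sums, each element counted exactly once.
    static List<Integer> tumblingCountSums(List<Integer> input, int size) {
        return slidingCountSums(input, size, size);
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(slidingCountSums(input, 4, 2));
        System.out.println(tumblingCountSums(input, 2));
    }
}
```

A design note: the evictor only trims the buffer at firing time, which is why the window of the first firing in the sliding case can hold fewer than `size` elements.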