• 7、如何使用Flink中的窗口(Window算子)


    目录

    1、如何理解 Flink中的窗口(window)

    2、Flink中窗口的类型

    2.1 根据上游DataStream类型分类

    2.2 根据驱动类型分类

    2.3 根据进入到窗口数据的分发规则分类

    3、怎样使用 Flink中的 Window算子

    4、怎样使用 Flink中的 Window Assigners

    4.1、基于处理时间的滑动窗口

    4.2、基于处理时间的滚动窗口

    4.3、基于处理时间的会话窗口

    4.4、基于事件时间的滑动窗口

    4.5、基于事件时间的滚动窗口

    4.6、基于事件时间的会话窗口

    4.7、计数窗口

    5、怎样使用 Flink中的 Window Funcation

    5.1、ReduceFunction

    5.2、AggregateFunction

    5.3、ProcessWindowFunction

    5.4、增量聚合的 ProcessWindowFunction

    6、Flink中的 Window的生命周期

    6.1、触发器 - Triggers

    6.2、移除器 - Evictors

    7、对迟到数据的处理

    7.1、生成水位线时,设置最大乱序时间

    7.2、设置窗口延迟关闭 - allowedLateness

    7.3、使用侧输出流获取迟到的数据 - sideOutputLateData


    1、如何理解 Flink中的窗口(window)

            Flink中的窗口好像一个桶,可以根据不同的时间语义,将无界流中的数据分配到指定的桶中去,再通过 水位线或者处理时间 触发对桶中的数据进行计算

            其目的就是为了将无限的数据 根据指定的规则进行切分成有限的数据进行计算


    2、Flink中窗口的类型

    2.1 根据上游DataStream类型分类

    按键分区窗口(Keyed Windows) :

             基于KeyedStream做窗口操作,窗口计算会在多个并行子任务上同时执行,相同的key的数据会进入到同一个窗口中去。

    非按键分区-Non-Keyed Windows :

            基于DataStream做窗口操作,流上的数据会进入同一窗口中,只能有一个Task处理(不推荐这种方式)

    2.2 根据驱动类型分类

    时间窗口(Time Window):

            通过指定的时间语义的时间点来定义窗口的开始和结束,流中的数据被分配到哪个窗口中,也由数据上的时间标识来决定,当接收到带有结束时间标识的数据时,将触发窗口计算,并销毁窗口

    计数窗口(Count Window):

           窗口的大小由数据个数来限制,当窗口内接收的数据个数到达窗口大小时,将触发窗口计算,并销毁窗口

     2.3 根据进入到窗口数据的分发规则分类

    滚动窗口(Tumbling Windows):

            滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)

    滑动窗口(Sliding Windows):

            滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

    比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)

    会话窗口(Session Windows): 

             与滚动窗口滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

    全局窗口(Global Windows): 

    全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。


    3、怎样使用 Flink中的 Window算子

    Keyed Windows API :

    1. stream
    2. .keyBy(...) <- 仅 keyed 窗口需要
    3. .window(...) <- 必填项:"assigner" 指定窗口类型
    4. [.trigger(...)] <- 可选项:"trigger" 指定触发器
    5. [.evictor(...)] <- 可选项:"evictor" 指定移除器
    6. [.allowedLateness(...)] <- 可选项:"lateness" 指定窗口延迟关闭时间
    7. [.sideOutputLateData(...)] <- 可选项:"output tag"
    8. .reduce/aggregate/apply() <- 必填项:"function" 指定窗口聚合函数
    9. [.getSideOutput(...)] <- 可选项:"output tag" 指定侧输出流(用来接收迟到数据)

    Non-Keyed Windows API :

    1. stream
    2. .windowAll(...) <- 必填项:"assigner" 指定窗口类型
    3. [.trigger(...)] <- 可选项:"trigger" 指定触发器
    4. [.evictor(...)] <- 可选项:"evictor" 指定移除器
    5. [.allowedLateness(...)] <- 可选项:"lateness" 指定窗口延迟关闭时间
    6. [.sideOutputLateData(...)] <- 可选项:"output tag" 指定侧输出流(用来接收迟到数据)
    7. .reduce/aggregate/apply() <- 必填项:"function" 指定窗口聚合函数
    8. [.getSideOutput(...)] <- 可选项:"output tag"

    4、怎样使用 Flink中的 Window Assigners

    功能说明:

            通过stream.window(WindowAssigner) 来指定 窗口的类型

    4.1、基于处理时间的滑动窗口

    使用场景:

            每隔x秒统计近y秒内的数据(y>x)

    代码示例:

    1. /*
    2. * TODO 基于处理时间的滑动窗口
    3. * 每2秒计算最近10秒内的数据
    4. * */
    5. public class SlidingProcessingTimeWindow {
    6. public static void main(String[] args) throws Exception {
    7. // 1.获取执行环境
    8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. env.setParallelism(3);
    10. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    11. // TODO Window:KeyedStream → WindowedStream
    12. Window(env);
    13. // TODO WindowAll:DataStream → AllWindowedStream
    14. //WindowAll(env);
    15. // 3.触发程序执行
    16. env.execute();
    17. }
    18. // TODO Keyed Windows
    19. private static void Window(StreamExecutionEnvironment env) {
    20. env
    21. .socketTextStream("localhost", 9999)
    22. .map(new MapFunction>() {
    23. @Override
    24. public Tuple2 map(String value) throws Exception {
    25. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    26. }
    27. }
    28. )
    29. .keyBy(value -> value.f0)
    30. // 滑动窗口,窗口长度10s,滑动步长2s
    31. .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)))
    32. .process(new ShowProcessWindowFunction())
    33. .print()
    34. ;
    35. }
    36. // TODO Non-Keyed Windows
    37. private static void WindowAll(StreamExecutionEnvironment env) {
    38. env
    39. .socketTextStream("localhost", 9999)
    40. .map(new MapFunction>() {
    41. @Override
    42. public Tuple2 map(String value) throws Exception {
    43. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    44. }
    45. }
    46. )
    47. // 滑动窗口,窗口长度5s,滑动步长2s
    48. .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2)))
    49. .process(new ShowProcessAllWindowFunction())
    50. .print()
    51. ;
    52. }
    53. }

    4.2、基于处理时间的滚动窗口

    使用场景:

            计算固定时间段内的数据

    代码示例:

    1. /*
    2. * TODO 基于处理时间的滚动窗口
    3. * 计算每小时内的用户访问数
    4. * */
    5. public class TumblingProcessingTimeWindow {
    6. public static void main(String[] args) throws Exception {
    7. // 1.获取执行环境
    8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. env.setParallelism(3);
    10. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    11. // TODO Window:KeyedStream → WindowedStream
    12. //Window(env);
    13. // TODO WindowAll:DataStream → AllWindowedStream
    14. WindowAll(env);
    15. // 3.触发程序执行
    16. env.execute();
    17. }
    18. // TODO Keyed Windows
    19. private static void Window(StreamExecutionEnvironment env) {
    20. env
    21. .socketTextStream("localhost", 9999)
    22. .map(new MapFunction>() {
    23. @Override
    24. public Tuple2 map(String value) throws Exception {
    25. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    26. }
    27. }
    28. )
    29. .keyBy(value -> value.f0)
    30. // 滚动窗口,窗口长度5s
    31. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    32. .process(new ShowProcessWindowFunction())
    33. .print()
    34. ;
    35. }
    36. // TODO Non-Keyed Windows
    37. private static void WindowAll(StreamExecutionEnvironment env) {
    38. env
    39. .socketTextStream("localhost", 9999)
    40. .map(new MapFunction>() {
    41. @Override
    42. public Tuple2 map(String value) throws Exception {
    43. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    44. }
    45. }
    46. )
    47. // 滚动窗口,窗口长度5s
    48. .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    49. .process(new ShowProcessAllWindowFunction())
    50. .print()
    51. ;
    52. }
    53. }

    4.3、基于处理时间的会话窗口

    使用场景:

            计算指定时间间隔内的数据

    代码示例:

    1. /*
    2. * TODO 基于处理时间的会话窗口
    3. * 相邻两个元素的处理时间间隔 大于指定会话周期 触发窗口计算
    4. * */
    5. public class ProcessingTimeSessionWindow {
    6. public static void main(String[] args) throws Exception {
    7. // 1.获取执行环境
    8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. env.setParallelism(3);
    10. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    11. // TODO Window:KeyedStream → WindowedStream
    12. //Window(env);
    13. // TODO WindowAll:DataStream → AllWindowedStream
    14. WindowAll(env);
    15. // 3.触发程序执行
    16. env.execute();
    17. }
    18. // TODO Keyed Windows
    19. private static void Window(StreamExecutionEnvironment env) {
    20. env
    21. .socketTextStream("localhost", 9999)
    22. .map(new MapFunction>() {
    23. @Override
    24. public Tuple2 map(String value) throws Exception {
    25. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    26. }
    27. }
    28. )
    29. .keyBy(value -> value.f0)
    30. // 会话窗口,超时间隔5s
    31. .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    32. .process(new ShowProcessWindowFunction())
    33. .print()
    34. ;
    35. }
    36. // TODO Non-Keyed Windows
    37. private static void WindowAll(StreamExecutionEnvironment env) {
    38. env
    39. .socketTextStream("localhost", 9999)
    40. .map(new MapFunction>() {
    41. @Override
    42. public Tuple2 map(String value) throws Exception {
    43. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    44. }
    45. }
    46. )
    47. // 会话窗口,超时间隔5s
    48. .windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    49. .process(new ShowProcessAllWindowFunction())
    50. .print()
    51. ;
    52. }
    53. }

    4.4、基于事件时间的滑动窗口

    代码示例:

    1. /*
    2. * TODO 基于事件时间的滑动窗口
    3. * 每2秒计算一次最近10秒内的数据
    4. * */
    5. public class SlidingEventTimeWindow {
    6. public static void main(String[] args) throws Exception {
    7. // 1.获取执行环境
    8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. env.setParallelism(3);
    10. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    11. // TODO Window:KeyedStream → WindowedStream
    12. Window(env);
    13. // TODO WindowAll:DataStream → AllWindowedStream
    14. //WindowAll(env);
    15. // 3.触发程序执行
    16. env.execute();
    17. }
    18. // TODO Keyed Windows
    19. private static void Window(StreamExecutionEnvironment env) {
    20. env
    21. .socketTextStream("localhost", 9999)
    22. .map(new MapFunction>() {
    23. @Override
    24. public Tuple2 map(String value) throws Exception {
    25. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    26. }
    27. }
    28. )
    29. .assignTimestampsAndWatermarks(
    30. WatermarkStrategy
    31. .>forMonotonousTimestamps()
    32. .withTimestampAssigner((element, recordTimestamp) -> element.f1)
    33. )
    34. .keyBy(value -> value.f0)
    35. // 滚动窗口,窗口长度10s,滑动步长2s
    36. .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    37. .process(new ShowProcessWindowFunction())
    38. .print()
    39. ;
    40. }
    41. // TODO Non-Keyed Windows
    42. private static void WindowAll(StreamExecutionEnvironment env) {
    43. env
    44. .socketTextStream("localhost", 9999)
    45. .map(new MapFunction>() {
    46. @Override
    47. public Tuple2 map(String value) throws Exception {
    48. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    49. }
    50. }
    51. )
    52. .assignTimestampsAndWatermarks(
    53. WatermarkStrategy
    54. //.>forMonotonousTimestamps()
    55. .>forGenerator(new PeriodWatermarkStrategy())
    56. .withTimestampAssigner(
    57. (Tuple2 element, long recordTimestamp) -> {
    58. //System.out.println("Step1:extractTimestamp-从事件数据中提取时间戳");
    59. return element.f1;
    60. }
    61. )
    62. //.withIdleness(Duration.ofSeconds(5)) //空闲等待5s
    63. )
    64. .keyBy(value -> value.f0)
    65. // 滑动窗口,窗口长度10s,滑动步长2s
    66. .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2)))
    67. .process(new ShowProcessAllWindowFunction())
    68. .print()
    69. ;
    70. }
    71. }

    4.5、基于事件时间的滚动窗口

    代码示例:

    1. /*
    2. * TODO 基于事件时间的滚动窗口
    3. * 计算每个窗口周期内的数据(计算每小时内的用户访问数)
    4. * */
    5. public class TumblingEventTimeWindow {
    6. public static void main(String[] args) throws Exception {
    7. // 1.获取执行环境
    8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. env.setParallelism(3);
    10. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    11. // TODO Window:KeyedStream → WindowedStream
    12. //Window(env);
    13. // TODO WindowAll:DataStream → AllWindowedStream
    14. WindowAll(env);
    15. // 3.触发程序执行
    16. env.execute();
    17. }
    18. // TODO Keyed Windows
    19. private static void Window(StreamExecutionEnvironment env) {
    20. env
    21. .socketTextStream("localhost", 9999)
    22. .map(new MapFunction>() {
    23. @Override
    24. public Tuple2 map(String value) throws Exception {
    25. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    26. }
    27. }
    28. )
    29. .assignTimestampsAndWatermarks(
    30. WatermarkStrategy
    31. .>forMonotonousTimestamps()
    32. .withTimestampAssigner((element, recordTimestamp) -> element.f1)
    33. )
    34. .keyBy(value -> value.f0)
    35. // 滚动窗口,窗口长度5s
    36. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    37. .process(new ShowProcessWindowFunction())
    38. .print()
    39. ;
    40. }
    41. // TODO Non-Keyed Windows
    42. private static void WindowAll(StreamExecutionEnvironment env) {
    43. env
    44. .socketTextStream("localhost", 9999)
    45. .map(new MapFunction>() {
    46. @Override
    47. public Tuple2 map(String value) throws Exception {
    48. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    49. }
    50. }
    51. )
    52. .assignTimestampsAndWatermarks(
    53. WatermarkStrategy
    54. .>forMonotonousTimestamps()
    55. .withTimestampAssigner((element, recordTimestamp) -> element.f1)
    56. )
    57. .keyBy(value -> value.f0)
    58. // 滚动窗口,窗口长度5s
    59. .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    60. .process(new ShowProcessAllWindowFunction())
    61. .print()
    62. ;
    63. }
    64. }

    4.6、基于事件时间的会话窗口

    代码示例:

    1. /*
    2. * TODO 基于会话时间的会话窗口
    3. * 相邻两个元素的处理时间间隔 大于指定会话周期 触发窗口计算
    4. * */
    5. public class EventTimeSessionWindow {
    6. public static void main(String[] args) throws Exception {
    7. // 1.获取执行环境
    8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. env.setParallelism(1);
    10. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    11. // TODO Window:KeyedStream → WindowedStream
    12. // Window(env);
    13. // TODO WindowAll:DataStream → AllWindowedStream
    14. WindowAll(env);
    15. // 3.触发程序执行
    16. env.execute();
    17. }
    18. // TODO Keyed Windows
    19. private static void Window(StreamExecutionEnvironment env) {
    20. env
    21. .socketTextStream("localhost", 9999)
    22. .map(new MapFunction>() {
    23. @Override
    24. public Tuple2 map(String value) throws Exception {
    25. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    26. }
    27. }
    28. )
    29. .assignTimestampsAndWatermarks(
    30. WatermarkStrategy
    31. .>forMonotonousTimestamps()
    32. .withTimestampAssigner((element, recordTimestamp) -> element.f1)
    33. )
    34. .keyBy(value -> value.f0)
    35. // 会话窗口,超时间隔5s
    36. .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    37. .process(new ShowProcessWindowFunction())
    38. .print()
    39. ;
    40. }
    41. // TODO Non-Keyed Windows
    42. private static void WindowAll(StreamExecutionEnvironment env) {
    43. env
    44. .socketTextStream("localhost", 9999)
    45. .map(new MapFunction>() {
    46. @Override
    47. public Tuple2 map(String value) throws Exception {
    48. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    49. }
    50. }
    51. )
    52. .assignTimestampsAndWatermarks(
    53. WatermarkStrategy
    54. .>forMonotonousTimestamps()
    55. .withTimestampAssigner((element, recordTimestamp) -> element.f1)
    56. )
    57. // 会话窗口,超时间隔5s
    58. .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
    59. .process(new ShowProcessAllWindowFunction())
    60. .print()
    61. ;
    62. }
    63. }

    4.7、计数窗口

    代码示例:

    1. // TODO 计数窗口
    2. public class countWindow {
    3. public static void main(String[] args) throws Exception {
    4. // 1.获取执行环境
    5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    6. env.setParallelism(3);
    7. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    8. // TODO Window:KeyedStream → WindowedStream
    9. // Window(env);
    10. // TODO countWindowAll:DataStream → AllWindowedStream
    11. countWindowAll(env);
    12. // 3.触发程序执行
    13. env.execute();
    14. }
    15. // TODO Keyed Windows
    16. private static void Window(StreamExecutionEnvironment env) {
    17. env
    18. .socketTextStream("localhost", 9999)
    19. .map(new MapFunction>() {
    20. @Override
    21. public Tuple2 map(String value) throws Exception {
    22. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    23. }
    24. }
    25. )
    26. .keyBy(value -> value.f0)
    27. // TODO 滚动窗口,窗口长度=5个元素
    28. //.countWindow(5)
    29. // TODO 滑动窗口,窗口长度=5个元素,滑动步长=2个元素 (每当进入两个数据,就统计下窗口内最五个数据)
    30. .countWindow(5,2)
    31. .process(
    32. new ProcessWindowFunction, String, String, GlobalWindow>() {
    33. @Override
    34. public void process(String s, ProcessWindowFunction, String, String, GlobalWindow>.Context context, Iterable> elements, Collector out) throws Exception {
    35. // 当前水位线
    36. long watermark = context.currentWatermark();
    37. // 当前处理时间
    38. long processingTime = context.currentProcessingTime();
    39. // 窗口开始时间
    40. // 窗口结束时间
    41. // 计算窗口内数据数量
    42. long count = elements.spliterator().estimateSize();
    43. String record = "key=" + s
    44. + " 包含" + count + "条数据===>" + elements.toString()
    45. + " 当前Watermark:" + watermark
    46. + " 当前processingTime:" + processingTime;
    47. out.collect(record);
    48. }
    49. }
    50. )
    51. .print()
    52. ;
    53. }
    54. // TODO Non-Keyed Windows
    55. private static void countWindowAll(StreamExecutionEnvironment env) {
    56. env
    57. .socketTextStream("localhost", 9999)
    58. .map(new MapFunction>() {
    59. @Override
    60. public Tuple2 map(String value) throws Exception {
    61. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    62. }
    63. }
    64. )
    65. // TODO 滚动窗口,窗口长度=5个元素
    66. .countWindowAll(5)
    67. // TODO 滑动窗口,窗口长度=5个元素,滑动步长=2个元素 (每当进入两个数据,就统计下窗口内最近的五个数据)
    68. //.countWindowAll(5,2)
    69. .process(
    70. new ProcessAllWindowFunction, String, GlobalWindow>() {
    71. @Override
    72. public void process(ProcessAllWindowFunction, String, GlobalWindow>.Context context, Iterable> elements, Collector out) throws Exception {
    73. // 当前水位线
    74. // 当前处理时间
    75. // 窗口开始时间
    76. // 窗口结束时间
    77. // 计算窗口内数据数量
    78. long count = elements.spliterator().estimateSize();
    79. String record = "窗口包含" + count + "条数据===>" + elements.toString();
    80. out.collect(record);
    81. }
    82. }
    83. )
    84. .print()
    85. ;
    86. }
    87. }

    5、怎样使用 Flink中的 Window Funcation

    窗口函数的作用:

            窗口函数定义了当窗口触发后,对窗口内数据的计算逻辑

    窗口函数的分类:

    5.1、ReduceFunction

    函数功能:

            将两条数据合并成一条数据,输入和输出的数据类型必须相同

    代码示例:

    1. /*
    2. * TODO 增量聚合函数:ReduceFunction
    3. * 特点:
    4. * 1.增量聚合:进入窗口一条数据,就会被计算一次(调用reduce方法),但是不会立刻输出(只会更新状态)
    5. * 2.第一条数据进入窗口后,不会调用 reduce 方法
    6. * 3.数据类型不能改变:输入数据类型 = 输出数据类型 = `中间累加器数据类型`
    7. * 4.窗口触发计算时,才会输出窗口的计算结果
    8. * */
    9. public class ReduceFunctions {
    10. public static void main(String[] args) throws Exception {
    11. // 1.获取执行环境
    12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    13. env.setParallelism(3);
    14. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    15. env
    16. .socketTextStream("localhost", 9999)
    17. .map(new MapFunction>() {
    18. @Override
    19. public Tuple2 map(String value) throws Exception {
    20. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    21. }
    22. }
    23. )
    24. .keyBy(value -> value.f0)
    25. // TODO 指定窗口类型:滚动计数窗口,窗口长度=5个元素
    26. .countWindow(5)
    27. // TODO 对窗口内的元素求和
    28. .reduce(
    29. new ReduceFunction>() {
    30. /**
    31. * @param value1 参与聚合的第一个值,也就是累加器的值
    32. * @param value2 参与聚合的第二个值,也就是新进入窗口的时间数据
    33. * @return
    34. * @throws Exception
    35. */
    36. @Override
    37. public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
    38. System.out.println("触发计算:" + value1 + " ," + value2);
    39. long sum = value1.f1 + value2.f1;
    40. return new Tuple2<>(value1.f0, sum);
    41. }
    42. }
    43. )
    44. .print()
    45. ;
    46. // 3.触发程序执行
    47. env.execute();
    48. }
    49. }

    5.2、AggregateFunction

    函数功能:

            将两条数据合并成一条数据,输入和输出的数据类型可以不同

    代码示例:

    1. /*
    2. * TODO 增量聚合函数:AggregateFunction
    3. * 特点:
    4. * 1.增量聚合:进入窗口一条数据,就会被计算一次(调用add方法),但是不会立刻输出(只会更累加器的状态)
    5. * 2.窗口内的第一条数据进入后,会创建窗口,创建累加器并初始化
    6. * 3.窗口触发计算时,才会输出窗口的计算结果(调用getResult方法)
    7. * 4.数据类型可以不同:输入数据类型、输出数据类型、累加器数据类型
    8. * 泛型参数:
    9. * AggregateFunction
    10. * @IN : 输入数据类型
    11. * @ACC : 累加器数据类型
    12. * @OUT : 输出数据类型
    13. *
    14. * */
    15. public class AggregateFunctions {
    16. public static void main(String[] args) throws Exception {
    17. // 1.获取执行环境
    18. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    19. env.setParallelism(3);
    20. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    21. env
    22. .socketTextStream("localhost", 9999)
    23. .map(new MapFunction>() {
    24. @Override
    25. public Tuple2 map(String value) throws Exception {
    26. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    27. }
    28. }
    29. )
    30. .keyBy(value -> value.f0)
    31. // TODO 指定窗口类型:滚动计数窗口,窗口长度=5个元素
    32. .countWindow(5)
    33. // TODO 对窗口内的元素求和
    34. .aggregate(
    35. new AggregateFunction, Integer, String>() {
    36. /**
    37. * 创建累加器,并对累加器做初始化操作
    38. * @return
    39. */
    40. @Override
    41. public Integer createAccumulator() {
    42. System.out.println("创建累加器");
    43. return 0;
    44. }
    45. /**
    46. * 事件数据与累加器的Merge逻辑,用来更新累加器状态
    47. * 进入一条数据,调用一次
    48. * @param value The value to add
    49. * @param accumulator The accumulator to add the value to
    50. * @return
    51. */
    52. @Override
    53. public Integer add(Tuple2 value, Integer accumulator) {
    54. System.out.println("调用add方法,value=" + value + " 当前累加器:" + accumulator);
    55. return accumulator + 1;
    56. }
    57. /**
    58. * 获取累加器的状态值,窗口触发时调用
    59. * @param accumulator The accumulator of the aggregation
    60. * @return
    61. */
    62. @Override
    63. public String getResult(Integer accumulator) {
    64. System.out.println("调用getResult 方法");
    65. return accumulator.toString();
    66. }
    67. /**
    68. * 合并窗口逻辑(只有sessionWindow才会用的)
    69. * @param a An accumulator to merge
    70. * @param b Another accumulator to merge
    71. * @return
    72. */
    73. @Override
    74. public Integer merge(Integer a, Integer b) {
    75. System.out.println("调用merge方法");
    76. return null;
    77. }
    78. }
    79. )
    80. .print()
    81. ;
    82. // 3.触发程序执行
    83. env.execute();
    84. }
    85. }

    5.3、ProcessWindowFunction

    函数功能:

            将窗口内所有的数据缓存到Iterable,在窗口触发后对所有数据进行计算

    代码示例:

    1. /*
    2. * TODO 全窗口函数:ProcessWindowFunction
    3. * 特点:
    4. * 1.窗口触发时才会调用一次process方法,对窗口内的数据统一计算
    5. * 泛型参数说明:
    6. * ProcessWindowFunction
    7. * @IN : 输入数据类型
    8. * @OUT : 输出数据类型
    9. * @KEY : key的数据类型
    10. * @W : window类型(时间窗口、计数窗口)
    11. * 上下文对象说明:
    12. * 时间信息:currentProcessingTime()、currentWatermark()
    13. * 状态信息:windowState()、globalState()
    14. * 侧输出流:
    15. * 窗口信息:window()
    16. * 重点说明:
    17. * 由于WindowFunction会存储窗口内的所有数据,当窗口内数量特别大时,慎用
    18. * 思考:
    19. * 1.什么时候需要使用全窗口函数呢?
    20. * 计算平均数、计算中位数
    21. * */
    22. public class ProcessWindowFunctions {
    23. public static void main(String[] args) throws Exception {
    24. // 1.获取执行环境
    25. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    26. env.setParallelism(3);
    27. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    28. // TODO 基于处理时间的 滚动时间窗口,窗口长度10秒
    29. timewindow(env);
    30. // TODO 计数窗口
    31. //countwindow(env);
    32. // 3.触发程序执行
    33. env.execute();
    34. }
    35. private static void timewindow(StreamExecutionEnvironment env) {
    36. env
    37. .socketTextStream("localhost", 9999)
    38. .map(new MapFunction>() {
    39. @Override
    40. public Tuple2 map(String value) throws Exception {
    41. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    42. }
    43. }
    44. )
    45. .keyBy(value -> value.f0)
    46. // TODO 指定窗口类型:基于处理时间的 滚动时间窗口,窗口长度5秒
    47. .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    48. // TODO 对窗口内的元素求和
    49. .process(
    50. new ProcessWindowFunction, String, String, TimeWindow>() {
    51. /**
    52. * @param s 窗口中所属的key
    53. * @param context 上下文对象
    54. * @param elements 窗口中存储的数据
    55. * @param out 采集器,用来向下游发送数据
    56. * @throws Exception
    57. */
    58. @Override
    59. public void process(String s, ProcessWindowFunction, String, String, TimeWindow>.Context context, Iterable> elements, Collector out) throws Exception {
    60. // 通过窗口对象,获取窗口的范围信息
    61. long startTs = context.window().getStart();
    62. long endTs = context.window().getEnd();
    63. String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
    64. String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    65. long count = elements.spliterator().estimateSize();
    66. // 当前处理数据
    67. long currentProcessingTime = context.currentProcessingTime();
    68. String currentProcessingTimeS = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    69. // 当前水位线
    70. long currentWatermark = context.currentWatermark();
    71. out.collect("key=" + s + ",的窗口[" + windowStart + "," + windowEnd + ") 包含" + count + "条数据 ===> " + elements.toString() + " 当前处理时间:" + currentProcessingTimeS);
    72. }
    73. }
    74. )
    75. .print()
    76. ;
    77. }
    78. private static void countwindow(StreamExecutionEnvironment env) {
    79. env
    80. .socketTextStream("localhost", 9999)
    81. .map(new MapFunction>() {
    82. @Override
    83. public Tuple2 map(String value) throws Exception {
    84. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    85. }
    86. }
    87. )
    88. .keyBy(value -> value.f0)
    89. // TODO 指定窗口类型:计数窗口,窗口长度为5
    90. .countWindow(5)
    91. // TODO 对窗口内的元素求和
    92. .process(
    93. new ProcessWindowFunction, String, String, GlobalWindow>() {
    94. /**
    95. * @param s 窗口中所属的key
    96. * @param context 上下文对象
    97. * @param elements 窗口中存储的数据
    98. * @param out 采集器,用来向下游发送数据
    99. * @throws Exception
    100. */
    101. @Override
    102. public void process(String s, ProcessWindowFunction, String, String, GlobalWindow>.Context context, Iterable> elements, Collector out) throws Exception {
    103. // 通过窗口对象,获取窗口的范围信息
    104. long count = elements.spliterator().estimateSize();
    105. // 当前处理数据
    106. long currentProcessingTime = context.currentProcessingTime();
    107. String currentProcessingTimeS = DateFormatUtils.format(currentProcessingTime, "yyyy-MM-dd HH:mm:ss.SSS");
    108. // 当前水位线
    109. long currentWatermark = context.currentWatermark();
    110. out.collect("key=" + s + ",的窗口 包含" + count + "条数据 ===> " + elements.toString() + " 当前处理时间:" + currentProcessingTimeS);
    111. }
    112. }
    113. )
    114. .print()
    115. ;
    116. }
    117. }

    5.4、增量聚合的 ProcessWindowFunction

    函数功能:

            既可以使用 ReduceFunction 或 AggregateFunction 增量聚合功能

            又可以使用 ProcessWindowFunction 中的元数据信息(窗口信息、水位线信息)

    代码示例:

    1. /*
    2. * TODO 全窗口函数:ProcessWindowFunction
    3. * 特点:
    4. * 1.窗口触发时才会调用一次process方法,对窗口内的数据统一计算
    5. * 泛型参数说明:
    6. * ProcessWindowFunction
    7. * @IN : 输入数据类型
    8. * @OUT : 输出数据类型
    9. * @KEY : key的数据类型
    10. * @W : window类型(时间窗口、计数窗口)
    11. * 上下文对象说明:
    12. * 时间信息:currentProcessingTime()、currentWatermark()
    13. * 状态信息:windowState()、globalState()
    14. * 侧输出流:
    15. * 窗口信息:window()
    16. * 重点说明:
    17. * 由于WindowFunction会存储窗口内的所有数据,当窗口内数量特别大时,慎用
    18. * 思考:
    19. * 1.什么时候需要使用全窗口函数呢?
    20. * 计算平均数、计算中位数
    21. * */
    22. public class ReduceAggredateProcessWindowFunctions {
    23. public static void main(String[] args) throws Exception {
    24. // 1.获取执行环境
    25. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    26. env.setParallelism(3);
    27. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    28. // TODO 基于处理时间的 滚动时间窗口,窗口长度10秒
    29. // TODO 使用 ReduceFunction + ProcessWindowFunction 增量聚合
    30. //reduceAndProcessWindowFunction(env);
    31. // TODO 使用 AggregateFunction + ProcessWindowFunction 增量聚合
    32. aggreateAndProcessWindowFunction(env);
    33. // 3.触发程序执行
    34. env.execute();
    35. }
    36. private static void reduceAndProcessWindowFunction(StreamExecutionEnvironment env) {
    37. env
    38. .socketTextStream("localhost", 9999)
    39. .map(new MapFunction>() {
    40. @Override
    41. public Tuple2 map(String value) throws Exception {
    42. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    43. }
    44. }
    45. )
    46. .keyBy(value -> value.f0)
    47. // TODO 指定窗口类型:基于处理时间的 滚动时间窗口,窗口长度5秒
    48. .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    49. // TODO 获取窗口中的最小元素和窗口的开始时间、结束时间
    50. .reduce(
    51. new ReduceFunction>() {
    52. /**
    53. * @param value1 参与聚合的第一个值,也就是累加器的值
    54. * @param value2 参与聚合的第二个值,也就是新进入窗口的时间数据
    55. * @return
    56. * @throws Exception
    57. */
    58. @Override
    59. public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
    60. System.out.println("触发计算:" + value1 + " ," + value2);
    61. long min = Math.min(value1.f1, value2.f1);
    62. return new Tuple2<>(value1.f0, min);
    63. }
    64. }
    65. , new ProcessWindowFunction, String, String, TimeWindow>() {
    66. /**
    67. * @param s 窗口中所属的key
    68. * @param context 上下文对象
    69. * @param elements 窗口中存储的数据
    70. * @param out 采集器,用来向下游发送数据
    71. * @throws Exception
    72. */
    73. @Override
    74. public void process(String s, ProcessWindowFunction, String, String, TimeWindow>.Context context, Iterable> elements, Collector out) throws Exception {
    75. // 通过窗口对象,获取窗口的范围信息
    76. long startTs = context.window().getStart();
    77. long endTs = context.window().getEnd();
    78. String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
    79. String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    80. long count = elements.spliterator().estimateSize();
    81. // 当前处理数据
    82. long currentProcessingTime = context.currentProcessingTime();
    83. String currentProcessingTimeS = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    84. // 当前水位线
    85. long currentWatermark = context.currentWatermark();
    86. out.collect("key=" + s + ",的窗口[" + windowStart + "," + windowEnd + ") 包含" + count + "条数据 ===> " + elements.toString() + " 当前处理时间:" + currentProcessingTimeS);
    87. }
    88. }
    89. )
    90. .print()
    91. ;
    92. }
    93. private static void aggreateAndProcessWindowFunction(StreamExecutionEnvironment env) {
    94. env
    95. .socketTextStream("localhost", 9999)
    96. .map(new MapFunction>() {
    97. @Override
    98. public Tuple2 map(String value) throws Exception {
    99. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    100. }
    101. }
    102. )
    103. .keyBy(value -> value.f0)
    104. // TODO 指定窗口类型:基于处理时间的 滚动时间窗口,窗口长度5秒
    105. .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    106. // TODO 计算窗口内数据平均值并与窗口对应的key一同输出
    107. .aggregate(
    108. new AggregateFunction, Tuple2, Double>() {
    109. /**
    110. * @return
    111. */
    112. @Override
    113. public Tuple2 createAccumulator() {
    114. return new Tuple2(0L, 0L);
    115. }
    116. /**
    117. * @param value The value to add
    118. * @param accumulator The accumulator to add the value to
    119. * @return
    120. */
    121. @Override
    122. public Tuple2 add(Tuple2 value, Tuple2 accumulator) {
    123. return new Tuple2(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    124. }
    125. /**
    126. * @param accumulator The accumulator of the aggregation
    127. * @return
    128. */
    129. @Override
    130. public Double getResult(Tuple2 accumulator) {
    131. return (double) (accumulator.f0 * 1.0000 / accumulator.f1);
    132. }
    133. /**
    134. * @param a An accumulator to merge
    135. * @param b Another accumulator to merge
    136. * @return
    137. */
    138. @Override
    139. public Tuple2 merge(Tuple2 a, Tuple2 b) {
    140. return null;
    141. }
    142. }
    143. ,new ProcessWindowFunction(){
    144. /**
    145. * @param s The key for which this window is evaluated.
    146. * @param context The context in which the window is being evaluated.
    147. * @param elements The elements in the window being evaluated.
    148. * @param out A collector for emitting elements.
    149. * @throws Exception
    150. */
    151. @Override
    152. public void process(String s, ProcessWindowFunction.Context context, Iterable elements, Collector out) throws Exception {
    153. // 通过窗口对象,获取窗口的范围信息
    154. long startTs = context.window().getStart();
    155. long endTs = context.window().getEnd();
    156. String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
    157. String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    158. long count = elements.spliterator().estimateSize();
    159. // 当前处理数据
    160. long currentProcessingTime = context.currentProcessingTime();
    161. String currentProcessingTimeS = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    162. // 当前水位线
    163. long currentWatermark = context.currentWatermark();
    164. out.collect("key=" + s + ",的窗口[" + windowStart + "," + windowEnd + ") 包含" + count + "条数据 ===> " + elements.toString() + " 当前处理时间:" + currentProcessingTimeS);
    165. }
    166. }
    167. )
    168. .print()
    169. ;
    170. }
    171. }

    6、Flink中的 Window的生命周期

    窗口什么时候创建?

            当窗口所属的第一条数据到达时,窗口会被创建

    窗口什么时候删除?

            当 水位线 或者 processing time 超过窗口的结束时间戳 + allowedLateness时,窗口会被删除

    窗口什么时候被触发计算?

            当定义的触发器触发时, window function 会处理窗口内的数据

            默认触发器:

                    当 水位线 或者 processing time 超过窗口的结束时间戳时,窗口会被计算

    6.1、触发器 - Triggers

    触发器的作用:

            Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理

    内置 Trigger:

    自定义 Triggers:

    1. Trigger 接口:
    2. // 每个元素被加入窗口时调用
    3. @Override
    4. public TriggerResult onElement(Tuple2 element, long timestamp, GlobalWindow window, TriggerContext ctx)
    5. // 在注册的 event-time timer 触发时调用
    6. @Override
    7. public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
    8. // 在注册的 processing-time timer 触发时调用
    9. @Override
    10. public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx)
    11. // 对应窗口被移除时所需的逻辑
    12. @Override
    13. public void clear(GlobalWindow window, TriggerContext ctx)
    14. TriggerResult:
    15. CONTINUE: 什么也不做
    16. FIRE: 触发计算
    17. PURGE: 清空窗口内的元素
    18. FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

    代码示例:

    1. // TODO 自定义触发器
    2. public class Triggers {
    3. public static void main(String[] args) throws Exception {
    4. // 1.获取执行环境
    5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    6. env.setParallelism(3);
    7. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    8. // TODO Window:KeyedStream → WindowedStream
    9. // countWindow(env);
    10. // TODO countWindowAll:DataStream → AllWindowedStream
    11. timeWindow(env);
    12. // 3.触发程序执行
    13. env.execute();
    14. }
    15. // TODO 基于 计数窗口 的触发器
    16. private static void countWindow(StreamExecutionEnvironment env) {
    17. env
    18. .socketTextStream("localhost", 9999)
    19. .map(new MapFunction>() {
    20. @Override
    21. public Tuple2 map(String value) throws Exception {
    22. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    23. }
    24. }
    25. )
    26. .keyBy(value -> value.f0)
    27. // TODO 滚动窗口,窗口长度=5个元素
    28. //.countWindow(5)
    29. // TODO 滑动窗口,窗口长度=5个元素,滑动步长=2个元素 (每当进入两个数据,就统计下窗口内最五个数据)
    30. .countWindow(5, 2)
    31. .trigger(
    32. new Trigger, GlobalWindow>() {
    33. @Override
    34. public TriggerResult onElement(Tuple2 element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
    35. System.out.println("调用onElement方法");
    36. if (element.f1 == 999) {
    37. return TriggerResult.FIRE;
    38. } else {
    39. return TriggerResult.CONTINUE;
    40. }
    41. }
    42. @Override
    43. public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    44. System.out.println("调用onProcessingTime方法");
    45. return null;
    46. }
    47. @Override
    48. public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    49. System.out.println("调用onEventTime方法");
    50. return null;
    51. }
    52. @Override
    53. public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
    54. System.out.println("调用clear方法");
    55. }
    56. }
    57. )
    58. .process(
    59. new ProcessWindowFunction, String, String, GlobalWindow>() {
    60. @Override
    61. public void process(String s, ProcessWindowFunction, String, String, GlobalWindow>.Context context, Iterable> elements, Collector out) throws Exception {
    62. // 当前水位线
    63. long watermark = context.currentWatermark();
    64. // 当前处理时间
    65. long processingTime = context.currentProcessingTime();
    66. // 窗口开始时间
    67. // 窗口结束时间
    68. // 计算窗口内数据数量
    69. long count = elements.spliterator().estimateSize();
    70. String record = "key=" + s
    71. + " 包含" + count + "条数据===>" + elements.toString()
    72. + " 当前Watermark:" + watermark
    73. + " 当前processingTime:" + processingTime;
    74. out.collect(record);
    75. }
    76. }
    77. )
    78. .print()
    79. ;
    80. }
    81. // TODO 基于 处理时间窗口 的触发器
    82. private static void timeWindow(StreamExecutionEnvironment env) {
    83. env
    84. .socketTextStream("localhost", 9999)
    85. .map(new MapFunction>() {
    86. @Override
    87. public Tuple2 map(String value) throws Exception {
    88. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    89. }
    90. }
    91. )
    92. .keyBy(value -> value.f0)
    93. // 滚动窗口,窗口长度5s
    94. .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    95. .trigger(
    96. new Trigger, TimeWindow>() {
    97. @Override
    98. public TriggerResult onElement(Tuple2 element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    99. System.out.println("调用onElement方法");
    100. return TriggerResult.CONTINUE;
    101. }
    102. @Override
    103. public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    104. System.out.println("调用onProcessingTime方法");
    105. return TriggerResult.FIRE;
    106. }
    107. @Override
    108. public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    109. System.out.println("调用onEventTime方法");
    110. return TriggerResult.CONTINUE;
    111. }
    112. @Override
    113. public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    114. System.out.println("调用clear方法");
    115. }
    116. }
    117. )
    118. .process(new ShowProcessWindowFunction())
    119. .print()
    120. ;
    121. }
    122. }

    6.2、移除器 - Evictors

    移除器的作用:

            Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素

    内置 Evictors :

            CountEvictor(元素个数) :一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除

            DeltaEvictor :接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素

            TimeEvictor :接收 interval 参数,以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素

    代码示例:

    1. public class Evictors {
    2. public static void main(String[] args) throws Exception {
    3. // 1.获取执行环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. env.setParallelism(3);
    6. env
    7. .socketTextStream("localhost", 9999)
    8. .map(new MapFunction>() {
    9. @Override
    10. public Tuple2 map(String value) throws Exception {
    11. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    12. }
    13. }
    14. )
    15. .keyBy(value -> value.f0)
    16. // TODO 滚动窗口,窗口长度=5个元素
    17. .countWindow(5)
    18. .evictor(CountEvictor.of(2L))
    19. .process(
    20. new ProcessWindowFunction, String, String, GlobalWindow>() {
    21. @Override
    22. public void process(String s, ProcessWindowFunction, String, String, GlobalWindow>.Context context, Iterable> elements, Collector out) throws Exception {
    23. // 当前水位线
    24. long watermark = context.currentWatermark();
    25. // 当前处理时间
    26. long processingTime = context.currentProcessingTime();
    27. // 窗口开始时间
    28. // 窗口结束时间
    29. // 计算窗口内数据数量
    30. long count = elements.spliterator().estimateSize();
    31. String record = "key=" + s
    32. + " 包含" + count + "条数据===>" + elements.toString()
    33. + " 当前Watermark:" + watermark
    34. + " 当前processingTime:" + processingTime;
    35. out.collect(record);
    36. }
    37. }
    38. )
    39. .print()
    40. ;
    41. // 3.触发程序执行
    42. env.execute();
    43. }
    44. }

    7、对迟到数据的处理

            在使用 event-time 窗口时,数据可能会迟到,默认情况下,watermark 一旦越过窗口结束的 timestamp,迟到的数据就会被直接丢弃,Flink中提供多种多处理迟到数据的策略。

    7.1、生成水位线时,设置最大乱序时间

            在生成 watermark 时,设置一个可容忍的最大乱序时间,保证窗口计算被延迟执行,保证更多的迟到的数据能够进入窗口

            注意:迟到的数据超过了设置的最大乱序时间,将会被丢弃

    1. WatermarkStrategy
    2. .>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s
    3. .withTimestampAssigner((element,recordTimestamp) -> element.f1);

    7.2、设置窗口延迟关闭 - allowedLateness

    可以通过 window.allowedLateness 来设置窗口延迟关闭的时间

    allowedLateness 默认为0

            表示 当watermark 或 processing time 超过 窗口结束的timestamp时,触发计算 并销毁窗口

    allowedLateness 大于0时

            表示  当watermark 或 processing time 超过 窗口结束的timestamp时,会触发计算 但是并不会销毁窗口
            而是当 窗口结束的timestamp + allowedLateness 的数据到来后,才会销毁窗口
            并且 每次接收到迟到的数据后 都会触发窗口计算

    代码示例:

    1. public class AllowedLateness {
    2. public static void main(String[] args) throws Exception {
    3. // 1.获取执行环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. env.setParallelism(1);
    6. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    7. env
    8. .socketTextStream("localhost", 9999)
    9. .map(new MapFunction>() {
    10. @Override
    11. public Tuple2 map(String value) throws Exception {
    12. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    13. }
    14. }
    15. )
    16. .assignTimestampsAndWatermarks(
    17. WatermarkStrategy
    18. .>forMonotonousTimestamps()
    19. .withTimestampAssigner((element, recordTimestamp) -> element.f1)
    20. )
    21. .keyBy(value -> value.f0)
    22. // 滚动窗口,窗口长度5s
    23. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    24. // 窗口延迟10s关闭
    25. .allowedLateness(Time.seconds(10))
    26. .process(new ShowProcessWindowFunction())
    27. .print()
    28. ;
    29. // 3.触发程序执行
    30. env.execute();
    31. }
    32. }

    运行结果:


    7.3、使用侧输出流获取迟到的数据 - sideOutputLateData

    在 Flink中可以使用 侧输出流-OutputTag 来获取窗口中迟到的数据

    代码示例:

    1. // TODO 使用侧流接收迟到的数据
    2. public class UseSideOutputReceiveLatedata {
    3. public static void main(String[] args) throws Exception {
    4. // 1.获取执行环境
    5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    6. env.setParallelism(1);
    7. // 声明 OutputTag 对象,用来接收迟到的数据
    8. OutputTag> outputTag = new OutputTag>("side-output"){};
    9. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
    10. SingleOutputStreamOperator processDataStream = env
    11. .socketTextStream("localhost", 9999)
    12. .map(new MapFunction>() {
    13. @Override
    14. public Tuple2 map(String value) throws Exception {
    15. return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
    16. }
    17. }
    18. )
    19. .assignTimestampsAndWatermarks(
    20. WatermarkStrategy
    21. .>forMonotonousTimestamps()
    22. .withTimestampAssigner((element, recordTimestamp) -> element.f1)
    23. )
    24. .keyBy(value -> value.f0)
    25. // 滚动窗口,窗口长度5s
    26. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    27. .sideOutputLateData(outputTag)
    28. .process(new ShowProcessWindowFunction());
    29. // 输出主流
    30. processDataStream.print("主流输出");
    31. // 从主流获取侧输出流,打印迟到的数据
    32. processDataStream.getSideOutput(outputTag).printToErr("关窗后的迟到数据");
    33. // 3.触发程序执行
    34. env.execute();
    35. }
    36. }

    运行结果:

     

  • 相关阅读:
    判断当前shell版本
    TBarCode SDK 11.15.1 Crack
    k8s笔记资源限制,亲和和性 污点和容忍
    el-popover和el-tooltip样式修改(普通的组件样式修改方法,对popover是不生效的)
    解决Windows 10更新安装失败的问题
    TS以及webpack的es module
    Pytest教程:Fixture详解
    如何在 Ubuntu 22.04.1 上安装并使用 PostgreSQL 14.5?
    开源多系统启动工具-ventoy
    「实用技巧」后端如何使用 Eolink Apikit 快速调试接口?
  • 原文地址:https://blog.csdn.net/weixin_42845827/article/details/131541254