• 36、Flink 的 WindowAssigner之滑动窗口示例


    1、处理时间
    基于处理时间的滑动窗口无需设置水位线策略和时间戳字段。示例中窗口大小为 10 秒、滑动步长为 5 秒，因此每条数据会同时落入 2 个窗口。

    // Processing time: key by the whole line, use 10 s windows that slide every 5 s,
    // and forward every buffered element unchanged when a window fires.
    input.keyBy(line -> line)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window, Iterable<String> elements, Collector<String> out) throws Exception {
                    elements.forEach(out::collect);
                }
            })
            .print();
    

    2、事件时间
    基于事件时间的滑动窗口需要设置水位线策略并指定时间戳字段。示例输入每行格式为 `key,时间戳(毫秒)`，第二个字段作为事件时间戳。

    // Event time: every record must carry a timestamp, and a watermark strategy tells Flink
    // how far event time has progressed.
    SingleOutputStreamOperator<Tuple2<String, Long>> records = input.map(new MapFunction<String, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(String line) throws Exception {
            // Expected line layout: key,timestamp
            String[] parts = line.split(",");
            String key = parts[0];
            Long timestamp = Long.parseLong(parts[1]);
            return new Tuple2<>(key, timestamp);
        }
    });

    // Zero tolerated out-of-orderness; the event timestamp is read from field f1.
    WatermarkStrategy<Tuple2<String, Long>> strategy = WatermarkStrategy
            .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> record, long recordTimestamp) {
                    return record.f1;
                }
            });
    SingleOutputStreamOperator<Tuple2<String, Long>> timestamped = records.assignTimestampsAndWatermarks(strategy);

    // Sliding event-time windows: 10 s long, a new one every 5 s; emit each element's key.
    timestamped.keyBy(record -> record.f0)
            .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                    elements.forEach(element -> out.collect(element.f0));
                }
            })
            .print();
    

    3、完整代码示例

    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.time.Duration;
    
    /**
     * Demonstrates Flink's sliding-window assigners on a text stream read from a socket
     * ({@code localhost:8888}).
     *
     * <p>Three pipelines share the same source:
     * <ol>
     *   <li>event-time sliding windows (size 10 s, slide 5 s) over lines of the form
     *       {@code key,timestamp}, emitting the key of every element in a fired window;</li>
     *   <li>processing-time sliding windows (size 10 s, slide 5 s) keyed by the raw line,
     *       emitting every line in a fired window;</li>
     *   <li>processing-time sliding windows (size 1 day, slide 12 h) offset by -8 hours so that
     *       window boundaries fall on 00:00 / 12:00 in UTC+8.</li>
     * </ol>
     */
    public class _03_WindowAssignerSliding {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Parallelism is limited for local testing. In production, configure source idleness
            // (e.g. WatermarkStrategy#withIdleness) so that a subtask without data does not hold
            // back the event-time watermark.
            env.setParallelism(2);

            DataStreamSource<String> input = env.socketTextStream("localhost", 8888);

            // Event time needs a timestamp assigner and a watermark strategy.
            // Each line is parsed as "key,timestamp"; a line without a second field or with a
            // non-numeric timestamp makes the map throw.
            SingleOutputStreamOperator<Tuple2<String, Long>> records = input.map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String line) throws Exception {
                    String[] fields = line.split(",");
                    return new Tuple2<>(fields[0], Long.parseLong(fields[1]));
                }
            });

            // Zero tolerated out-of-orderness; the event timestamp is read from field f1.
            SingleOutputStreamOperator<Tuple2<String, Long>> timestamped = records.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple2<String, Long> record, long recordTimestamp) {
                                    return record.f1;
                                }
                            }));

            // Sliding event-time windows: 10 s long, a new one every 5 s.
            timestamped.keyBy(record -> record.f0)
                    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                    .apply(new EmitKeys())
                    .print();

            // Sliding processing-time windows: 10 s long, a new one every 5 s.
            input.keyBy(line -> line)
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                    .apply(new EmitElements())
                    .print();

            // Sliding processing-time windows offset by -8 hours (aligns window starts to UTC+8).
            // Flink requires abs(offset) < slide: of(12 h, 1 h, -8 h) is rejected with an
            // IllegalArgumentException, and with a 1 h slide a whole-hour offset would not shift
            // the windows anyway. With a 12 h slide the offset is valid and meaningful:
            // 1-day windows starting at 00:00 and 12:00 UTC+8.
            input.keyBy(line -> line)
                    .window(SlidingProcessingTimeWindows.of(Time.days(1), Time.hours(12), Time.hours(-8)))
                    .apply(new EmitElements())
                    .print();

            env.execute();
        }

        /** Forwards every element of a fired window unchanged. */
        private static final class EmitElements implements WindowFunction<String, String, String, TimeWindow> {
            @Override
            public void apply(String key, TimeWindow window, Iterable<String> elements, Collector<String> out) {
                for (String element : elements) {
                    out.collect(element);
                }
            }
        }

        /** Emits the key field ({@code f0}) of every element of a fired window. */
        private static final class EmitKeys implements WindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
            @Override
            public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> elements, Collector<String> out) {
                for (Tuple2<String, Long> element : elements) {
                    out.collect(element.f0);
                }
            }
        }
    }
    
  • 相关阅读:
    《红蓝攻防对抗实战》八.利用OpenSSL对反弹shell流量进行加密
    CSDN客诉周报第11期|修复6个重大bug,解决32个次要bug
    小学生python游戏编程arcade----可旋转的坦克
    浅谈Rust--学习心得及rust的优势与劣势
    【C++】运算符重载 ④ ( 一元运算符重载 | 使用 全局函数 实现 前置 ++ 自增运算符重载 | 使用 全局函数 实现 前置 - - 自减运算符重载 )
    基于动态分级策略的改进正余弦算法-附代码
    NFNet:NF-ResNet的延伸,不用BN的4096超大batch size训练 | 21年论文
    C与C++中的常用符号与标点用法详解及实例
    vector详解以及一些问题(C++)
    [C/C++] 数据结构 LeetCode:用队列实现栈
  • 原文地址:https://blog.csdn.net/m0_50186249/article/details/139433491