• 36、Flink 的 WindowAssigner 之滑动窗口（Sliding Window）示例


    1、处理时间（Processing Time）
    使用处理时间窗口时，无需设置水位线策略（WatermarkStrategy）和时间戳提取器（TimestampAssigner）。

    // Processing time: key by the raw line; a 10 s window starts every 5 s.
    input.keyBy(line -> line)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window, Iterable<String> elements, Collector<String> out) throws Exception {
                    // Forward every element of the fired window unchanged.
                    elements.forEach(out::collect);
                }
            })
            .print();
    

    2、事件时间（Event Time）
    使用事件时间窗口时，必须设置水位线策略（WatermarkStrategy），并通过时间戳提取器（TimestampAssigner）指定作为事件时间的字段。

    // Event time requires a watermark strategy and a timestamp assigner.
    SingleOutputStreamOperator<Tuple2<String, Long>> parsed = input.map(new MapFunction<String, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(String line) throws Exception {
            // Each line is expected as "<key>,<timestamp>".
            String[] parts = line.split(",");
            String key = parts[0];
            long timestamp = Long.parseLong(parts[1]);
            return new Tuple2<>(key, timestamp);
        }
    });

    // Zero allowed out-of-orderness; the second tuple field is the event timestamp.
    SingleOutputStreamOperator<Tuple2<String, Long>> withTimestamps = parsed.assignTimestampsAndWatermarks(
            WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                    .withTimestampAssigner((event, previousTimestamp) -> event.f1));

    // Sliding event-time windows: 10 s long, a new one every 5 s.
    withTimestamps.keyBy(event -> event.f0)
            .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> events, Collector<String> out) throws Exception {
                    // Emit only the key of each event in the fired window.
                    for (Tuple2<String, Long> event : events) {
                        out.collect(event.f0);
                    }
                }
            })
            .print();
    

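    3、完整代码示例
    注意：带偏移量（offset）的滑动窗口要求 |offset| < slide，否则在创建窗口分配器时会抛出 IllegalArgumentException。例如 SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)) 就不满足该条件。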

    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.time.Duration;
    
    /**
     * Flink sliding-window ({@code WindowAssigner}) demo that reads text lines from a local socket.
     *
     * <p>Three pipelines share the same socket input:
     * <ul>
     *   <li>sliding event-time windows over lines of the form {@code key,timestamp}, where the
     *       second field is used as the event timestamp (interpreted by Flink as milliseconds);</li>
     *   <li>sliding processing-time windows keyed by the raw line;</li>
     *   <li>sliding processing-time windows with a -8 hour offset, keyed by the raw line.</li>
     * </ul>
     */
    public class _03_WindowAssignerSliding {

        /** Window function that forwards every element of a fired window unchanged. */
        private static final class ForwardAllWindowFunction
                implements WindowFunction<String, String, String, TimeWindow> {
            @Override
            public void apply(String key, TimeWindow window, Iterable<String> elements,
                              Collector<String> out) {
                for (String element : elements) {
                    out.collect(element);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStreamSource<String> input = env.socketTextStream("localhost", 8888);

            // Parallelism is limited for local testing. In production, configure idle-source
            // handling (e.g. WatermarkStrategy#withIdleness) so partitions without data do not
            // hold back the watermark.
            env.setParallelism(2);

            // Event time needs a watermark strategy and a timestamp assigner.
            // Each line is parsed as "<key>,<timestamp>"; malformed lines make the map throw.
            SingleOutputStreamOperator<Tuple2<String, Long>> map = input.map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String input) throws Exception {
                    String[] fields = input.split(",");
                    return new Tuple2<>(fields[0], Long.parseLong(fields[1]));
                }
            });

            // Zero allowed out-of-orderness; the second tuple field is the event timestamp.
            SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = map.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                    .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                        @Override
                        public long extractTimestamp(Tuple2<String, Long> input, long l) {
                            return input.f1;
                        }
                    }));

            // Sliding event-time windows: 10 s long, a new one every 5 s; emits each event's key.
            watermarks.keyBy(e -> e.f0)
                    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                    .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                        @Override
                        public void apply(String s, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
                            for (Tuple2<String, Long> input : iterable) {
                                collector.collect(input.f0);
                            }
                        }
                    })
                    .print();

            // Sliding processing-time windows: 10 s long, a new one every 5 s.
            input.keyBy(e -> e)
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                    .apply(new ForwardAllWindowFunction())
                    .print();

            // Sliding processing-time windows offset by -8 hours (e.g. to align with UTC+8).
            // Flink requires abs(offset) < slide, otherwise the assigner throws
            // IllegalArgumentException. The previous (12h size, 1h slide, -8h offset) combination
            // violated this, so the slide must be longer than 8 hours. With a 1-day size,
            // 12-hour slide and -8-hour offset, window starts fall on 00:00 and 12:00 UTC+8.
            input.keyBy(e -> e)
                    .window(SlidingProcessingTimeWindows.of(Time.days(1), Time.hours(12), Time.hours(-8)))
                    .apply(new ForwardAllWindowFunction())
                    .print();

            env.execute();
        }
    }
    
  • 相关阅读:
    计算机组成原理4小时速成:存储器,内存ROM,RAM,Cache,高速缓存cache,外存,缓存命中率,效率
    有向图、无向图相关数据结构
    责任链模式:职责的传递与处理
    HTML5期末大作业:游戏网站设计与实现——基于bootstrap响应式游戏资讯网站制作HTML+CSS+JavaScript
    张一鸣:我遇到的优秀年轻人的5个特质
    Selenium安装WebDriver最新Chrome驱动(含116/117/118/119)
    Visual Studio 生产环境配置方案:SlowCheetah
    TensorFlow之分类模型-1
    pandas.eval()/pandas.Series()/lambda/itertools.product
    【Agora UID 踩坑记录 && Java 数据类型】
  • 原文地址:https://blog.csdn.net/m0_50186249/article/details/139433491