• flink自定义窗口分配器


    背景

    我们知道处理常用的滑动窗口分配器,滚动窗口分配器,全局窗口分配器,会话窗口分配器外,我们可以实现自己的自定义窗口分配器,以实现我们的自己的窗口逻辑

    自定义窗口分配器的实现

    package wikiedits.assigner;
    
    import com.google.common.collect.Lists;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    import java.util.Collection;
    import java.util.Collections;
    
    public class IntervalWindowAssigner
            extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
        private long windowSize = 60 * 1000L;
    
        private IntervalWindowAssigner() {}
    
        @Override
        public Collection<TimeWindow> assignWindows(
                Object element, long timestamp, WindowAssignerContext context) {
    
            long startTime = timestamp -  (timestamp % windowSize);
            long endTime = startTime + windowSize;
    
            return Lists.newArrayList(new TimeWindow(startTime, endTime));
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
    
        @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override public boolean isEventTime() {
            return true;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    注意,TimeWindow时间窗口是左边右开的形式,参见下图所示
    在这里插入图片描述
    代码里面是以maxTimeStamp()为准的

  • 相关阅读:
    奇异值和零空间
    责任链模式
    【淘宝API】商品详情+搜索商品列表接口
    nfs client端 故障
    Unity 设置Inspect上问号的跳转链接
    js基础笔记学习53-练习2计算水仙花数2
    基于Echarts实现可视化数据大屏机械设备监测大数据统计平台HTML页面
    npm install导致的OOM解决方案
    Python Language Megadoc
    Canvas画个饼图
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133550103