• 周期性触发的自定义触发器


    背景

    本文我们实现一个周期性触发的自定义触发器,顺便看下实现自定义触发器的一些要点

    周期性触发器实现

    实现一个每分钟触发一次的自定义事件时间触发器,实现代码和注意事项如下所示

    package wikiedits.trigger;
    
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.Window;
    
    public class OneMinuteIntervalTrigger<W extends Window> extends Trigger<Object, W> {
        private static final long serialVersionUID = 1L;
    
        private final long interval;
    
        // 触发时间的状态对象
        private final ValueStateDescriptor<Long> stateDesc =
                new ValueStateDescriptor<>("fire-time", TypeInformation.of(Long.class));
    
        private OneMinuteIntervalTrigger(long interval) {
            this.interval = interval;
        }
    
        @Override
        public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
    
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// 这里其实不是必要的,取决于窗口结束时间到之后是否要触发一次计算
                // if the watermark is already past the window fire immediately
                return TriggerResult.FIRE;
            } else {// 多次注册也没事,反正是同一个计时器,这表明窗口结束时想要触发一次计算,此外注意getEnd和maxTimestamp方法的区别
                ctx.registerEventTimeTimer(window.maxTimestamp());
            }
    
            // 仅仅在第一次未注册时注册一次,后续由ontimer触发
            ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
            if (fireTimestamp.value() == null) {
                long start = timestamp - (timestamp % interval);
                long nextFireTimestamp = start + interval;
                ctx.registerEventTimeTimer(nextFireTimestamp);
                fireTimestamp.update(nextFireTimestamp);
            }
    
            return TriggerResult.CONTINUE;
        }
    
        // 计时器触发的函数
        @Override
        public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
    
            // 这里窗口结束时触发不是必要的,取决于是否想要在窗口结束是触发一次计算,并且这里如果不处理延迟的消息,可以返回FIRE_AND_PURGE清理窗口状态(但是注意即使返回PURGE,也不会清理触发器的状态)
            if (time == window.maxTimestamp()) {
                return TriggerResult.FIRE;
            }
    
            ValueState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
    
            Long fireTimestamp = fireTimestampState.value();
    
            // 继续注册计时器
            if (fireTimestamp != null && fireTimestamp == time) {
                fireTimestampState.update(time + interval);
                ctx.registerEventTimeTimer(time + interval);
                return TriggerResult.FIRE;
            }
    
            return TriggerResult.CONTINUE;
        }
    
        @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
    
        @Override
        public void clear(W window, TriggerContext ctx) throws Exception {
            // 清理触发器状态
            ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
            Long timestamp = fireTimestamp.value();
            if (timestamp != null) {
                ctx.deleteEventTimeTimer(timestamp);
                fireTimestamp.clear();
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    代码里面注解已经比较详细的说明了注意事项,此外对于状态的清理,我们需要看的是WindowOperator,如下

    在这里插入图片描述

  • 相关阅读:
    【Android】导入三方jar包/系统的framework.jar
    VLAN 数据帧的处理
    指望识别技术
    URDF+Gazebo+Rviz仿真
    盲盒电商模式:融合神秘感与趣味性,带来消费者购买欲望与商业利益
    聊聊使用场景法进行性能测试
    宗老师团队国家工程-园区GIS应用
    使用信号分析器
    AT89S52单片机
    ORACLE获取 表的主键id值
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133549437