• 43、Flink 自定义窗口触发器代码示例


    1、方法说明

    1)onElement() 方法在每个元素被加入窗口时调用。

    返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件
    CONTINUE: 什么也不做
    FIRE: 触发计算
    PURGE: 清空窗口内的元素
    FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

    2)onProcessingTime() 方法在注册的 processing-time timer 触发时调用。

    返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件
    CONTINUE: 什么也不做
    FIRE: 触发计算
    PURGE: 清空窗口内的元素
    FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

    3)onEventTime() 方法在注册的 event-time timer 触发时调用。

    返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件
    CONTINUE: 什么也不做
    FIRE: 触发计算
    PURGE: 清空窗口内的元素
    FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

    4)clear() 方法处理在对应窗口被移除时所需的逻辑。

    5)onMerge() 方法与有状态的 trigger 相关,该方法会在两个窗口合并时,将窗口对应 trigger 的状态合并,比如使用会话窗口时。

    2、完整代码示例

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.streaming.api.windowing.windows.Window;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * FIRE 会保留被触发的窗口中的内容,Flink 内置的 trigger 默认使用 `FIRE`。
     * FIRE_AND_PURGE 不会保留被触发的窗口中的内容
     * Purge 只会移除窗口中的内容,不会移除关于窗口的 meta-information 和 trigger 的状态
     */
    public class _08_WindowTriggerCustom {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> input = env.socketTextStream("localhost", 8888);
    
            ArrayList<String> keyList = new ArrayList<>();
            keyList.add("c");
            keyList.add("d");
    
            // 测试时限制了分区数,生产中需要设置空闲数据源
            env.setParallelism(2);
    
            // Processing-Time
            input.keyBy(e -> String.valueOf(e.hashCode() % 2))
                    .window(GlobalWindows.create())
                    .trigger(new MyCustomWindowTrigger<>(keyList))
                    .apply(new WindowFunction<String, String, String, GlobalWindow>() {
                        @Override
                        public void apply(String s, GlobalWindow globalWindow, Iterable<String> iterable, Collector<String> collector) throws Exception {
                            for (String word : iterable) {
                                collector.collect(word);
                            }
                        }
                    })
                    .print();
    
            env.execute();
        }
    }
    
    class MyCustomWindowTrigger<W extends Window> extends Trigger<String, W> {
        private List<String> keyWords;
    
        private ValueStateDescriptor<String> valueStateDescriptor;
    
        public MyCustomWindowTrigger(List<String> keyWords) {
            this.keyWords = keyWords;
            this.valueStateDescriptor = new ValueStateDescriptor<>("cnt", String.class);
        }
    
        // onElement() 方法在每个元素被加入窗口时调用。
        // 返回 `TriggerResult` 来决定 trigger 如何应对到达窗口的事件
        //- `CONTINUE`: 什么也不做
        //- `FIRE`: 触发计算
        //- `PURGE`: 清空窗口内的元素
        //- `FIRE_AND_PURGE`: 触发计算,计算结束后清空窗口内的元素
        @Override
        public TriggerResult onElement(String input, long l, W w, TriggerContext triggerContext) throws Exception {
            //当窗口内的元素匹配到首个关键字时触发,触发前的元素用 '-' 拼接
            //a0\b0\a1\b1\c2\d2
            //FIRE_AND_PURGE=会清除窗口状态
            //FIRE=不会清除窗口状态
            //2> b0
            //2> a1
            //2> c2
            //
            //1> a0
            //1> b1
            //1> d2
            ValueState<String> partitionedState = triggerContext.getPartitionedState(valueStateDescriptor);
            String value = partitionedState.value();
    
            for (String keyWord : keyWords) {
                if (input.startsWith(keyWord)) {
                    if (value == null || value.isEmpty()) {
                        partitionedState.update(input);
                    }
                    value += "-" + input;
                    partitionedState.update(value);
                }
            }
    
            if (partitionedState.value() != null && !partitionedState.value().isEmpty()) {
    //            return TriggerResult.FIRE_AND_PURGE;
                return TriggerResult.FIRE;
            }
    
            return TriggerResult.CONTINUE;
        }
    
        // onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
        // 返回 `TriggerResult` 来决定 trigger 如何应对到达窗口的事件
        //- `CONTINUE`: 什么也不做
        //- `FIRE`: 触发计算
        //- `PURGE`: 清空窗口内的元素
        //- `FIRE_AND_PURGE`: 触发计算,计算结束后清空窗口内的元素
        @Override
        public TriggerResult onProcessingTime(long l, W w, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }
    
        // onEventTime() 方法在注册的 event-time timer 触发时调用。
        // 返回 `TriggerResult` 来决定 trigger 如何应对到达窗口的事件
        //- `CONTINUE`: 什么也不做
        //- `FIRE`: 触发计算
        //- `PURGE`: 清空窗口内的元素
        //- `FIRE_AND_PURGE`: 触发计算,计算结束后清空窗口内的元素
        @Override
        public TriggerResult onEventTime(long l, W w, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }
    
        // clear() 方法处理在对应窗口被移除时所需的逻辑。
        @Override
        public void clear(W w, TriggerContext triggerContext) throws Exception {
            ValueState<String> partitionedState = triggerContext.getPartitionedState(valueStateDescriptor);
            partitionedState.clear();
        }
    
        // onMerge() 方法与有状态的 trigger 相关,该方法会在两个窗口合并时,将窗口对应 trigger 的状态合并,比如使用会话窗口时。
        @Override
        public void onMerge(W window, OnMergeContext ctx) throws Exception {
        }
    }
    
  • 相关阅读:
    LeetCode1005. K 次取反后最大化的数组和
    // 029 方阵行列互换
    论文阅读:Generative Adversarial Transformers
    美颜SDK集成指南:为应用添加视频美颜功能
    聊聊几位大厂清华同学的近况
    java虚拟机堆空间
    postman下载安装汉化及使用
    c语言实现数据结构中的队列
    vue.draggable拖动插件的使用
    Spring Boot文档阅读笔记-CORS Support
  • 原文地址:https://blog.csdn.net/m0_50186249/article/details/139648337