• flink的CoProcessFunction使用示例


    背景

    flink中对两个流进行connect之后进行出处理的场景很常见,我们本文就以书中的一个例子为例说明下实现一个CoProcessFunction的一些要点

    实现CoProcessFunction的一些要点

    这个例子举例的是当收到某个传感器放行的控制消息时,从传感器传来的温度流消息会被运行向下游传递一段时间

    /**
     * 展示CoProcessFunction+onTimer使用方法的例子
     */
    public class CoProcessFunctionTimers {
     
        public static void main(String[] args) throws Exception {
     
            // set up the streaming execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     
            // use event time for the application
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
     
            // 控制消息流允许传感器消息流通过指定长度的时间
            DataStream<Tuple2<String, Long>> filterSwitches = env
                .fromElements(
                    // forward readings of sensor_2 for 10 seconds
                    Tuple2.of("sensor_2", 10_000L),
                    // forward readings of sensor_7 for 1 minute
                    Tuple2.of("sensor_7", 60_000L));
     
            // 传感器消息流
            DataStream<SensorReading> readings = env
                // SensorSource generates random temperature readings
                .addSource(new SensorSource());
     
            //传感器消息流connet控制消息流,并且按照传感器id作为key进行分组
            DataStream<SensorReading> forwardedReadings = readings
                //连接控制消息流
                .connect(filterSwitches)
                // 按照传感器id分组
                .keyBy(r -> r.id, s -> s.f0)
                // 应用CoProcessFunction + onTimer函数
                .process(new ReadingFilter());
     
            forwardedReadings.print();
     
            env.execute("Filter sensor readings");
        }
     
        //应用CoProcessFunction + onTimer函数,这已经按照key=传感器id分好组
        public static class ReadingFilter extends CoProcessFunction<SensorReading, Tuple2<String, Long>, SensorReading> {
     
            // 传感器开关状态--键值分区状态,key是传感器id
            private ValueState<Boolean> forwardingEnabled;
         // 保存传感器开关持续时间的状态--键值分区状态,key是传感器id
            private ValueState<Long> disableTimer;
     
            // 初始化键值分区状态 key是传感器id
            public void open(Configuration parameters) throws Exception {
                forwardingEnabled = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("filterSwitch", Types.BOOLEAN));
                disableTimer = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("timer", Types.LONG));
            }
     
            @Override
            public void processElement1(SensorReading r, Context ctx, Collector<SensorReading> out) throws Exception {
                // 处理传感器消息流,首先检查key是传感器id对应的键值分区状态,如果开启,那么这个传感器消息就可以正常通过
                Boolean forward = forwardingEnabled.value();
                if (forward != null && forward) {
                    out.collect(r);
                }
            }
     
            @Override
            public void processElement2(Tuple2<String, Long> s, Context ctx, Collector<SensorReading> out) throws Exception {
                //控制流消息过来后,更新键值分区的开关状态为true, key是传感器id
                forwardingEnabled.update(true);
            //控制流消息过来后,更新键值分区的开关状态为true的持续时长的定时器, key是传感器id
                long timerTimestamp = ctx.timerService().currentProcessingTime() + s.f1;
                Long curTimerTimestamp = disableTimer.value();
                if (curTimerTimestamp == null || timerTimestamp > curTimerTimestamp) {
                    // remove current timer
                    if (curTimerTimestamp != null) {
                        ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
                    }
                    // register new timer
                    ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
                    disableTimer.update(timerTimestamp);
                }
            }
     
            // 键值开关状态的持续时间定时器,key是传感器id,注意,在ontimer方法中,也可以通过out.collect的方式向下游算子发送消息
            public void onTimer(long ts, OnTimerContext ctx, Collector<SensorReading> out) throws Exception {
                // 定时器时间到了之后,清理掉传感器的开关状态
                forwardingEnabled.clear();
                disableTimer.clear();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    以上就是实现一个CoProcessFunction的大概逻辑

  • 相关阅读:
    【JAVA-C】流程控制 for 编程题
    tcpdump简析
    git patch 合入patch
    大一新生HTML期末作业,网页制作作业——明星介绍易烊千玺网站HTML+CSS
    2022速卖通宠物用品热销及需求品类推荐!
    升级Spring Cloud最新版后,有个重要的组件被弃用了。。。
    ASEMI肖特基二极管MBR40200PT参数和作用详解
    JMM(Java Memory Model)
    lc marathon 2022.6.22
    蓝牙技术|AirPods Pro 2或将搭载运动传感器,TWS蓝牙耳机发展新方向
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/134277185