• flink对状态ttl进行单元测试


    背景

    在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确

    测试状态ttl超时的单元测试

    首先看一下处理函数:

    // 处理函数
    public class MyStateProcessFunction extends KeyedProcessFunction<String, String, String> {
     
        // 键值分区状态
        ValueState<String> previousInput;
     
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<String>("previousInput", Types.STRING);
            // 状态ttl超时时间设置
            StateTtlConfig ttlConfig =
                    StateTtlConfig.newBuilder(Time.minutes(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                            // check 10 keys for every state access
                            .cleanupIncrementally(10, false).build();
            stateDescriptor.enableTimeToLive(ttlConfig);
            previousInput = getRuntimeContext().getState(stateDescriptor);
        }
     
        @Override
        public void processElement(String in, Context context, Collector<String> collector) throws Exception {
            context.timerService().registerProcessingTimeTimer(100);
            String out = (Objects.nonNull(previousInput.value()) ? previousInput.value() : "") + in;
            collector.collect(out);
            if (!in.contains("NotUpdate")) {// 为了模仿有访问状态,但是不更新状态,正常情况下业务逻辑是访问其他key组的其它state,而一直没有访问的key的状态会在超时时间到之后被清理掉
                previousInput.update(in);
            }
        }
     
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            if (Objects.nonNull(previousInput.value())) {
                out.collect(String.format("timer trigger %s", previousInput.value()));
            } else {
                out.collect(String.format("timer trigger state clear", previousInput.value()));
            }
        }
     
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    单元测试代码:

    /**
     * 测试状态处理函数,包含状态的ttl配置,以及ontimer方法
     **/
    @Test
    public void testKeyedStateProcessFunction() throws Exception {
        MyStateProcessFunction myStateProcessFunction = new MyStateProcessFunction();
        OneInputStreamOperatorTestHarness<String, String> testHarness =
                ProcessFunctionTestHarnesses.forKeyedProcessFunction(myStateProcessFunction, x -> "1", Types.STRING);
        testHarness.open();
        testHarness.processElement("hello", 10);
        // 注册了一个定时器,定时器100后过期
        Assert.assertEquals(1, testHarness.numProcessingTimeTimers());
        // 测试输出
        Assert.assertEquals(Lists.newArrayList("hello"), testHarness.extractOutputValues());
        ValueState<String> previousInput = myStateProcessFunction.getRuntimeContext()
                .getState(new ValueStateDescriptor<>("previousInput", Types.STRING));
        // 查看下状态应该已经被设置
        Assert.assertEquals("hello", previousInput.value());
     
        testHarness.processElement("world", 10);
        // 再次测试输出
        Assert.assertEquals(Lists.newArrayList("hello", "helloworld"), testHarness.extractOutputValues());
        // 再次查看下状态应该已经被设置
        Assert.assertEquals("world", previousInput.value());
     
        // 设置时间为1分钟,让状态超时
        testHarness.setStateTtlProcessingTime(Time.minutes(1).toMilliseconds());
        // 触发下状态访问,这样flink就会清理,正常生产中不需要这一步,访问状态本来就一直在进行中,只是可能是其他key分组的状态
        testHarness.processElement("NotUpdate1", System.currentTimeMillis());
        // 查看下状态应该已经被清理
        Assert.assertNull(previousInput.value());
     
        // 设置让定时器过期,顺带确认下状态已经被清理
        testHarness.setProcessingTime(100);
     
        // 测试输出(包含两个输入+一个定时器的输出)
        Assert.assertEquals(Lists.newArrayList("hello", "helloworld", "NotUpdate1", "timer trigger state clear"),
                testHarness.extractOutputValues());
        testHarness.close();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    测试代码中已经包含了详细的注解,我们实现自己的ttl单元测试时可以参考下

  • 相关阅读:
    SparkSQL的Join的实现方式
    【C#】什么是并发,C#常规解决高并发的基本方法
    [JDK工具-10] jvisualvm 多合一故障处理工具
    深度学习入门(四十二)计算机视觉——目标检测和边界框
    docker 安装minio
    实用水文篇--SpringBoot整合Netty实现消息推送服务器
    i.MX 6ULL 驱动开发 四:设备树
    人工智能的优势:使用 GPT 和扩散模型生成图像
    Java的JDBC编程
    微信小程序格式化日期小插件
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/134387754