• Interval join in Flink, its Flink SQL counterpart, and state TTL expiration


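    In Flink 1.12 the DataStream interval join is limited to inner join, while Flink SQL supports several join types: inner join, left join, right join and full join. Note that the SQL queries in the code below are regular joins on id without a time bound; their state is kept in check only by the idle state retention set on the TableConfig.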
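The difference between an inner and an outer interval join can be seen in a plain-Java, in-memory sketch. It is not Flink code: the `Row` class, the method names and the sample timestamps are hypothetical. The bounds mirror `between(Time.seconds(-5), Time.seconds(5))` from the job below, whose bounds are inclusive by default.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, in-memory model of interval-join semantics (not Flink's implementation).
// A t2 row matches a t1 row when the ids are equal and
// t1.ts + lower <= t2.ts <= t1.ts + upper (both bounds inclusive).
class IntervalJoinSketch {

    // Hypothetical row type: join key, payload and timestamp in seconds.
    static final class Row {
        final String id;
        final String value;
        final long ts;

        Row(String id, String value, long ts) {
            this.id = id;
            this.value = value;
            this.ts = ts;
        }
    }

    static boolean matches(Row l, Row r, long lower, long upper) {
        return l.id.equals(r.id) && r.ts >= l.ts + lower && r.ts <= l.ts + upper;
    }

    // Inner interval join (what the DataStream intervalJoin offers): only matched pairs.
    static List<String> inner(List<Row> left, List<Row> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Row l : left) {
            for (Row r : right) {
                if (matches(l, r, lower, upper)) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    // Left outer interval join: a t1 row without any match is padded with null.
    static List<String> leftOuter(List<Row> left, List<Row> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Row l : left) {
            boolean matched = false;
            for (Row r : right) {
                if (matches(l, r, lower, upper)) {
                    out.add(l.value + "," + r.value);
                    matched = true;
                }
            }
            if (!matched) {
                out.add(l.value + ",null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> t1 = List.of(new Row("1001", "zhangsan", 1), new Row("1002", "lisi", 20));
        List<Row> t2 = List.of(new Row("1001", "male", 4), new Row("1002", "female", 30));
        System.out.println(inner(t1, t2, -5, 5));     // prints [zhangsan,male]
        System.out.println(leftOuter(t1, t2, -5, 5)); // prints [zhangsan,male, lisi,null]
    }
}
```

With ±5 s bounds, female (ts 30) falls outside lisi's window [15, 25], so only the outer variant keeps a row for lisi.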

    package com.atguigu.app;
    
    import com.atguigu.bean.Bean1;
    import com.atguigu.bean.Bean2;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.time.Duration;
    
    public class FlinkSQLJoinTest {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
    
            SingleOutputStreamOperator<Bean1> bean1DS = env.socketTextStream("hadoop102", 8888)
                    .map(line -> {
                        String[] split = line.split(",");
                        return new Bean1(split[0], split[1], Long.parseLong(split[2]));
                    });
    
            SingleOutputStreamOperator<Bean2> bean2DS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] split = line.split(",");
                        return new Bean2(split[0], split[1], Long.parseLong(split[2]));
                    });
    
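            // DataStream interval join (inner join only): a bean2 element joins a bean1 element
            // with the same id when its timestamp is in [bean1.ts - 5s, bean1.ts + 5s].
            // It runs on event time, so both streams need assignTimestampsAndWatermarks(...)
            // before keyBy; as written, `result` is never sent to a sink and only illustrates the API.
            SingleOutputStreamOperator<Tuple2<Bean1, Bean2>> result = bean1DS.keyBy(Bean1::getId)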
                    .intervalJoin(bean2DS.keyBy(Bean2::getId))
                    .between(Time.seconds(-5), Time.seconds(5))
                    .process(new ProcessJoinFunction<Bean1, Bean2, Tuple2<Bean1, Bean2>>() {
                        @Override
                        public void processElement(Bean1 left, Bean2 right, Context ctx, Collector<Tuple2<Bean1, Bean2>> out) throws Exception {
                            out.collect(new Tuple2<>(left, right));
                        }
                    });
    
            // Register the streams as dynamic tables (temporary views)
            tableEnv.createTemporaryView("t1", bean1DS);
            tableEnv.createTemporaryView("t2", bean2DS);
    
            // Inner join (effective TTL update behavior) left: OnCreateAndWrite, right: OnCreateAndWrite
            // tableEnv.executeSql("select t1.id,t1.name,t2.sex from t1 join t2 on t1.id = t2.id").print();
    
            // Left outer join  left: OnReadAndWrite, right: OnCreateAndWrite
            tableEnv.executeSql("select t1.id,t1.name,t2.sex from t1 left join t2 on t1.id = t2.id").print();
    
            // Right outer join  left: OnCreateAndWrite, right: OnReadAndWrite
            // tableEnv.executeSql("select t1.id,t1.name,t2.sex from t1 right join t2 on t1.id = t2.id").print();
    
            // Full outer join  left: OnReadAndWrite, right: OnReadAndWrite
            // tableEnv.executeSql("select t1.id,t1.name,t2.sex from t1 full join t2 on t1.id = t2.id").print();
    
    
        }
    }
    
    

    Take the t1 and t2 tables above, joined as t1 left join t2 on t1.id = t2.id.
    Precondition: the state has an expiration time configured:

    tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
    

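    case1:
    t1 receives a record first: 1001,zhangsan,1
    t2 then receives a record: 1001,male,1
    which joins with the t1 row. As long as t2 keeps receiving records less than 10 s apart, joined results keep being emitted.
    case2:
    t2 receives a record first: 1001,male,1
    t1 then receives a record: 1001,zhangsan,1
    Even if records keep arriving in t1, 10 s after the t2 row arrived the t2 columns in the output become null; the t2 row is kept for only 10 s.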
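Both cases can be replayed in a toy Java model of `t1 left join t2` with a 10 s retention. It is a simplification, not Flink's join operator: timestamps are plain seconds passed in by hand, each side keeps one row per id, expired rows are dropped lazily on access, and, as an assumption matching the behavior described above, a matching t2 row rewrites (and so refreshes) the t1 row, while a t2 row is written only when it arrives. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of "t1 LEFT JOIN t2 ON t1.id = t2.id" with idle state retention (not Flink code).
// Each side's row expires `retention` seconds after its last write.
class LeftJoinTtlSketch {

    static final class Entry {
        final String value;
        long lastWrite;

        Entry(String value, long lastWrite) {
            this.value = value;
            this.lastWrite = lastWrite;
        }
    }

    final long retention;
    final Map<String, Entry> left = new HashMap<>();  // t1 state, one row per id
    final Map<String, Entry> right = new HashMap<>(); // t2 state, one row per id
    final List<String> out = new ArrayList<>();       // emitted join results, in order

    LeftJoinTtlSketch(long retention) {
        this.retention = retention;
    }

    // Returns the entry if it has not expired at `now`; otherwise removes it and returns null.
    Entry alive(Map<String, Entry> state, String id, long now) {
        Entry e = state.get(id);
        if (e != null && now - e.lastWrite >= retention) {
            state.remove(id);
            return null;
        }
        return e;
    }

    // A t1 row arrives: store it (TTL starts) and look up t2; reading t2 does not refresh it.
    void onT1(String id, String name, long now) {
        left.put(id, new Entry(name, now));
        Entry r = alive(right, id, now);
        out.add(id + "," + name + "," + (r == null ? "null" : r.value));
    }

    // A t2 row arrives: store it (TTL starts); a matching t1 row is rewritten, which refreshes it.
    void onT2(String id, String sex, long now) {
        right.put(id, new Entry(sex, now));
        Entry l = alive(left, id, now);
        if (l != null) {
            l.lastWrite = now;
            out.add(id + "," + l.value + "," + sex);
        }
    }

    public static void main(String[] args) {
        // case1: t1 first, then t2 rows less than 10 s apart keep the t1 row alive
        LeftJoinTtlSketch case1 = new LeftJoinTtlSketch(10);
        case1.onT1("1001", "zhangsan", 0);
        case1.onT2("1001", "male", 5);
        case1.onT2("1001", "male", 12);
        case1.onT2("1001", "male", 21);
        // prints [1001,zhangsan,null, 1001,zhangsan,male, 1001,zhangsan,male, 1001,zhangsan,male]
        System.out.println(case1.out);

        // case2: t2 first, then t1 keeps arriving; the t2 row expires 10 s after it arrived
        LeftJoinTtlSketch case2 = new LeftJoinTtlSketch(10);
        case2.onT2("1001", "male", 0);
        case2.onT1("1001", "zhangsan", 3);
        case2.onT1("1001", "zhangsan", 8);
        case2.onT1("1001", "zhangsan", 11);
        // prints [1001,zhangsan,male, 1001,zhangsan,male, 1001,zhangsan,null]
        System.out.println(case2.out);
    }
}
```

The real operator also retracts the earlier null-padded row once a match arrives; the model simply appends each new result.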

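    Both cases come down to state TTL. Flink SQL keeps each side's join rows in keyed state whose TTL equals the idle state retention and is refreshed on create and on write. In a left join, each matching t2 row also rewrites the stored t1 row (Flink tracks how many rows it is associated with), so the t1 side is effectively refreshed on every match, as with OnReadAndWrite; a t2 row is written only when it arrives, so it expires 10 s later however often it is read. This is what the per-side labels in the code comments above describe.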

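    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time; // not the windowing Time imported in the job above
    import java.text.SimpleDateFormat;
    
    // Inside open() of a rich function; dtState (ValueState<String>) and sdf are fields
    ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("dt-state", String.class);
    // Expire after 1 day; the timestamp is set on creation and reset on every write
    StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .build();
    valueStateDescriptor.enableTimeToLive(stateTtlConfig);
    dtState = getRuntimeContext().getState(valueStateDescriptor);
    
    sdf = new SimpleDateFormat("yyyy-MM-dd");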
    

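    The UpdateType of a TTL config decides when the state's expiration time is automatically refreshed (enum from Flink's StateTtlConfig):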

    /**
     * This option value configures when to update last access timestamp which prolongs state TTL.
     */
    public enum UpdateType {
        /** TTL is disabled. State does not expire. */
        Disabled,
        /** Last access timestamp is initialised when state is created and updated on every write operation. */
        OnCreateAndWrite,
        /** The same as OnCreateAndWrite but also updated on read. */
        OnReadAndWrite
    }
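The three update types can be compared with a toy value-with-TTL in plain Java. It only mimics the enum above under simplifying assumptions (explicit timestamps in seconds, an expired value is never returned and is cleared on read); the class, its methods and the sample value "v1" are hypothetical and are not Flink's internal TTL state classes.

```java
// Toy single-value state with a TTL that mimics the three StateTtlConfig.UpdateType values
// (not Flink code). Time is passed in explicitly, in seconds.
class TtlValueSketch {

    enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }

    private final long ttl;
    private final UpdateType updateType;
    private String value;    // null means "no state"
    private long lastAccess; // last access timestamp the TTL is measured from

    TtlValueSketch(long ttl, UpdateType updateType) {
        this.ttl = ttl;
        this.updateType = updateType;
    }

    // Create or write: always (re)sets the last access timestamp.
    void update(String v, long now) {
        value = v;
        lastAccess = now;
    }

    // Read: an expired value is never returned; OnReadAndWrite also refreshes the timestamp.
    String value(long now) {
        if (value == null) {
            return null;
        }
        if (updateType != UpdateType.Disabled && now - lastAccess >= ttl) {
            value = null;
            return null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            lastAccess = now;
        }
        return value;
    }

    public static void main(String[] args) {
        for (UpdateType type : UpdateType.values()) {
            TtlValueSketch state = new TtlValueSketch(10, type);
            state.update("v1", 0);
            String at5 = state.value(5);   // within the TTL for every type
            String at12 = state.value(12); // 12 s after the write, 7 s after the read
            System.out.println(type + ": " + at5 + ", " + at12);
        }
        // prints:
        // Disabled: v1, v1
        // OnCreateAndWrite: v1, null
        // OnReadAndWrite: v1, v1
    }
}
```

With a 10 s TTL, the read at 5 s keeps the OnReadAndWrite value alive until 15 s, while the OnCreateAndWrite value is already gone at 12 s.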
    
  • Original article: https://blog.csdn.net/weixin_42965737/article/details/126756056