• Big Data (9h): Flink SQL Dual-Stream JOIN and Lookup Join


    The key topics are Lookup Join and the processing-time temporal join; the rest is optional.

    1. Environment

    Windows 10 + IDEA 2021 + JDK 1.8 + local MySQL 8

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>2.0.3</slf4j.version>
        <log4j.version>2.17.2</log4j.version>
        <fastjson.version>2.0.19</fastjson.version>
        <lombok.version>1.18.24</lombok.version>
    </properties>

    <dependencies>
        <!-- Flink core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API / SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>
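
    Not in the original list: the MySQL lookup join in section 3 additionally needs the JDBC connector and a MySQL driver. The coordinates below are my assumption based on the environment above (Flink 1.13.6, MySQL 8); add them inside <dependencies>:

        <!-- assumption: needed for the MySQL lookup join in section 3 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>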
    

    2. Temporal Joins

    2.1. Based on Processing Time (Key Topic)

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class Hi {
        public static void main(String[] args) {
            // create the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
            // create the streaming table environment
            StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
            // the two streams
            DataStreamSource<Tuple2<String, Integer>> d1 = env.fromElements(
                    Tuple2.of("a", 2),
                    Tuple2.of("b", 3));
            DataStreamSource<P> d2 = env.fromElements(
                    new P("a", 4000L),
                    new P("b", 5000L));
            // create temporary views
            tbEnv.createTemporaryView("v1", d1);
            tbEnv.createTemporaryView("v2", d2);
            // dual-stream JOIN
            tbEnv.sqlQuery("SELECT * FROM v1 LEFT JOIN v2 ON v1.f0=v2.pid").execute().print();
        }

        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class P {
            private String pid;
            private Long timestamp;
        }
    }
    

    Result

    +----+-------+-------+-------------+-------------+
    | op |    f0 |    f1 |         pid |   timestamp |
    +----+-------+-------+-------------+-------------+
    | +I |     a |     2 |      (NULL) |      (NULL) |
    | -D |     a |     2 |      (NULL) |      (NULL) |
    | +I |     a |     2 |           a |        4000 |
    | +I |     b |     3 |      (NULL) |      (NULL) |
    | -D |     b |     3 |      (NULL) |      (NULL) |
    | +I |     b |     3 |           b |        5000 |
    +----+-------+-------+-------------+-------------+
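
    Each -D/+I pair above is retract semantics: the LEFT JOIN first emits the left row padded with NULLs, then retracts it (-D) and inserts the completed row (+I) once the matching right-side element arrives. A minimal sketch (my addition, reusing the v1/v2 views registered above) that exposes the same changelog as a DataStream instead of printing the table:

    import org.apache.flink.table.api.Table;

    // drop-in replacement for the last line of main() above
    Table joined = tbEnv.sqlQuery("SELECT * FROM v1 LEFT JOIN v2 ON v1.f0=v2.pid");
    tbEnv.toChangelogStream(joined).print(); // Row records tagged +I/-D
    env.execute(); // needed here: DataStream print() only runs on execute()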
    

    2.1.1. Setting the State Retention Time

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    import java.time.Duration;
    import java.util.Scanner;

    public class Hi {
        public static void main(String[] args) {
            // create the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
            // create the streaming table environment
            StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
            // set the state retention time: join state for an idle key is cleared after ~5 s
            tbEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5L));
            // the two streams
            DataStreamSource<Tuple2<String, Long>> d1 = env.addSource(new AutomatedSource());
            DataStreamSource<String> d2 = env.addSource(new ManualSource());
            // create temporary views
            tbEnv.createTemporaryView("v1", d1);
            tbEnv.createTemporaryView("v2", d2);
            // dual-stream JOIN
            tbEnv.sqlQuery("SELECT * FROM v1 INNER JOIN v2 ON v1.f0=v2.f0").execute().print();
        }

        /** Manual input source (type a or b to test) */
        public static class ManualSource implements SourceFunction<String> {
            public ManualSource() {}

            @Override
            public void run(SourceFunction.SourceContext<String> sc) {
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String str = scanner.nextLine().trim();
                    if (str.equals("STOP")) {break;}
                    if (!str.equals("")) {sc.collect(str);}
                }
                scanner.close();
            }

            @Override
            public void cancel() {}
        }

        /** Automated source: emits ("a", i) and ("b", i) once per second */
        public static class AutomatedSource implements SourceFunction<Tuple2<String, Long>> {
            public AutomatedSource() {}

            @Override
            public void run(SourceFunction.SourceContext<Tuple2<String, Long>> sc) throws InterruptedException {
                for (long i = 0L; i < 999L; i++) {
                    Thread.sleep(1000L);
                    sc.collect(Tuple2.of("a", i));
                    sc.collect(Tuple2.of("b", i));
                }
            }

            @Override
            public void cancel() {}
        }
    }
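
    Note that setIdleStateRetention is a job-wide hint, not a per-join setting. A sketch of the equivalent configuration via the raw option key (assumption: the table.exec.state.ttl key that backs this setting in Flink 1.13):

    // equivalent to tbEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5L))
    tbEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "5 s");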
    

    Test result

    2.2. Based on Event Time

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class Hello {
        public static void main(String[] args) {
            // create the stream and table execution environments
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
            StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
            // create the tables and define watermarks
            // (Flink requires primary keys to be declared NOT ENFORCED)
            tbEnv.executeSql("CREATE TABLE v1 (" +
                    "  x STRING PRIMARY KEY NOT ENFORCED," +
                    "  y BIGINT," +
                    "  ts AS TO_TIMESTAMP(FROM_UNIXTIME(y,'yyyy-MM-dd HH:mm:ss'))," +
                    "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND" +
                    ") WITH (" +
                    "  'connector'='filesystem'," +
                    "  'path'='src/main/resources/a.csv'," +
                    "  'format'='csv'" +
                    ")");
            tbEnv.executeSql("CREATE TABLE v2 (" +
                    "  x STRING PRIMARY KEY NOT ENFORCED," +
                    "  y BIGINT," +
                    "  ts AS TO_TIMESTAMP(FROM_UNIXTIME(y,'yyyy-MM-dd HH:mm:ss'))," +
                    "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND" +
                    ") WITH (" +
                    "  'connector'='filesystem'," +
                    "  'path'='src/main/resources/b.csv'," +
                    "  'format'='csv'" +
                    ")");
            // run the event-time temporal join
            tbEnv.sqlQuery("SELECT * " +
                    "FROM v1 " +
                    "LEFT JOIN v2 FOR SYSTEM_TIME AS OF v1.ts " +
                    "ON v1.x = v2.x"
            ).execute().print();
        }
    }
    

    Printed result

    +----+---+------------+-------------------------+--------+------------+-------------------------+
    | op | x |          y |                      ts |     x0 |         y0 |                     ts0 |
    +----+---+------------+-------------------------+--------+------------+-------------------------+
    | +I | a | 1666540800 | 2022-10-24 00:00:00.000 | (NULL) |     (NULL) |                  (NULL) |
    | +I | b | 1666540803 | 2022-10-24 00:00:03.000 |      b | 1666540802 | 2022-10-24 00:00:02.000 |
    | +I | c | 1666540806 | 2022-10-24 00:00:06.000 |      c | 1666540803 | 2022-10-24 00:00:03.000 |
    +----+---+------------+-------------------------+--------+------------+-------------------------+
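
    For reference, inputs consistent with this output (hypothetical file contents, not shown in the original; the event-time temporal join matches each v1 row with the latest v2 version whose ts does not exceed v1.ts):

    a.csv (hypothetical):
    a,1666540800
    b,1666540803
    c,1666540806

    b.csv (hypothetical):
    b,1666540802
    c,1666540803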
    

    3. Lookup Join (Key Topic)

    Lookup Join is based on the processing-time temporal join.

    Details: https://yellow520.blog.csdn.net/article/details/128070761
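
    Since this is the stated key topic, here is a minimal sketch of a MySQL lookup join (my example, not taken from the linked article; it assumes the flink-connector-jdbc and mysql-connector-java dependencies noted in section 1 and a local MySQL table test.dim(id INT, name VARCHAR); adjust URL and credentials to your setup):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class LookupJoinDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
            StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
            // fact stream with a processing-time attribute (datagen just provides test input)
            tbEnv.executeSql("CREATE TABLE fact (" +
                    "  id INT," +
                    "  pt AS PROCTIME()" +
                    ") WITH (" +
                    "  'connector'='datagen'," +
                    "  'rows-per-second'='1'," +
                    "  'fields.id.min'='1'," +
                    "  'fields.id.max'='3'" +
                    ")");
            // dimension table in MySQL, queried per key at join time
            tbEnv.executeSql("CREATE TABLE dim (" +
                    "  id INT," +
                    "  name STRING" +
                    ") WITH (" +
                    "  'connector'='jdbc'," +
                    "  'url'='jdbc:mysql://localhost:3306/test'," +
                    "  'table-name'='dim'," +
                    "  'username'='root'," +
                    "  'password'='root'," +
                    "  'lookup.cache.max-rows'='100'," + // cache lookups to cut DB round trips
                    "  'lookup.cache.ttl'='60 s'" +
                    ")");
            // FOR SYSTEM_TIME AS OF <proctime attribute> is what makes this a lookup join
            tbEnv.sqlQuery("SELECT f.id, d.name, f.pt " +
                    "FROM fact AS f " +
                    "LEFT JOIN dim FOR SYSTEM_TIME AS OF f.pt AS d " +
                    "ON f.id = d.id"
            ).execute().print();
        }
    }

    Because the lookup happens at processing time, each row sees the dimension table as of when it is processed; the cache options trade freshness for fewer database queries.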

    4. Interval Joins

    SQL

    SELECT * FROM v1
    LEFT JOIN v2 ON v1.x=v2.x AND
    v1.y BETWEEN (v2.y - INTERVAL '2' SECOND) AND (v2.y + INTERVAL '1' SECOND)
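
    The same pattern applies on event time when both views expose rowtime attributes (a sketch; rt stands for a hypothetical rowtime column declared with a WATERMARK clause):

    SELECT * FROM v1
    LEFT JOIN v2 ON v1.x=v2.x AND
    v1.rt BETWEEN (v2.rt - INTERVAL '2' SECOND) AND (v2.rt + INTERVAL '1' SECOND)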
    

    Java

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    import java.util.Scanner;

    import static org.apache.flink.table.api.Expressions.$;

    public class Hi {
        public static void main(String[] args) {
            // create the stream and table execution environments
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
            StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
            // create the data streams
            DataStreamSource<String> d1 = env.addSource(new ManualSource());
            DataStreamSource<String> d2 = env.addSource(new AutomatedSource());
            // create dynamic tables, declaring an extra field as the processing-time attribute
            Table t1 = tbEnv.fromDataStream(d1, $("x"), $("y").proctime());
            Table t2 = tbEnv.fromDataStream(d2, $("x"), $("y").proctime());
            // create temporary views
            tbEnv.createTemporaryView("v1", t1);
            tbEnv.createTemporaryView("v2", t2);
            // run the interval join
            tbEnv.sqlQuery("SELECT * FROM v1 LEFT JOIN v2 ON v1.x=v2.x AND " +
                    "v1.y BETWEEN (v2.y - INTERVAL '2' SECOND) AND (v2.y + INTERVAL '1' SECOND)"
            ).execute().print();
        }

        /** Manual input source (type values on stdin; STOP ends the stream) */
        public static class ManualSource implements SourceFunction<String> {
            public ManualSource() {}

            @Override
            public void run(SourceFunction.SourceContext<String> sc) {
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String str = scanner.nextLine().trim();
                    if (str.equals("STOP")) {break;}
                    if (!str.equals("")) {sc.collect(str);}
                }
                scanner.close();
            }

            @Override
            public void cancel() {}
        }

        /** Automated source: emits "a" and "b" roughly every 0.8 s */
        public static class AutomatedSource implements SourceFunction<String> {
            public AutomatedSource() {}

            @Override
            public void run(SourceFunction.SourceContext<String> sc) throws InterruptedException {
                for (int i = 0; i < 999; i++) {
                    Thread.sleep(801);
                    sc.collect("a");
                    sc.collect("b");
                }
            }

            @Override
            public void cancel() {}
        }
    }
    

    Test result

  • Original post: https://blog.csdn.net/Yellow_python/article/details/128059543