• FlinkSQL系列01-编程入门


    引言

    FlinkSQL 是架构在 flink core 之上用 sql 语言方便快捷地进行结构化数据处理的上层库。

    核心工作原理如下

    • 将源数据流,绑定元数据(schema)后,注册成 catalog 中的表(table、view);
    • 然后由用户通过 table api 或者 SQL 来表达计算逻辑;
    • 由 table-planner 利用 apache calcite 进行 sql 语法解析,绑定元数据得到逻辑执行计划;
    • 再用 Optimizer 进行优化后,得到物理执行计划;
    • 物理计划经过代码生成器生成代码,得到 Transformation Tree;
    • Transformation Tree 转成 JobGraph 后提交到 flink 集群执行。

    FlinkSQL 中的表是动态表:

    1. 数据源的数据是持续输入
    2. 查询过程是持续计算
    3. 查询结果是持续输出

    FlinkSQL编程

    FlinkSQL编程4步曲:

    1. 创建FlinkSQL编程入口
    2. 将数据源定义(映射)成表(视图)
    3. 执行 sql 语义的查询(sql 语法或 tableAPI)
    4. 将查询结果输出到目标表

    编程入口

    创建方式一:(纯粹表环境)

    EnvironmentSettings envSettings = EnvironmentSettings.inStreamingMode();  // 流计算模式
    TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    
    • 1
    • 2

    创建方式二:(便于 sql 和 core 结合编程)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    • 1
    • 2
    • 3

    编程方式

    • Table API
    • Table SQL

    API 和 SQL 也可以很容易地混合,因为 Table 对象可以和 sql 表进行方便地互转。

    示例:

    tableEnv.executeSql(
            "create table t_kafka                                        "
                    + " (                                                   "
                    + "   id int,                                           "
                    + "   name string,                                      "
                    + "   age int,                                          "
                    + "   gender string                                     "
                    + " )                                                   "
                    + " WITH (                                              "
                    + "  'connector' = 'kafka',                             "
                    + "  'topic' = 'mytopic',                              "
                    + "  'properties.bootstrap.servers' = 'doitedu:9092',   "
                    + "  'properties.group.id' = 'g1',                      "
                    + "  'scan.startup.mode' = 'earliest-offset',           "
                    + "  'format' = 'json',                                 "
                    + "  'json.fail-on-missing-field' = 'false',            "
                    + "  'json.ignore-parse-errors' = 'true'                "
                    + " )                                                   "
    );
    
    /**
     * 把sql表名, 转成table对象
     */
    Table table = tableEnv.from("t_kafka");
    // 利用table api进行查询计算
    table.groupBy($("gender"))
            .select($("gender"), $("age").avg())
            .execute()
            .print();
    
    // 利用 Table SQL 进行查询计算
    tableEnv.executeSql("select gender, avg(age) as avg_age from t_kafka group by gender").print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
  • 相关阅读:
    HarmonyOS hsp制作与引用
    webgoat-(A1)SQL Injection
    Linux操作系统——安装RPM包或源码包
    java基础之循环(5)
    模拟量信号干扰的主要原因及解决方案
    [Games 101] Lecture 10 Geometry 1 (Introduction)
    2022-07-20 Android 11 SELinux avc 修改sys目录下面某个节点的权限
    JavaScript作用域(作用域概述、变量的作用域、作用域链)、JavaScript预解析(特殊案例)
    动态规划的算法题以及其python实现
    Vue08 事件的基本使用
  • 原文地址:https://blog.csdn.net/qq_17310871/article/details/126558752