FlinkSQL 是架构在 flink core 之上用 sql 语言方便快捷地进行结构化数据处理的上层库。
核心工作原理如下:
- 将源数据流,绑定元数据(schema)后,注册成 catalog 中的表(table、view);
- 然后由用户通过 table api 或者 SQL 来表达计算逻辑;
- 由 table-planner 利用 apache calcite 进行 sql 语法解析,绑定元数据得到逻辑执行计划;
- 再用 Optimizer 进行优化后,得到物理执行计划;
- 物理计划经过代码生成器生成代码,得到 Transformation Tree;
- Transformation Tree 转成 JobGraph 后提交到 flink 集群执行。
FlinkSQL 中的表是动态表:
FlinkSQL编程4步曲:
创建方式一:(纯粹表环境)
EnvironmentSettings envSettings = EnvironmentSettings.inStreamingMode(); // 流计算模式
TableEnvironment tableEnv = TableEnvironment.create(envSettings);
创建方式二:(便于 sql 和 core 结合编程)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
API 和 SQL 也可以很容易地混合,因为 Table 对象可以和 sql 表进行方便地互转。
示例:
tableEnv.executeSql(
"create table t_kafka "
+ " ( "
+ " id int, "
+ " name string, "
+ " age int, "
+ " gender string "
+ " ) "
+ " WITH ( "
+ " 'connector' = 'kafka', "
+ " 'topic' = 'mytopic', "
+ " 'properties.bootstrap.servers' = 'doitedu:9092', "
+ " 'properties.group.id' = 'g1', "
+ " 'scan.startup.mode' = 'earliest-offset', "
+ " 'format' = 'json', "
+ " 'json.fail-on-missing-field' = 'false', "
+ " 'json.ignore-parse-errors' = 'true' "
+ " ) "
);
/**
* 把sql表名, 转成table对象
*/
Table table = tableEnv.from("t_kafka");
// 利用table api进行查询计算
table.groupBy($("gender"))
.select($("gender"), $("age").avg())
.execute()
.print();
// 利用 Table SQL 进行查询计算
tableEnv.executeSql("select gender, avg(age) as avg_age from t_kafka group by gender").print();