表标识由3部分组成:
// 指定使用地表标识
TableEnvironment tEnv = ...;
tEnv.useCatalog("a_catalog");
tEnv.useDatabase("db1");
// 将 table 注册到指定标识中
Table table = ...;
// 注册在默认 catalog 的默认 database 中
tableEnv.createTemporaryView("a_view", table);
// 注册在默认 catalog 的指定 database 中
tableEnv.createTemporaryView("db2.a_view", table);
// 注册在指定 catalog 的指定 database 中
tableEnv.createTemporaryView("x_catalog.db3.a_view", table);
Flinksql 中的表,可以是 virtual 的(view 视图)和 regular 的(table 常规表)
不管是 table 还是 view, 在 tableAPI 中得到的都是 Table 对象
区别:schema 信息是否被持久化存储
Table 对象获取方式解析:
从已注册的表
Table t1 = tenv.from("t1"); // 通过已经在 env 的 catalog 中注册的表名, 获得 Table 对象
从 TableDescriptor(连接器/format/schema/options)
Table table = tenv.from(TableDescriptor
.forConnector("kafka") // 指定连接器
.schema(Schema.newBuilder() // 指定表结构
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("gender", DataTypes.STRING())
.build())
.format("json") // 指定数据源的数据格式
.option("topic", "doit30-3") // 连接器及format格式的相关参数
.option("properties.bootstrap.servers", "doit01:9092")
.option("properties.group.id", "g2")
.option("scan.startup.mode", "earliest-offset")
.option("json.fail-on-missing-field", "false")
.option("json.ignore-parse-errors", "true")
.build());
从 DataStream
// 1 不指定schema,将流创建成Table对象,表的schema是默认的,往往不符合我们的要求
Table table1 = tenv.fromDataStream(kfkStream);
/*table1.execute().print();*/
// 2 为了获得更理想的表结构,可以先把数据流中的数据转成javabean类型
SingleOutputStreamOperator<Person> javaBeanStream = kfkStream.map(json -> JSON.parseObject(json, Person.class));
Table table2 = tenv.fromDataStream(javaBeanStream);
/*table2.execute().print();*/
// 3 手动指定 schema定义,来将一个javabean流,转成Table对象
Table table3 = tenv.fromDataStream(javaBeanStream,
Schema.newBuilder()
.column("id",DataTypes.BIGINT())
.column("name",DataTypes.STRING())
.column("age",DataTypes.INT())
.column("gender",DataTypes.STRING())
.build());
从 Table 对象上的查询 api 生成
Table table = table3.select($("guid"), $("uuid"));
从测试数据
Table table = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id",DataTypes.INT()),
DataTypes.FIELD("name",DataTypes.STRING()),
DataTypes.FIELD("age",DataTypes.DOUBLE())
),
Row.of(1, "zs", 18.2),
Row.of(2, "bb", 28.2),
Row.of(3, "cc", 16.2),
Row.of(4, "dd", 38.2)
);
从已存在的 datastream 注册
tenv.createTemporaryView("t_person", javaBeanStream);
tenv.executeSql("select gender,max(age) as max_age from t_person group by gender").print();
从已存在的 Table 对象注册
tenv.createTemporaryView("t1",table);
从 TableDescriptor(连接器) 注册
tenv.createTable("table_a", // 表名
TableDescriptor.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("gender", DataTypes.STRING())
.build())
.format("csv")
.option("path", "data/sqldemo/a.txt")
.option("csv.ignore-parse-errors", "true")
.build());
tenv.executeSql("select * from table_a").print();
执行 Sql 的 DDL 语句来注册
tenv.executeSql(
"CREATE TABLE age_info "
+ "( "
+ " id INT, "
+ " name string, "
+ " gender string, "
+ " age int, "
+ " PRIMARY KEY (id) NOT ENFORCED "
+ ") WITH ( "
+ " 'connector' = 'mysql-cdc', "
+ " 'hostname' = 'hdp01', "
+ " 'port' = '3306', "
+ " 'username' = 'root', "
+ " 'password' = '123456', "
+ " 'database-name' = 'abc', "
+ " 'table-name' = 'age_info' "
+ " ) ");
tenv.executeSql("select * from age_info").print();
catalog 就是一个元数据空间,简单说就是记录、获取元数据(表定义信息)的实体;
flinksql 在运行时,可以拥有多个 catalog,它们由 catalogManager 模块来注册、管理;
环境创建之初,就会初始化一个默认的元数据空间
用户还可以向环境中注册更多的 catalog,如下代码新增注册了一个 hivecatalog
// 创建了一个 hive 元数据空间的实现对象
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/hiveconf/dir");
// 将 hive 元数据空间对象注册到 环境中
tenv.registerCatalog("mycatalog",hiveCatalog);