Flink SQL 内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。
总共包含 3 部分:
CHAR、CHAR(n):定长字符串,就和 Java 中的 Char 一样,n 代表字符的定长,取值范围
[
1
,
2147483647
]
[1, 2147483647]
[1,2147483647]。如果不指定 n,则默认为 1。VARCHAR、VARCHAR(n)、STRING:可变长字符串,就和 Java 中的 String 一样,n 代表字符的最大长度,取值范围
[
1
,
2147483647
]
[1, 2147483647]
[1,2147483647]。如果不指定 n,则默认为 1。STRING 等同于 VARCHAR(2147483647)。BINARY、BINARY(n):定长二进制字符串,n 代表定长,取值范围
[
1
,
2147483647
]
[1, 2147483647]
[1,2147483647]。如果不指定 n,则默认为 1。VARBINARY、VARBINARY(n)、BYTES:可变长二进制字符串,n 代表字符的最大长度,取值范围
[
1
,
2147483647
]
[1, 2147483647]
[1,2147483647]。如果不指定 n,则默认为 1。BYTES 等同于 VARBINARY(2147483647)。DECIMAL、DECIMAL(p)、DECIMAL(p, s)、DEC、DEC(p)、DEC(p, s)、NUMERIC、NUMERIC(p)、NUMERIC(p, s):固定长度和精度的数值类型,就和 Java 中的 BigDecimal 一样,p 代表 数值位数(长度),取值范围
[
1
,
38
]
[1, 38]
[1,38];s 代表 小数点后的位数(精度),取值范围
[
0
,
p
]
[0, p]
[0,p]。如果不指定,p 默认为 10,s 默认为 0。TINYINT:
−
128
-128
−128 到
127
127
127 的 1 字节大小的有符号整数,就和 Java 中的 byte 一样。SMALLINT:
−
32768
-32768
−32768 到
32767
32767
32767 的 2 字节大小的有符号整数,就和 Java 中的 short 一样。INT、INTEGER:
−
2147483648
-2147483648
−2147483648 到
2147483647
2147483647
2147483647 的 4 字节大小的有符号整数,就和 Java 中的 int 一样。BIGINT:
−
9223372036854775808
-9223372036854775808
−9223372036854775808 到
9223372036854775807
9223372036854775807
9223372036854775807 的 8 字节大小的有符号整数,就和 Java 中的 long 一样。FLOAT:4 字节大小的单精度浮点数值,就和 Java 中的 float 一样。DOUBLE、DOUBLE PRECISION:8 字节大小的双精度浮点数值,就和 Java 中的 double 一样。关于 FLOAT 和 DOUBLE 的区别可见 https://www.runoob.com/w3cnote/float-and-double-different.html
NULL 类型:NULL。Raw 类型:RAW('class', 'snapshot')。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class 参数代表具体对应的 Java 类型,snapshot 代表类型在发生网络传输时的序列化器。DATE:由 年-月-日 组成的 不带时区含义 的日期类型,取值范围 [0000-01-01, 9999-12-31]TIME、TIME(p):由 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间数据类型,精度高达纳秒,取值范围 [00:00:00.000000000, 23:59:59.9999999]。其中 p 代表小数秒的位数,取值范围
[
0
,
9
]
[0, 9]
[0,9],如果不指定 p,默认为 0。TIMESTAMP、TIMESTAMP(p)、TIMESTAMP WITHOUT TIME ZONE、TIMESTAMP(p) WITHOUT TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中 p 代表小数秒的位数,取值范围
[
0
,
9
]
[0, 9]
[0,9],如果不指定 p,默认为 6。TIMESTAMP WITH TIME ZONE、TIMESTAMP(p) WITH TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围
[
0
,
9
]
[0, 9]
[0,9],如果不指定 p,默认为 6。TIMESTAMP_LTZ、TIMESTAMP_LTZ(p):由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围
[
0
,
9
]
[0, 9]
[0,9],如果不指定 p,默认为 6。
TIMESTAMP_LTZ 与 TIMESTAMP WITH TIME ZONE 的区别在于:TIMESTAMP WITH TIME ZONE 的时区信息是携带在数据中的,举例:其输入数据应该是 2022-01-01 00:00:00.000000000 +08:00;TIMESTAMP_LTZ 的时区信息不是携带在数据中的,而是由 Flink SQL 任务的全局配置决定的,我们可以由 table.local-time-zone 参数来设置时区。INTERVAL YEAR TO MONTH、INTERVAL DAY TO SECOND:INTERVAL 的涉及到的种类比较多。INTERVAL 主要是用于给 TIMESTAMP、TIMESTAMP_LTZ 添加偏移量的。举例,比如给 TIMESTAMP 加、减几天、几个月、几年。INTERVAL 子句总共涉及到的语法种类如下 Flink SQL 案例所示。CREATE TABLE sink_table (
result_interval_year TIMESTAMP(3),
result_interval_year_p TIMESTAMP(3),
result_interval_year_p_to_month TIMESTAMP(3),
result_interval_month TIMESTAMP(3),
result_interval_day TIMESTAMP(3),
result_interval_day_p1 TIMESTAMP(3),
result_interval_day_p1_to_hour TIMESTAMP(3),
result_interval_day_p1_to_minute TIMESTAMP(3),
result_interval_day_p1_to_second_p2 TIMESTAMP(3),
result_interval_hour TIMESTAMP(3),
result_interval_hour_to_minute TIMESTAMP(3),
result_interval_hour_to_second TIMESTAMP(3),
result_interval_minute TIMESTAMP(3),
result_interval_minute_to_second_p2 TIMESTAMP(3),
result_interval_second TIMESTAMP(3),
result_interval_second_p2 TIMESTAMP(3)
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
-- Flink SQL 支持的所有 INTERVAL 子句如下,总体可以分为 `年-月`、`日-小时-秒` 两种
-- 1. 年-月。取值范围为 [-9999-11, +9999-11]
-- 其中 p 是指有效位数,取值范围 [1, 4],默认值为 2。比如如果值为 1000,但是 p = 2,则会直接报错。
-- INTERVAL YEAR
f1 + INTERVAL '10' YEAR as result_interval_year,
-- INTERVAL YEAR(p)
f1 + INTERVAL '100' YEAR(3) as result_interval_year_p,
-- INTERVAL YEAR(p) TO MONTH
f1 + INTERVAL '10-03' YEAR(3) TO MONTH as result_interval_year_p_to_month,
-- INTERVAL MONTH
f1 + INTERVAL '13' MONTH as result_interval_month,
-- 2. 日-小时-秒。取值范围为 [-999999 23:59:59.999999999, +999999 23:59:59.999999999]
-- 其中 p1/p2 都是有效位数,p1 取值范围 [1, 6],默认值为 2;p2 取值范围 [0, 9],默认值为 6
-- INTERVAL DAY
f1 + INTERVAL '10' DAY as result_interval_day,
-- INTERVAL DAY(p1)
f1 + INTERVAL '100' DAY(3) as result_interval_day_p1,
-- INTERVAL DAY(p1) TO HOUR
f1 + INTERVAL '10 03' DAY(3) TO HOUR as result_interval_day_p1_to_hour,
-- INTERVAL DAY(p1) TO MINUTE
f1 + INTERVAL '10 03:12' DAY(3) TO MINUTE as result_interval_day_p1_to_minute,
-- INTERVAL DAY(p1) TO SECOND(p2)
f1 + INTERVAL '10 00:00:00.004' DAY TO SECOND(3) as result_interval_day_p1_to_second_p2,
-- INTERVAL HOUR
f1 + INTERVAL '10' HOUR as result_interval_hour,
-- INTERVAL HOUR TO MINUTE
f1 + INTERVAL '10:03' HOUR TO MINUTE as result_interval_hour_to_minute,
-- INTERVAL HOUR TO SECOND(p2)
f1 + INTERVAL '00:00:00.004' HOUR TO SECOND(3) as result_interval_hour_to_second,
-- INTERVAL MINUTE
f1 + INTERVAL '10' MINUTE as result_interval_minute,
-- INTERVAL MINUTE TO SECOND(p2)
f1 + INTERVAL '05:05.006' MINUTE TO SECOND(3) as result_interval_minute_to_second_p2,
-- INTERVAL SECOND
f1 + INTERVAL '3' SECOND as result_interval_second,
-- INTERVAL SECOND(p2)
f1 + INTERVAL '300' SECOND(3) as result_interval_second_p2
FROM (SELECT TO_TIMESTAMP_LTZ(1640966476500, 3) as f1)
ARRAY、t ARRAY。数组最大长度为
2147483647
2147483647
2147483647。t 代表数组内的数据类型。举例 ARRAY、ARRAY,其等同于 INT ARRAY、STRING ARRAY。MAP。Map 类型就和 Java 中的 Map 类型一样,key 是没有重复的。举例 Map、Map。MULTISET、t MULTISET。就和 Java 中的 List 类型,一样,运行重复的数据。举例 MULTISET,其等同于 INT MULTISET。ROW、ROW、ROW(n0 t0, n1 t1, ...)、ROW(n0 t0 'd0', n1 t1 'd1', ...)。其中,n 是字段的唯一名称,t 是字段的逻辑类型,d 是字段的描述。就和 Java 中的自定义对象一样。举例:ROW(myField INT, myOtherField BOOLEAN),其等同于 ROW用户自定义类型就是运行用户使用 Java 等语言自定义一个数据类型出来。但是目前数据类型不支持使用 CREATE TABLE 的 DDL 进行定义,只支持作为函数的输入输出参数。如下案例:
public class User {
// 1. 基础类型,Flink 可以通过反射类型信息自动把数据类型获取到
// 关于 SQL 类型和 Java 类型之间的映射见:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/types/#data-type-extraction
public int age;
public String name;
// 2. 复杂类型,用户可以通过 @DataTypeHint("DECIMAL(10, 2)") 注解标注此字段的数据类型
public @DataTypeHint("DECIMAL(10, 2)") BigDecimal totalBalance;
}
public class UserScalarFunction extends ScalarFunction {
// 1. 自定义数据类型作为输出参数
public User eval(long i) {
if (i > 0 && i <= 5) {
User u = new User();
u.age = (int) i;
u.name = "name1";
u.totalBalance = new BigDecimal(1.1d);
return u;
} else {
User u = new User();
u.age = (int) i;
u.name = "name2";
u.totalBalance = new BigDecimal(2.2d);
return u;
}
}
// 2. 自定义数据类型作为输入参数
public String eval(User i) {
if (i.age > 0 && i.age <= 5) {
User u = new User();
u.age = 1;
u.name = "name1";
u.totalBalance = new BigDecimal(1.1d);
return u.name;
} else {
User u = new User();
u.age = 2;
u.name = "name2";
u.totalBalance = new BigDecimal(2.2d);
return u.name;
}
}
}
-- 1. 创建 UDF
CREATE FUNCTION user_scalar_func AS 'flink.examples.sql._12_data_type._02_user_defined.UserScalarFunction';
-- 2. 创建数据源表
CREATE TABLE source_table (
user_id BIGINT NOT NULL COMMENT '用户 id'
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);
-- 3. 创建数据汇表
CREATE TABLE sink_table (
result_row_1 ROW<age INT, name STRING, totalBalance DECIMAL(10, 2)>,
result_row_2 STRING
) WITH (
'connector' = 'print'
);
-- 4. SQL 查询语句
INSERT INTO sink_table
select
-- 4.a. 用户自定义类型作为输出
user_scalar_func(user_id) as result_row_1,
-- 4.b. 用户自定义类型作为输出及输入
user_scalar_func(user_scalar_func(user_id)) as result_row_2
from source_table;
-- 5. 查询结果
+I[+I[9, name2, 2.20], name2]
+I[+I[1, name1, 1.10], name1]
+I[+I[5, name1, 1.10], name1]