• 【Flink SQL】Flink SQL 基础概念:数据类型


    Flink SQL 内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。

    总共包含 3 部分:

    • 原子 数据类型
    • 复合 数据类型
    • 用户自定义 数据类型

    1.原子数据类型

    1.1 字符串类型

    • CHARCHAR(n):定长字符串,就和 Java 中的 Char 一样,n 代表字符的定长,取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定 n,则默认为 1。
    • VARCHARVARCHAR(n)STRING:可变长字符串,就和 Java 中的 String 一样,n 代表字符的最大长度,取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定 n,则默认为 1。STRING 等同于 VARCHAR(2147483647)

    1.2 二进制字符串类型

    • BINARYBINARY(n):定长二进制字符串,n 代表定长,取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定 n,则默认为 1。
    • VARBINARYVARBINARY(n)BYTES:可变长二进制字符串,n 代表字符的最大长度,取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定 n,则默认为 1。BYTES 等同于 VARBINARY(2147483647)

    1.3 精确数值类型

    • DECIMALDECIMAL(p)DECIMAL(p, s)DECDEC(p)DEC(p, s)NUMERICNUMERIC(p)NUMERIC(p, s):固定长度和精度的数值类型,就和 Java 中的 BigDecimal 一样,p 代表 数值位数(长度),取值范围 [ 1 , 38 ] [1, 38] [1,38]s 代表 小数点后的位数(精度),取值范围 [ 0 , p ] [0, p] [0,p]。如果不指定,p 默认为 10,s 默认为 0。
    • TINYINT − 128 -128 128 127 127 127 的 1 字节大小的有符号整数,就和 Java 中的 byte 一样。
    • SMALLINT − 32768 -32768 32768 32767 32767 32767 的 2 字节大小的有符号整数,就和 Java 中的 short 一样。
    • INTINTEGER − 2147483648 -2147483648 2147483648 2147483647 2147483647 2147483647 的 4 字节大小的有符号整数,就和 Java 中的 int 一样。
    • BIGINT − 9223372036854775808 -9223372036854775808 9223372036854775808 9223372036854775807 9223372036854775807 9223372036854775807 的 8 字节大小的有符号整数,就和 Java 中的 long 一样。

    1.4 有损精度数值类型

    • FLOAT:4 字节大小的单精度浮点数值,就和 Java 中的 float 一样。
    • DOUBLEDOUBLE PRECISION:8 字节大小的双精度浮点数值,就和 Java 中的 double 一样。

    关于 FLOATDOUBLE 的区别可见 https://www.runoob.com/w3cnote/float-and-double-different.html

    1.5 布尔类型:BOOLEAN

    • NULL 类型:NULL。
    • Raw 类型:RAW('class', 'snapshot')。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class 参数代表具体对应的 Java 类型,snapshot 代表类型在发生网络传输时的序列化器。

    1.6 日期、时间类型

    • DATE:由 年-月-日 组成的 不带时区含义 的日期类型,取值范围 [0000-01-01, 9999-12-31]
    • TIMETIME(p):由 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间数据类型,精度高达纳秒,取值范围 [00:00:00.000000000, 23:59:59.9999999]。其中 p 代表小数秒的位数,取值范围 [ 0 , 9 ] [0, 9] [0,9],如果不指定 p,默认为 0。
    • TIMESTAMPTIMESTAMP(p)TIMESTAMP WITHOUT TIME ZONETIMESTAMP(p) WITHOUT TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中 p 代表小数秒的位数,取值范围 [ 0 , 9 ] [0, 9] [0,9],如果不指定 p,默认为 6。
    • TIMESTAMP WITH TIME ZONETIMESTAMP(p) WITH TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [ 0 , 9 ] [0, 9] [0,9],如果不指定 p,默认为 6。
    • TIMESTAMP_LTZTIMESTAMP_LTZ(p):由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [ 0 , 9 ] [0, 9] [0,9],如果不指定 p,默认为 6。
      • TIMESTAMP_LTZTIMESTAMP WITH TIME ZONE 的区别在于:TIMESTAMP WITH TIME ZONE 的时区信息是携带在数据中的,举例:其输入数据应该是 2022-01-01 00:00:00.000000000 +08:00TIMESTAMP_LTZ 的时区信息不是携带在数据中的,而是由 Flink SQL 任务的全局配置决定的,我们可以由 table.local-time-zone 参数来设置时区。
    • INTERVAL YEAR TO MONTHINTERVAL DAY TO SECONDINTERVAL 的涉及到的种类比较多。INTERVAL 主要是用于给 TIMESTAMPTIMESTAMP_LTZ 添加偏移量的。举例,比如给 TIMESTAMP 加、减几天、几个月、几年。INTERVAL 子句总共涉及到的语法种类如下 Flink SQL 案例所示。
    CREATE TABLE sink_table (
        result_interval_year TIMESTAMP(3),
        result_interval_year_p TIMESTAMP(3),
        result_interval_year_p_to_month TIMESTAMP(3),
        result_interval_month TIMESTAMP(3),
        result_interval_day TIMESTAMP(3),
        result_interval_day_p1 TIMESTAMP(3),
        result_interval_day_p1_to_hour TIMESTAMP(3),
        result_interval_day_p1_to_minute TIMESTAMP(3),
        result_interval_day_p1_to_second_p2 TIMESTAMP(3),
        result_interval_hour TIMESTAMP(3),
        result_interval_hour_to_minute TIMESTAMP(3),
        result_interval_hour_to_second TIMESTAMP(3),
        result_interval_minute TIMESTAMP(3),
        result_interval_minute_to_second_p2 TIMESTAMP(3),
        result_interval_second TIMESTAMP(3),
        result_interval_second_p2 TIMESTAMP(3)
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO sink_table
    SELECT
        -- Flink SQL 支持的所有 INTERVAL 子句如下,总体可以分为 `年-月`、`日-小时-秒` 两种
    
        -- 1. 年-月。取值范围为 [-9999-11, +9999-11]
        -- 其中 p 是指有效位数,取值范围 [1, 4],默认值为 2。比如如果值为 1000,但是 p = 2,则会直接报错。
        -- INTERVAL YEAR
        f1 + INTERVAL '10' YEAR as result_interval_year,
        -- INTERVAL YEAR(p)
        f1 + INTERVAL '100' YEAR(3) as result_interval_year_p,
        -- INTERVAL YEAR(p) TO MONTH
        f1 + INTERVAL '10-03' YEAR(3) TO MONTH as result_interval_year_p_to_month,
        -- INTERVAL MONTH
        f1 + INTERVAL '13' MONTH as result_interval_month,
    
        -- 2. 日-小时-秒。取值范围为 [-999999 23:59:59.999999999, +999999 23:59:59.999999999]
        -- 其中 p1/p2 都是有效位数,p1 取值范围 [1, 6],默认值为 2;p2 取值范围 [0, 9],默认值为 6
        -- INTERVAL DAY
        f1 + INTERVAL '10' DAY as result_interval_day,
        -- INTERVAL DAY(p1)
        f1 + INTERVAL '100' DAY(3) as result_interval_day_p1,
        -- INTERVAL DAY(p1) TO HOUR
        f1 + INTERVAL '10 03' DAY(3) TO HOUR as result_interval_day_p1_to_hour,
        -- INTERVAL DAY(p1) TO MINUTE
        f1 + INTERVAL '10 03:12' DAY(3) TO MINUTE as result_interval_day_p1_to_minute,
        -- INTERVAL DAY(p1) TO SECOND(p2)
        f1 + INTERVAL '10 00:00:00.004' DAY TO SECOND(3) as result_interval_day_p1_to_second_p2,
        -- INTERVAL HOUR
        f1 + INTERVAL '10' HOUR as result_interval_hour,
        -- INTERVAL HOUR TO MINUTE
        f1 + INTERVAL '10:03' HOUR TO MINUTE as result_interval_hour_to_minute,
        -- INTERVAL HOUR TO SECOND(p2)
        f1 + INTERVAL '00:00:00.004' HOUR TO SECOND(3) as result_interval_hour_to_second,
        -- INTERVAL MINUTE
        f1 + INTERVAL '10' MINUTE as result_interval_minute,
        -- INTERVAL MINUTE TO SECOND(p2)
        f1 + INTERVAL '05:05.006' MINUTE TO SECOND(3) as result_interval_minute_to_second_p2,
        -- INTERVAL SECOND
        f1 + INTERVAL '3' SECOND as result_interval_second,
        -- INTERVAL SECOND(p2)
        f1 + INTERVAL '300' SECOND(3) as result_interval_second_p2
    FROM (SELECT TO_TIMESTAMP_LTZ(1640966476500, 3) as f1)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    2.复合数据类型

    • 数组类型ARRAYt ARRAY。数组最大长度为 2147483647 2147483647 2147483647t 代表数组内的数据类型。举例 ARRAYARRAY,其等同于 INT ARRAYSTRING ARRAY
    • Map 类型MAP。Map 类型就和 Java 中的 Map 类型一样,key 是没有重复的。举例 MapMap
    • 集合类型MULTISETt MULTISET。就和 Java 中的 List 类型,一样,运行重复的数据。举例 MULTISET,其等同于 INT MULTISET
    • 对象类型ROWROWROW(n0 t0, n1 t1, ...)ROW(n0 t0 'd0', n1 t1 'd1', ...)。其中,n 是字段的唯一名称,t 是字段的逻辑类型,d 是字段的描述。就和 Java 中的自定义对象一样。举例:ROW(myField INT, myOtherField BOOLEAN),其等同于 ROW

    3.用户自定义数据类型

    用户自定义类型就是运行用户使用 Java 等语言自定义一个数据类型出来。但是目前数据类型不支持使用 CREATE TABLE 的 DDL 进行定义,只支持作为函数的输入输出参数。如下案例:

    • 第一步,自定义数据类型
    public class User {
    
        // 1. 基础类型,Flink 可以通过反射类型信息自动把数据类型获取到
        // 关于 SQL 类型和 Java 类型之间的映射见:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/types/#data-type-extraction
        public int age;
        public String name;
    
        // 2. 复杂类型,用户可以通过 @DataTypeHint("DECIMAL(10, 2)") 注解标注此字段的数据类型
        public @DataTypeHint("DECIMAL(10, 2)") BigDecimal totalBalance;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 第二步,在 UDF 中使用此数据类型
    public class UserScalarFunction extends ScalarFunction {
    
        // 1. 自定义数据类型作为输出参数
        public User eval(long i) {
            if (i > 0 && i <= 5) {
                User u = new User();
                u.age = (int) i;
                u.name = "name1";
                u.totalBalance = new BigDecimal(1.1d);
                return u;
            } else {
                User u = new User();
                u.age = (int) i;
                u.name = "name2";
                u.totalBalance = new BigDecimal(2.2d);
                return u;
            }
        }
        
        // 2. 自定义数据类型作为输入参数
        public String eval(User i) {
            if (i.age > 0 && i.age <= 5) {
                User u = new User();
                u.age = 1;
                u.name = "name1";
                u.totalBalance = new BigDecimal(1.1d);
                return u.name;
            } else {
                User u = new User();
                u.age = 2;
                u.name = "name2";
                u.totalBalance = new BigDecimal(2.2d);
                return u.name;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 第三步,在 Flink SQL 中使用
    -- 1. 创建 UDF
    CREATE FUNCTION user_scalar_func AS 'flink.examples.sql._12_data_type._02_user_defined.UserScalarFunction';
    
    -- 2. 创建数据源表
    CREATE TABLE source_table (
        user_id BIGINT NOT NULL COMMENT '用户 id'
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '10'
    );
    
    -- 3. 创建数据汇表
    CREATE TABLE sink_table (
        result_row_1 ROW<age INT, name STRING, totalBalance DECIMAL(10, 2)>,
        result_row_2 STRING
    ) WITH (
      'connector' = 'print'
    );
    
    -- 4. SQL 查询语句
    INSERT INTO sink_table
    select
        -- 4.a. 用户自定义类型作为输出
        user_scalar_func(user_id) as result_row_1,
        -- 4.b. 用户自定义类型作为输出及输入
        user_scalar_func(user_scalar_func(user_id)) as result_row_2
    from source_table;
    
    -- 5. 查询结果
    +I[+I[9, name2, 2.20], name2]
    +I[+I[1, name1, 1.10], name1]
    +I[+I[5, name1, 1.10], name1]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    参考:《Data Types | Apache Flink

  • 相关阅读:
    【云原生 | Kubernetes 系列】---Prometheus监控Nginx
    CTFHub(web sql)(四)
    基于可视图法(VG)的路径规划算法简述
    javascript跨域传输数据的设置和兼容浏览函数代码
    项目管理:如何建立一个具有执行力的团队?
    华云数据×天融信 | 提升云安全能力 联合打造“数据中心云安全资源池解决方案”
    猿创征文 |【C++】C++中的引用
    设计模式-责任链模式
    git bash 常见场景用法
    在大厂工作是这样的
  • 原文地址:https://blog.csdn.net/be_racle/article/details/136634313