Mysql和oracle在cdc sql中的数据类型映射
MySQL CDC Connector — Flink CDC documentation
MySQL type | Flink SQL type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
BIGINT UNSIGNED | DECIMAL(20, 0) |
REAL | FLOAT |
DOUBLE | DOUBLE |
NUMERIC(p, s) | DECIMAL(p, s) |
NUMERIC(p, s) | STRING |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n/8⌉) |
Oracle type | Flink SQL type |
NUMBER(p, s <= 0), p - s < 3 | TINYINT |
NUMBER(p, s <= 0), p - s < 5 | SMALLINT |
NUMBER(p, s <= 0), p - s < 10 | INT |
NUMBER(p, s <= 0), p - s < 19 | BIGINT |
NUMBER(p, s <= 0), 19 <= p - s <= 38 | DECIMAL(p - s, 0) |
NUMBER(p, s > 0) | DECIMAL(p, s) |
NUMBER(p, s <= 0), p - s > 38 | STRING |
FLOAT | FLOAT |
DOUBLE PRECISION | DOUBLE |
NUMBER(1) | BOOLEAN |
DATE | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] WITH TIME ZONE | TIMESTAMP [(p)] WITH TIME ZONE |
TIMESTAMP [(p)] WITH LOCAL TIME ZONE | TIMESTAMP_LTZ [(p)] |
CHAR(n) | STRING |
BLOB | BYTES |
SQL读出来的内容是的Tuple2<Boolean,Row>的格式
DataStream读出来的内容可以设置成JSON的格式,方便解析
{"before":null,"after":{"C_COLLECTORID":"W2","S_TIME":1655497161000,"S_NUM":{"scale":0,"value":"BA=="},"S_LASTESTTIME":1655916525000,"S_INITTIME":1655497161000,"W_WORKSHOP":null,"W_WORKCLASSID":null,"C_FACTORY":"2720"},"source":{"version":"1.5.4.Final","connector":"oracle","name":"oracle_logminer","ts_ms":1657262696008,"snapshot":"last","db":"ORCL.TEST","sequence":null,"schema":"FLINKUSER","table":"GD_WCOLLECTCURRENTSTAT","txId":null,"scn":"1135085","commit_scn":null,"lcr_position":null},"op":“c","ts_ms":1657262696008,"transaction":null}
before就是数据改变前的内容,after就是数据改变后的内容,op为c代表新增的,下面是op的对应数据改变类型