
Data type mappings for MySQL and Oracle in Flink CDC SQL
MySQL CDC Connector — Flink CDC documentation
| MySQL type | Flink SQL type |
| --- | --- |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| BIGINT UNSIGNED | DECIMAL(20, 0) |
| REAL | FLOAT |
| DOUBLE | DOUBLE |
| NUMERIC(p, s), p <= 38 | DECIMAL(p, s) |
| NUMERIC(p, s), 38 < p <= 65 | STRING |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME [(p)] | TIME [(p)] |
| TIMESTAMP [(p)] | TIMESTAMP [(p)] |
| CHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
| BIT(n) | BINARY(⌈n/8⌉) |
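
Two rows of the MySQL table follow from simple arithmetic: BIT(n) needs ceil(n / 8) bytes, and BIGINT UNSIGNED needs 20 decimal digits because its largest value is 2^64 - 1. The sketch below checks both. The class and method names are hypothetical and are not part of Flink CDC.

```java
import java.math.BigInteger;

/** Sizing arithmetic behind two rows of the MySQL mapping table (illustrative names only). */
final class MySqlTypeSizing {

    /** Length of the BINARY type for a BIT(n) column: ceil(n / 8), via integer division. */
    static int bitToBinaryLength(int n) {
        if (n < 1 || n > 64) { // MySQL accepts BIT(1) through BIT(64)
            throw new IllegalArgumentException("BIT length must be in 1..64: " + n);
        }
        return (n + 7) / 8;
    }

    /** Decimal digits needed for the largest BIGINT UNSIGNED value, 2^64 - 1. */
    static int bigintUnsignedDigits() {
        BigInteger max = BigInteger.ONE.shiftLeft(64).subtract(BigInteger.ONE);
        return max.toString().length();
    }

    public static void main(String[] args) {
        System.out.println("BIT(9) -> BINARY(" + bitToBinaryLength(9) + ")");
        System.out.println("BIGINT UNSIGNED -> DECIMAL(" + bigintUnsignedDigits() + ", 0)");
    }
}
```

A signed BIGINT tops out at 19 digits, which is why the unsigned variant cannot map to Flink's BIGINT and is widened to DECIMAL(20, 0) instead.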

| Oracle type | Flink SQL type |
| --- | --- |
| NUMBER(p, s <= 0), p - s < 3 | TINYINT |
| NUMBER(p, s <= 0), p - s < 5 | SMALLINT |
| NUMBER(p, s <= 0), p - s < 10 | INT |
| NUMBER(p, s <= 0), p - s < 19 | BIGINT |
| NUMBER(p, s <= 0), 19 <= p - s <= 38 | DECIMAL(p - s, 0) |
| NUMBER(p, s > 0) | DECIMAL(p, s) |
| NUMBER(p, s <= 0), p - s > 38 | STRING |
| FLOAT | FLOAT |
| DOUBLE PRECISION | DOUBLE |
| NUMBER(1) | BOOLEAN |
| DATE | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| TIMESTAMP [(p)] WITH TIME ZONE | TIMESTAMP [(p)] WITH TIME ZONE |
| TIMESTAMP [(p)] WITH LOCAL TIME ZONE | TIMESTAMP_LTZ [(p)] |
| CHAR(n) | STRING |
| BLOB | BYTES |
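
The NUMBER(p, s) rows in the Oracle table amount to a small decision procedure on p - s. The following is a minimal sketch of those rows only, not the connector's actual code; the class and method names are hypothetical. It leaves out the NUMBER(1) to BOOLEAN row, because that row overlaps with the TINYINT rule and the table does not say which one takes precedence.

```java
/** Decision procedure for the NUMBER(p, s) rows of the Oracle table (illustrative names only). */
final class OracleNumberMapping {

    /** Returns the Flink SQL type the table lists for NUMBER(p, s). */
    static String toFlinkType(int p, int s) {
        if (s > 0) {
            return "DECIMAL(" + p + ", " + s + ")";
        }
        // s <= 0: a negative scale adds integer digits, so compare on p - s.
        int digits = p - s;
        if (digits < 3) return "TINYINT";
        if (digits < 5) return "SMALLINT";
        if (digits < 10) return "INT";
        if (digits < 19) return "BIGINT";
        if (digits <= 38) return "DECIMAL(" + digits + ", 0)";
        return "STRING"; // more than 38 digits does not fit in a Flink DECIMAL
    }

    public static void main(String[] args) {
        System.out.println("NUMBER(9, 0)   -> " + toFlinkType(9, 0));
        System.out.println("NUMBER(19, 0)  -> " + toFlinkType(19, 0));
        System.out.println("NUMBER(10, 2)  -> " + toFlinkType(10, 2));
        System.out.println("NUMBER(38, -2) -> " + toFlinkType(38, -2));
    }
}
```

The thresholds 3, 5, 10 and 19 are the digit counts at which a value may no longer fit in a signed 8, 16, 32 or 64 bit integer.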
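Records read through the SQL API come back as Tuple2<Boolean, Row>, the shape of a retract stream: the Boolean is true for an added row and false for a retracted one.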



With the DataStream API, the records can be emitted as JSON strings, which are easier to parse.

{"before":null,"after":{"C_COLLECTORID":"W2","S_TIME":1655497161000,"S_NUM":{"scale":0,"value":"BA=="},"S_LASTESTTIME":1655916525000,"S_INITTIME":1655497161000,"W_WORKSHOP":null,"W_WORKCLASSID":null,"C_FACTORY":"2720"},"source":{"version":"1.5.4.Final","connector":"oracle","name":"oracle_logminer","ts_ms":1657262696008,"snapshot":"last","db":"ORCL.TEST","sequence":null,"schema":"FLINKUSER","table":"GD_WCOLLECTCURRENTSTAT","txId":null,"scn":"1135085","commit_scn":null,"lcr_position":null},"op":"c","ts_ms":1657262696008,"transaction":null}
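
In the sample record, S_NUM is not a plain number but an object with scale and value fields. The sketch below decodes it, assuming value is the Base64 encoding of the unscaled number's big-endian two's-complement bytes, which is how Debezium usually serializes a variable-scale decimal in JSON. The class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

/** Decodes a {"scale": ..., "value": ...} decimal such as S_NUM (illustrative names only). */
final class DebeziumDecimalDecoder {

    /**
     * Assumes base64Value holds the unscaled value as big-endian two's-complement bytes;
     * this is an assumption about the encoding, not something the sample itself states.
     */
    static BigDecimal decode(String base64Value, int scale) {
        byte[] unscaledBytes = Base64.getDecoder().decode(base64Value);
        // BigInteger(byte[]) interprets the array as big-endian two's complement.
        BigInteger unscaled = new BigInteger(unscaledBytes);
        return new BigDecimal(unscaled, scale);
    }

    public static void main(String[] args) {
        // S_NUM from the sample record: {"scale":0,"value":"BA=="}
        System.out.println(decode("BA==", 0)); // prints 4
    }
}
```

"BA==" is the single byte 0x04, so with scale 0 the S_NUM column holds 4. If the connector is configured to emit decimals as strings or doubles instead, this decoding step is not needed.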
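before holds the row as it was before the change, and after holds the row as it is after the change. An op of c marks an insert; the op values map to change types as follows: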