Flink JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将数据写入其中。本文档介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询。
最终的目的是通过Flink CDC实时的同步MySQL中的数据到Iceberg。

如果在 DDL 上定义了主键,则 JDBC sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。
Apache Iceberg是一种表格式(table format)。我们可以简单理解为它是基于计算层(flink、spark)和存储层(orc、parquet)的一个中间层,我们可以把它定义成一种“数据组织格式”,Iceberg将其称之为“表格式”也是表达类似的含义。
它与底层的存储格式(比如ORC、Parquet之类的列式存储格式)最大的区别是,它并不定义数据存储方式,而是定义了数据、元数据的组织方式,向上提供统一的“表”的语义。它构建在数据存储格式之上,其底层的数据存储仍然使用Parquet、ORC等进行存储。在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark、flink、presto等。
Iceberg 优势
增量读取处理能力:Iceberg支持通过流式方式读取增量数据,支持Structured Streaming以及Flink table Source;
支持事务(ACID),上游数据写入即可见,不影响当前数据处理任务,简化ETL;提供upsert和merge into能力,可以极大地缩小数据入库延迟;
可扩展的元数据,快照隔离以及对于文件列表的所有修改都是原子操作;
同时支持流批处理、支持多种存储格式和灵活的文件组织:提供了基于流式的增量计算模型和基于批处理的全量表计算模型。批处理和流任务可以使用相同的存储模型,数据不再孤立;Iceberg支持隐藏分区和分区进化,方便业务进行数据分区策略更新。支持Parquet、Avro以及ORC等存储格式。
支持多种计算引擎,优秀的内核抽象使之不绑定特定的计算引擎,目前Iceberg支持的计算引擎有Spark、Flink、Presto以及Hive。
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <flink.version>1.14.4</flink.version>
- <scala.binary.version>2.12</scala.binary.version>
- <hadoop.version>3.1.2</hadoop.version>
- <hive.version>3.1.2</hive.version>
- </properties>
- <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime -->
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-flink-runtime</artifactId>
- <version>0.12.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.21</version>
- </dependency>
- /**
- * MySQL数据导入iceberg
- */
- public class JDBC2IcebergTable {
- public static void main(String[] args) throws Exception {
-
- // create environments of both APIs
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
-
- tableEnv.executeSql("CREATE TABLE IF NOT EXISTS EventTable (\n" +
- "`user` STRING,\n" +
- "url STRING,\n" +
- "`timestamp` BIGINT\n" +
- ") WITH (\n" +
- "'connector' = 'jdbc',\n" +
- "'url' = 'jdbc:mysql://127.0.0.1:3306/flink',\n" +
- "'table-name' = 'events',\n" +
- "'username'='root',\n" +
- "'password'='00000'\n" +
- ")");
-
- Table eventTable = tableEnv.from("EventTable");
-
- // Table aliceTable = tableEnv.sqlQuery("select * from EventTable ");
-
- //创建CATALOG
- tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
- " 'type'='iceberg',\n" +
- " 'catalog-type'='hadoop',\n" +
- " 'warehouse'='file:///tmp/warehouse/iceberg',\n" +
- " 'property-version'='1'\n" +
- ")");
-
- tableEnv.executeSql("USE CATALOG hadoop_catalog");
-
- //创建表
- tableEnv.executeSql("CREATE TABLE IF NOT EXISTS eve (\n" +
- " `user` STRING,\n" +
- " url STRING,\n" +
- " `timestamp` BIGINT \n" +
- ")");
-
- Configuration configuration = new Configuration();
- TableSchema schema = eventTable.getSchema();
- DataStream<Row> input = tableEnv.toDataStream(eventTable);
- // input.print();
-
- TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/iceberg/default/eve", configuration);
- DataStreamSink<RowData> dataDataStreamSink = FlinkSink.forRow(input, schema)
- .tableLoader(tableLoader)
- .build();
-
- //读数据
-
- DataStream<RowData> batch = FlinkSource.forRowData()
- .env(env)
- .tableLoader(tableLoader)
- .streaming(false)
- .build();
-
- batch.map(x -> x.getString(0)).print();
- //batch.print();
-
- env.execute("Test Iceberg Batch Read");
-
-
- }
- }
创建脚本,warehouse的路径,它会自动创建HDFS路径里面 ns是命名空间,但namenode的使用ip:port代替。
- //创建CATALOG
- tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
- " 'type'='iceberg',\n" +
- " 'catalog-type'='hadoop',\n" +
- " 'warehouse'='file:///tmp/warehouse/iceberg',\n" +
- " 'property-version'='1'\n" +
- ")");
在刚才构建的Catalog下面创建数据表
- tableEnv.executeSql("USE CATALOG hadoop_catalog");
-
- //创建表
- //创建表
- tableEnv.executeSql("CREATE TABLE IF NOT EXISTS eve (\n" +
- " `user` STRING,\n" +
- " url STRING,\n" +
- " `timestamp` BIGINT \n" +
- ")");
新创建的表数据和元数据。

- DataStream<RowData> batch = FlinkSource.forRowData()
- .env(env)
- .tableLoader(tableLoader)
- .streaming(false)
- .build();
- batch.print();
Idea的输出
