If you are building a custom Flink sink for batch mode and need INSERT OVERWRITE, or need to run some operation exactly once at job startup or after the job finishes, this article may help.
Components:
Flink: 1.15
Reference: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sourcessinks/
INSERT OVERWRITE in batch mode means: first clear the table's existing data, then insert the new data.
With many parallel sink instances, the clearing must execute exactly once, and it must happen before any instance starts inserting.
Among Flink's built-in connectors, only two support INSERT OVERWRITE, Hive and the filesystem connector, so their implementations are the reference to follow.
The example demonstrated here:
source: the built-in Flink JDBC connector
sink: a custom Flink JDBC connector
statement: insert overwrite osd_user_test select * from ods_user
The key point: when executing the OVERWRITE statement, the target table must be cleared before the job starts writing (or certain work done once at the end), and the clearing must execute exactly once. Otherwise a truncate running while other parallel instances have already written data would wipe out their rows and lose data.
Looking at the source code of the FileSystem sink:
FileSystemTableSink implements SupportsOverwrite, and receives the overwrite flag through that interface's applyOverwrite method.
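The ability interface itself is a single callback; the planner invokes it with `true` when it sees an INSERT OVERWRITE statement:

```java
// org.apache.flink.table.connector.sink.abilities.SupportsOverwrite (Flink 1.15)
public interface SupportsOverwrite {
    // true when the statement is INSERT OVERWRITE rather than INSERT INTO
    void applyOverwrite(boolean overwrite);
}
```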
The overwrite flag is then passed through the createBatchSink method into the FileSystemOutputFormat class.
FileSystemOutputFormat uses the overwrite flag in its finalizeGlobal method,
which goes through FileSystemCommitter's commitPartitions method,
and PartitionLoader's loadPartition method, which calls overwriteAndRenameFiles.
So ultimately the overwrite feature deletes the files that already exist and then moves the new files in.
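What that last step effectively does, heavily paraphrased (an illustrative sketch, not the verbatim Flink 1.15 source; names abbreviated):

```java
// Sketch of the effective logic in PartitionLoader -- paraphrased, not real source.
private void overwriteAndRenameFiles(List<Path> srcFiles, Path destDir) throws Exception {
    if (fs.exists(destDir)) {
        // overwrite semantics: remove whatever the directory already contains
        fs.delete(destDir, true);
        fs.mkdirs(destDir);
    }
    for (Path srcFile : srcFiles) {
        // move the freshly written files into the final location
        fs.rename(srcFile, new Path(destDir, srcFile.getName()));
    }
}
```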
From the filesystem connector's implementation of overwrite, the key hook is clearly finalizeGlobal. How does that method get triggered? Its Javadoc answers this:
"The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished."
So for finalizeGlobal to be called, the OutputFormat must implement the FinalizeOnMaster interface.
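Both lifecycle hooks are small interfaces in flink-core; their complete shape in 1.15 is:

```java
// org.apache.flink.api.common.io.FinalizeOnMaster (Flink 1.15)
public interface FinalizeOnMaster {
    // invoked on the JobManager after all parallel OutputFormat instances finished
    void finalizeGlobal(int parallelism) throws IOException;
}

// org.apache.flink.api.common.io.InitializeOnMaster (Flink 1.15)
public interface InitializeOnMaster {
    // invoked on the JobManager before the distributed program execution starts
    void initializeGlobal(int parallelism) throws IOException;
}
```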
The main reference is the documentation linked at the top: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sourcessinks/
What differs from the documentation's example:
JdbcDynamicTableSink implements SupportsOverwrite, which is all it takes for the connector to accept INSERT OVERWRITE.

The JDBCDynamicTableFactory class:
```java
package com.tang.conector.jdbc;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import java.util.HashSet;
import java.util.Set;

public class JDBCDynamicTableFactory implements DynamicTableSinkFactory {

    public static final String CONNECTOR = "my-jdbc";

    // define all options statically
    public static final ConfigOption<String> URL = ConfigOptions.key("url")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<String> USER_NAME = ConfigOptions.key("username")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType()
            .noDefaultValue();

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // get and validate the options
        final ReadableConfig options = helper.getOptions();
        helper.validate();
        String url = options.get(URL);
        String tableName = options.get(TABLE_NAME);
        String user = options.get(USER_NAME);
        String password = options.get(PASSWORD);

        // derive the data type (excluding computed columns) from the catalog table
        final DataType producedDataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

        // create and return the dynamic table sink
        return new JdbcDynamicTableSink(url, tableName, user, password, producedDataType);
    }

    @Override
    public String factoryIdentifier() {
        return CONNECTOR;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        options.add(TABLE_NAME);
        options.add(USER_NAME);
        options.add(PASSWORD);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }
}
```
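One step the listing does not show: for the planner to find this factory by its identifier ('my-jdbc'), it must be registered through Java SPI. Add a resource file `META-INF/services/org.apache.flink.table.factories.Factory` on the classpath containing the factory's fully qualified class name:

```
com.tang.conector.jdbc.JDBCDynamicTableFactory
```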
The JdbcDynamicTableSink class:
```java
package com.tang.conector.jdbc;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.util.Optional;

public class JdbcDynamicTableSink implements DynamicTableSink, SupportsOverwrite {

    private final String url;
    private final String tableName;
    private final String user;
    private final String password;
    private final DataType producedDataType;
    private boolean overwrite;

    public JdbcDynamicTableSink(String url, String tableName, String user, String password, DataType producedDataType) {
        this.url = url;
        this.tableName = tableName;
        this.user = user;
        this.password = password;
        this.producedDataType = producedDataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.all();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
        boolean isBounded = sinkContext.isBounded(); // true in batch mode

        // batch mode
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(
                    ProviderContext providerContext, DataStream<RowData> dataStream) {
                return consume(providerContext, dataStream, sinkContext);
            }
        };

        // streaming mode alternative:
        /* JdbcOutFormat format = new JdbcOutFormat(overwrite);
        JdbcSinkFunction<JdbcOutFormat> function = new JdbcSinkFunction<>(url, tableName, user, password, format);
        return SinkFunctionProvider.of(function, 4); */
    }

    private DataStreamSink<?> consume(
            ProviderContext providerContext, DataStream<RowData> dataStream, Context sinkContext) {
        final int inputParallelism = dataStream.getParallelism();
        // demo: hard-code the sink parallelism to 2; fall back to the input parallelism if unset
        final Integer configuredParallelism = 2;
        final int parallelism = Optional.ofNullable(configuredParallelism).orElse(inputParallelism);
        return createBatchSink(dataStream, sinkContext, parallelism);
    }

    private DataStreamSink<RowData> createBatchSink(
            DataStream<RowData> inputStream, Context sinkContext, final int parallelism) {
        JdbcOutFormat format = new JdbcOutFormat(overwrite);
        return inputStream
                .writeUsingOutputFormat(format)
                .setParallelism(parallelism)
                .name("Jdbc");
    }

    @Override
    public DynamicTableSink copy() {
        JdbcDynamicTableSink copy = new JdbcDynamicTableSink(url, tableName, user, password, producedDataType);
        // the planner may copy the sink after applyOverwrite was called,
        // so carry the flag over or it silently resets to false
        copy.overwrite = overwrite;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "my-jdbc";
    }

    @Override
    public void applyOverwrite(boolean b) {
        // called by the planner when the statement is INSERT OVERWRITE
        overwrite = b;
    }
}
```
The JdbcOutFormat class:
```java
package com.tang.conector.jdbc;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;

import java.io.IOException;

public class JdbcOutFormat
        extends RichOutputFormat<RowData> implements FinalizeOnMaster, InitializeOnMaster {

    private final boolean overwrite;

    public JdbcOutFormat(boolean overwrite) {
        this.overwrite = overwrite;
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        // runs once on the JobManager after all parallel instances have finished
        System.out.println("finalizeGlobal do some thing after all");
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        System.out.println("do open....");
    }

    @Override
    public void writeRecord(RowData record) throws IOException {
        System.out.println("do write one data ....");
    }

    @Override
    public void close() throws IOException {
        System.out.println("do close....");
    }

    @Override
    public void initializeGlobal(int parallelism) throws IOException {
        // runs once on the JobManager before any parallel instance starts
        System.out.println("initializeGlobal do some thing before all");
        if (overwrite) {
            System.out.println("initializeGlobal This is overwrite mode. execute truncate table");
            // TODO truncate the target table here
        }
    }
}
```
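The TODO above is where the truncate belongs. A minimal sketch, assuming the JDBC options (url, user, password, table name) are also handed to JdbcOutFormat's constructor — the demo class only carries the overwrite flag:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper; url/user/password/tableName are assumed extra constructor
// arguments of JdbcOutFormat, not fields shown in the demo above.
private void truncateTable(String url, String user, String password, String tableName)
        throws SQLException {
    try (Connection conn = DriverManager.getConnection(url, user, password);
         Statement stmt = conn.createStatement()) {
        // runs inside initializeGlobal, i.e. exactly once, on the JobManager
        stmt.executeUpdate("TRUNCATE TABLE " + tableName);
    }
}
```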
JdbcOutFormat implements both FinalizeOnMaster and InitializeOnMaster.
The InitializeOnMaster interface's initializeGlobal method is documented as:
"The method is invoked on the master (JobManager) before the distributed program execution starts."
And FinalizeOnMaster's finalizeGlobal method, as quoted earlier:
"The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished."
Finally, a test driver:
```java
package com.tang.jdbc;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcTest {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // source table: the built-in JDBC connector
        String sourceSql = "CREATE TABLE ods_user (\n" +
                " id INT,\n" +
                " name STRING,\n" +
                " age INT,\n" +
                " city STRING,\n" +
                " update_time TIMESTAMP,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://localhost:3306/wm',\n" +
                " 'table-name' = 'osd_user',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '123456'\n" +
                ")";
        tableEnv.executeSql(sourceSql);

        // sink table: the custom 'my-jdbc' connector built above
        String sinkSql = "CREATE TABLE osd_user_test (\n" +
                " id INT,\n" +
                " name STRING,\n" +
                " age INT,\n" +
                " city STRING,\n" +
                " update_time TIMESTAMP,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'my-jdbc',\n" +
                " 'url' = 'jdbc:mysql://localhost:3306/wm',\n" +
                " 'table-name' = 'osd_user_test',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '123456'\n" +
                ")";
        tableEnv.executeSql(sinkSql);

        String insertSql = "insert overwrite osd_user_test select * from ods_user";
        // block until the batch job finishes so the println output can be observed
        tableEnv.executeSql(insertSql).await();
    }
}
```
The sink parallelism is 2, and the lifecycle plays out as follows:
initializeGlobal executes once, before open; implement the data-clearing logic here to get the overwrite behavior.
open and close each execute twice, because the parallelism is 2.
writeRecord executes N times, once per record.
finalizeGlobal executes once, after close; implement anything that must happen once after the sink finishes here.
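Putting it together, the console output of the test run is, schematically:

```
initializeGlobal do some thing before all
initializeGlobal This is overwrite mode. execute truncate table
do open....               (x2, one per subtask)
do write one data ....    (xN, one per record)
do close....              (x2)
finalizeGlobal do some thing after all
```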
