• Custom Flink sink with insert overwrite support


    Preface

    If you are building a custom Flink sink and, in batch mode, need insert overwrite semantics, or need to run some operation exactly once when the job starts or after it finishes, this article may help.

    Components:

    Flink: 1.15

    Reference: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sourcessinks/

    Analysis

    In batch mode, insert overwrite means clearing the table's existing data first and then inserting the new data.

    With multiple parallel sink instances, the truncation must be executed exactly once, and it must happen before any instance starts inserting.

    Among Flink's built-in connectors, only the Hive and filesystem connectors support insert overwrite, so their implementations are a useful reference.

    The example used in this article:

    source: the built-in Flink JDBC connector

    sink: a custom Flink JDBC connector

    statement: insert overwrite osd_user_test select * from ods_user

    The key point: when an overwrite statement is executed, the target table's data must be cleared before the job starts writing (or once after it ends), and this clearing must run exactly once. Otherwise, a parallel instance may already have written data by the time the truncation runs, and those records would be lost.

    Looking at the FileSystem sink source code:

    FileSystemTableSink implements SupportsOverwrite, so it receives the overwrite flag through the applyOverwrite method.
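    For reference, the ability interface itself is tiny; the planner calls applyOverwrite when it plans an INSERT OVERWRITE statement. A simplified sketch of the Flink 1.15 interface (javadoc omitted):

    // org.apache.flink.table.connector.sink.abilities.SupportsOverwrite (simplified)
    public interface SupportsOverwrite {
        // the planner passes true when the statement is INSERT OVERWRITE
        void applyOverwrite(boolean overwrite);
    }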

    The overwrite flag is then passed via the createBatchSink method into the FileSystemOutputFormat class.

    FileSystemOutputFormat uses the flag in its finalizeGlobal method, which goes through the commitPartitions method of FileSystemCommitter to the loadPartition method of PartitionLoader, which finally calls overwriteAndRenameFiles.

    Ultimately, the overwrite is implemented by deleting the files that already exist and then moving in the new ones.

    From the filesystem connector's implementation we can see that the key is finalizeGlobal. So how does this method get triggered? The source comment tells us when it runs:

    "The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished."

    So for this method to be called, the OutputFormat must implement the FinalizeOnMaster interface.
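    For reference, a simplified sketch of that interface in Flink 1.15:

    // org.apache.flink.api.common.io.FinalizeOnMaster (simplified)
    public interface FinalizeOnMaster {
        // invoked on the JobManager after all parallel instances have finished
        void finalizeGlobal(int parallelism) throws IOException;
    }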

    Steps

    1. Custom sink connector

    Mainly follow the documentation above: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sourcessinks/

    The difference:

    JdbcDynamicTableSink implements SupportsOverwrite, which is what enables insert overwrite.

    The JDBCDynamicTableFactory class:
    package com.tang.conector.jdbc;

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.factories.DynamicTableSinkFactory;
    import org.apache.flink.table.factories.FactoryUtil;
    import org.apache.flink.table.types.DataType;

    import java.util.HashSet;
    import java.util.Set;

    public class JDBCDynamicTableFactory implements DynamicTableSinkFactory {

        public static final String CONNECTOR = "my-jdbc";

        // define all options statically
        public static final ConfigOption<String> URL =
                ConfigOptions.key("url").stringType().noDefaultValue();

        public static final ConfigOption<String> TABLE_NAME =
                ConfigOptions.key("table-name").stringType().noDefaultValue();

        public static final ConfigOption<String> USER_NAME =
                ConfigOptions.key("username").stringType().noDefaultValue();

        public static final ConfigOption<String> PASSWORD =
                ConfigOptions.key("password").stringType().noDefaultValue();

        @Override
        public DynamicTableSink createDynamicTableSink(Context context) {
            final FactoryUtil.TableFactoryHelper helper =
                    FactoryUtil.createTableFactoryHelper(this, context);
            // validate all options
            helper.validate();
            // get the validated options
            final ReadableConfig options = helper.getOptions();
            String url = options.get(URL);
            String tableName = options.get(TABLE_NAME);
            String user = options.get(USER_NAME);
            String password = options.get(PASSWORD);
            // derive the produced data type (excluding computed columns) from the catalog table
            final DataType producedDataType =
                    context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
            // create and return the dynamic table sink
            return new JdbcDynamicTableSink(url, tableName, user, password, producedDataType);
        }

        @Override
        public String factoryIdentifier() {
            return CONNECTOR;
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            final Set<ConfigOption<?>> options = new HashSet<>();
            options.add(URL);
            options.add(TABLE_NAME);
            options.add(USER_NAME);
            options.add(PASSWORD);
            return options;
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            return new HashSet<>();
        }
    }
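    One step the listing alone does not show: for the planner to discover this factory by its my-jdbc identifier, it has to be registered through Java SPI. Add a resource file named META-INF/services/org.apache.flink.table.factories.Factory containing the fully qualified class name:

    com.tang.conector.jdbc.JDBCDynamicTableFactory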
    The JdbcDynamicTableSink class:
    package com.tang.conector.jdbc;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.ProviderContext;
    import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.SinkFunctionProvider;
    import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.DataType;

    import java.util.Optional;

    public class JdbcDynamicTableSink implements DynamicTableSink, SupportsOverwrite {

        private final String url;
        private final String tableName;
        private final String user;
        private final String password;
        private final DataType producedDataType;
        private boolean overwrite;

        public JdbcDynamicTableSink(String url, String tableName, String user,
                String password, DataType producedDataType) {
            this.url = url;
            this.tableName = tableName;
            this.user = user;
            this.password = password;
            this.producedDataType = producedDataType;
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
            return ChangelogMode.all();
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
            // this demo only handles batch (bounded) execution
            boolean isBounded = sinkContext.isBounded();
            return new DataStreamSinkProvider() {
                @Override
                public DataStreamSink<?> consumeDataStream(
                        ProviderContext providerContext, DataStream<RowData> dataStream) {
                    return consume(providerContext, dataStream, sinkContext);
                }
            };
            // streaming mode variant
            /* JdbcOutFormat format = new JdbcOutFormat(overwrite);
            JdbcSinkFunction<JdbcOutFormat> function =
                    new JdbcSinkFunction<>(url, tableName, user, password, format);
            return SinkFunctionProvider.of(function, 4); */
        }

        private DataStreamSink<?> consume(
                ProviderContext providerContext, DataStream<RowData> dataStream, Context sinkContext) {
            final int inputParallelism = dataStream.getParallelism();
            // sink parallelism is hard-coded to 4 for this demo; fall back to the
            // input parallelism when no value is configured
            final int parallelism = Optional.ofNullable(4).orElse(inputParallelism);
            return createBatchSink(dataStream, sinkContext, parallelism);
        }

        private DataStreamSink<RowData> createBatchSink(
                DataStream<RowData> inputStream, Context sinkContext, final int parallelism) {
            // pass the overwrite flag down to the OutputFormat
            JdbcOutFormat format = new JdbcOutFormat(overwrite);
            return inputStream
                    .writeUsingOutputFormat(format)
                    .setParallelism(parallelism)
                    .name("Jdbc");
        }

        @Override
        public DynamicTableSink copy() {
            JdbcDynamicTableSink copy =
                    new JdbcDynamicTableSink(url, tableName, user, password, producedDataType);
            // the planner may copy the sink after applyOverwrite has been called,
            // so the flag must be carried over
            copy.overwrite = overwrite;
            return copy;
        }

        @Override
        public String asSummaryString() {
            return "my-jdbc";
        }

        @Override
        public void applyOverwrite(boolean overwrite) {
            this.overwrite = overwrite;
        }
    }

    2. Custom OutputFormat class

    JdbcOutFormat:
    package com.tang.conector.jdbc;

    import org.apache.flink.api.common.io.FinalizeOnMaster;
    import org.apache.flink.api.common.io.InitializeOnMaster;
    import org.apache.flink.api.common.io.RichOutputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.data.RowData;

    import java.io.IOException;

    public class JdbcOutFormat extends RichOutputFormat<RowData>
            implements FinalizeOnMaster, InitializeOnMaster {

        private final boolean overwrite;

        public JdbcOutFormat(boolean overwrite) {
            this.overwrite = overwrite;
        }

        @Override
        public void initializeGlobal(int parallelism) throws IOException {
            // runs exactly once on the JobManager, before any parallel instance opens
            System.out.println("initializeGlobal do some thing before all");
            if (overwrite) {
                System.out.println("initializeGlobal This is overwrite mode. execute truncate table");
                // TODO truncate the target table here
            }
        }

        @Override
        public void configure(Configuration parameters) {
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            // runs once per parallel instance
            System.out.println("do open....");
        }

        @Override
        public void writeRecord(RowData record) throws IOException {
            System.out.println("do write one data ....");
        }

        @Override
        public void close() throws IOException {
            System.out.println("do close....");
        }

        @Override
        public void finalizeGlobal(int parallelism) throws IOException {
            // runs exactly once on the JobManager, after all parallel instances finished
            System.out.println("finalizeGlobal do some thing after all");
        }
    }

    3. Running the overwrite logic exactly once at sink initialization

    JdbcOutFormat implements both FinalizeOnMaster and InitializeOnMaster.

    The initializeGlobal method of InitializeOnMaster is documented as: "The method is invoked on the master (JobManager) before the distributed program execution starts."

    The finalizeGlobal method of FinalizeOnMaster is documented as: "The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished."
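    As a minimal sketch of the TODO in initializeGlobal, the truncation could be done over plain JDBC. This assumes the connection options (url, tableName, user, password) are also passed into JdbcOutFormat's constructor, which the listing above does not do yet:

    @Override
    public void initializeGlobal(int parallelism) throws IOException {
        if (!overwrite) {
            return;
        }
        // runs exactly once on the JobManager, before any parallel instance
        // calls open(), so no writer can race with the truncation
        try (java.sql.Connection conn =
                        java.sql.DriverManager.getConnection(url, user, password);
                java.sql.Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("TRUNCATE TABLE " + tableName);
        } catch (java.sql.SQLException e) {
            throw new IOException("Failed to truncate table " + tableName, e);
        }
    }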

    Testing and verification

    1. Test code

    package com.tang.jdbc;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JdbcTest {
        public static void main(String[] args) throws Exception {
            EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);

            // source table: the built-in JDBC connector
            String mysqlSql = "CREATE TABLE ods_user (\n" +
                    "  id INT,\n" +
                    "  name STRING,\n" +
                    "  age INT,\n" +
                    "  city STRING,\n" +
                    "  update_time TIMESTAMP,\n" +
                    "  PRIMARY KEY (id) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "  'connector' = 'jdbc',\n" +
                    "  'url' = 'jdbc:mysql://localhost:3306/wm',\n" +
                    "  'table-name' = 'osd_user',\n" +
                    "  'username' = 'root',\n" +
                    "  'password' = '123456'\n" +
                    ")";
            tableEnv.executeSql(mysqlSql);

            // sink table: the custom my-jdbc connector
            String sinkSql = "CREATE TABLE osd_user_test (\n" +
                    "  id INT,\n" +
                    "  name STRING,\n" +
                    "  age INT,\n" +
                    "  city STRING,\n" +
                    "  update_time TIMESTAMP,\n" +
                    "  PRIMARY KEY (id) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "  'connector' = 'my-jdbc',\n" +
                    "  'url' = 'jdbc:mysql://localhost:3306/wm',\n" +
                    "  'table-name' = 'osd_user_test',\n" +
                    "  'username' = 'root',\n" +
                    "  'password' = '123456'\n" +
                    ")";
            tableEnv.executeSql(sinkSql);

            String insertSql = "insert overwrite osd_user_test select * from ods_user";
            // block until the batch job finishes
            tableEnv.executeSql(insertSql).await();
        }
    }

    2. Results and observations

    The sink parallelism is 2.

    initializeGlobal runs once, before the open calls; implementing the data-clearing logic here is what gives us the overwrite behavior.

    open and close each run twice, because the parallelism is 2.

    writeRecord runs N times, because there are N records.

    finalizeGlobal runs once, after the close calls; any work the sink needs to do once the job has finished can go here. A rough sample of the console output is shown below.
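    Given the println calls in JdbcOutFormat, with a parallelism of 2 and, say, three input rows, the console output looks roughly like this (the open/write/close lines come from the two parallel instances and may interleave):

    initializeGlobal do some thing before all
    initializeGlobal This is overwrite mode. execute truncate table
    do open....
    do open....
    do write one data ....
    do write one data ....
    do write one data ....
    do close....
    do close....
    finalizeGlobal do some thing after all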

• Original post: https://blog.csdn.net/m0_37592814/article/details/133827345