• 深入浅出Apache SeaTunnel SQL Server Sink Connector


    在大数据时代,数据的迁移和流动已经变得日益重要。为了使数据能够更加高效地从一个源流向另一个目标,我们需要可靠、高效和易于配置的工具。今天,我们将介绍 JDBC SQL Server Sink Connector,这是一个专为 SQL Server 设计的连接器,能够确保数据的精准、高效传输。

    file

    不仅如此,它还支持多种流处理引擎,例如 Spark、Flink 和 SeatTunnel Zeta。无论您是初学者还是有经验的开发者,本文都将为您提供关于如何最大限度地利用此连接器的深入见解。

    支持 SQL Server 版本

    • 服务器:2008(或更高版本,仅供信息参考)

    支持的引擎

    Spark
    Flink
    Seatunnel Zeta

    主要特点

    使用 Xa 事务 来确保 精准一次性。因此,仅支持支持 Xa 事务 的数据库的 精准一次性。您可以设置 is_exactly_once=true 来启用它。

    描述

    通过 JDBC 写入数据。支持批处理模式和流处理模式,支持并发写入,支持精准一次性语义(使用 XA 事务保证)。

    支持的数据源信息

    数据源支持的版本驱动URLMaven
    SQL Server支持版本 >= 2008com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433下载

    数据库依赖

    请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录
    例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

    数据类型映射

    SQL Server 数据类型Seatunnel 数据类型
    BITBOOLEAN
    TINYINT
    SMALLINT
    SHORT
    INTEGERINT
    BIGINTLONG
    DECIMAL
    NUMERIC
    MONEY
    SMALLMONEY
    DECIMAL((指定列的指定列大小)+1,
    (获取指定列的小数点右边的数字的数量。)))
    REALFLOAT
    FLOATDOUBLE
    CHAR
    NCHAR
    VARCHAR
    NTEXT
    NVARCHAR
    TEXT
    STRING
    DATELOCAL_DATE
    TIMELOCAL_TIME
    DATETIME
    DATETIME2
    SMALLDATETIME
    DATETIMEOFFSET
    LOCAL_DATE_TIME
    TIMESTAMP
    BINARY
    VARBINARY
    IMAGE
    UNKNOWN
    尚不支持

    Sink 选项

    名称类型必需默认值描述
    url字符串-JDBC 连接的 URL。例如:jdbc:sqlserver://localhost:1433;databaseName=mydatabase
    driver字符串-用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
    user字符串-连接实例的用户名
    password字符串-连接实例的密码
    query字符串-使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...query 具有更高的优先级
    database字符串-使用此 databasetable-name 自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,优先级更高。
    table字符串-使用数据库和此表名自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,优先级更高。
    primary_keys数组-此选项用于支持自动生成 SQL 时的 insertdeleteupdate 等操作。
    support_upsert_by_query_primary_key_exist布尔false选择是否使用 INSERT SQL、UPDATE SQL 来处理基于查询主键是否存在的更新事件(INSERT、UPDATE_AFTER)。只有在数据库不支持 upsert 语法时才使用此配置。注意:此方法性能较低。
    connection_check_timeout_sec整数30等待用于验证连接的数据库操作完成的秒数。
    max_retries整数0重试提交失败(executeBatch)的次数。
    batch_size整数1000用于批量写入的记录数量达到 batch_size 或时间达到 checkpoint.interval 时,数据将刷新到数据库。
    is_exactly_once布尔false是否启用精准一次性语义,将使用 XA 事务。如果开启,需要设置 xa_data_source_class_name
    generate_sink_sql布尔false基于要写入的数据库表生成 SQL 语句。
    xa_data_source_class_name字符串-数据库驱动程序的 XA 数据源类名,例如,SQL Server 为 com.microsoft.sqlserver.jdbc.SQLServerXADataSource,其他数据源请参考附录。
    max_commit_attempts整数3事务提交失败的重试次数。
    transaction_timeout_sec整数-1事务打开后的超时时间,默认值为 -1(永不超时)。请注意,设置超时可能会影响精准一次性语义。
    auto_commit布尔true默认启用自动事务提交。
    common-options-Sink 插件通用参数,请参考 Sink Common Options 以获取详细信息。
    ## 提示

    如果未设置 partition_column,则将以单一并发运行;如果设置了 partition_column,则将根据任务的并发度执行并行操作。

    任务示例

    简单:

    这是一个读取 Sql Server 数据并将其直接插入另一个表中的示例

    env {
      # 您可以在此处设置引擎配置
      execution.parallelism = 10
    }
    source {
      # 这是一个示例源插件,仅用于测试和演示源插件的功能
      Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        query = "select * from column_type_test.dbo.full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
      }
    }
    
    transform {
    
      # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
      # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }
    
    sink {
      Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"
    
      }  # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
      # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    CDC(Change Data Capture)事件

    我们还支持 CDC 变更数据,此时需要配置数据库、表和主键。

    Jdbc {
      source_table_name = "customers"
      driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
      url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
      user = SA
      password = "Y.sa123456"
      generate_sink_sql = true
      database = "column_type_test"
      table = "dbo.full_types_sink"
      batch_size = 100
      primary_keys = ["id"]
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    精确一次性 Sink

    事务性写入可能会更慢,但对数据更准确

    
      Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"
        is_exactly_once = "true"
    
        xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource"
    
      }  # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
      # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    本文由 白鲸开源科技 提供发布支持!

  • 相关阅读:
    unity 从UI上拖出3D物体,(2D转3D)
    Java Persistence API (JPA) 之 EntityManager
    corona接入ios登录和支付
    程序化广告平台如何让app广告变现收益最大化?
    vue-cli脚手架创建项目
    制造企业如何满足客户需求?精益生产教您三招
    目标检测论文解读复现之十四:一种基于残差网络优化的航拍小目标检测算法
    深度学习入门(四十二)计算机视觉——目标检测和边界框
    MySQL进阶(再论事务)——什么是事务 & 事务的隔离级别 & 结合MySQL案例详细分析
    【逗老师的无线电】宝峰1701刷OpenGD77
  • 原文地址:https://blog.csdn.net/DolphinScheduler/article/details/133999054