• JDBC SQL Server Source Connector: 一览与实践


    file

    在快速发展的数据驱动业务环境中,确保数据在各个系统间高效、准确地同步至关重要。为了进一步的数据处理和分析,经常需要将这些数据同步到其他数据处理系统。Apache SeaTunnel 提供了一个强大而灵活的数据集成框架,使得从 SQL Server 到其他系统的数据同步变得简单且高效。

    本文档将指导您如何配置 Apache SeaTunnel,使用 JDBC SQL Server Source Connector 来实现数据的有效同步。

    SQL Server

    JDBC SQL Server Source Connector

    支持 SQL Server 版本

    • 服务器:2008(或更高版本,仅供信息参考)

    支持以下引擎

    Spark
    Flink
    Seatunnel Zeta

    主要特点

    支持查询 SQL 并能够实现投影效果。

    描述

    通过 JDBC 读取外部数据源数据。

    支持的数据源信息

    数据源支持的版本驱动URLMaven
    SQL Server支持版本 >= 2008com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433下载

    数据库依赖

    请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录
    例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

    数据类型映射

    SQL Server 数据类型Seatunnel 数据类型
    BITBOOLEAN
    TINYINT
    SMALLINT
    SHORT
    INTEGERINT
    BIGINTLONG
    DECIMAL
    NUMERIC
    MONEY
    SMALLMONEY
    DECIMAL((指定列的指定列大小)+1,
    (获取指定列的小数点右边的数字的数量。)))
    REALFLOAT
    FLOATDOUBLE
    CHAR
    NCHAR
    VARCHAR
    NTEXT
    NVARCHAR
    TEXT
    STRING
    DATELOCAL_DATE
    TIMELOCAL_TIME
    DATETIME
    DATETIME2
    SMALLDATETIME
    DATETIMEOFFSET
    LOCAL_DATE_TIME
    TIMESTAMP
    BINARY
    VARBINARY
    IMAGE
    UNKNOWN
    尚不支持

    源选项

    名称类型必需默认值描述
    url字符串-JDBC 连接的 URL。例如:jdbc:sqlserver://127.0.0.1:1434;database=TestDB
    driver字符串-用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
    user字符串-连接实例的用户名
    password字符串-连接实例的密码
    query字符串-查询语句
    connection_check_timeout_sec整数30等待用于验证连接的数据库操作完成的秒数
    partition_column字符串-并行处理的分区列,仅支持数值类型。
    partition_lower_bound长整数-用于扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
    partition_upper_bound长整数-用于扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
    partition_num整数作业并行度分区计数的数量,仅支持正整数。默认值为作业并行度。
    fetch_size整数0对返回大量对象的查询,您可以配置查询中使用的行抓取大小,以减少满足选择条件所需的数据库命中次数,从而提高性能。
    零表示使用 JDBC 默认值。
    common-options-源插件的常见参数,请参阅 源常用选项 以获取详细信息。

    提示

    如果未设置 partition_column,则将以单一并发运行;如果设置了 partition_column,则将根据任务的并发度进行并行执行。

    任务示例

    简单:

    简单的单一任务以读取数据表

    # 定义运行时环境
    
    env {
    
    # 您可以在此处设置 Flink 配置
    
    execution.parallelism = 1
    job.mode = "BATCH"
    }
    source{
    Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "select \* from full_types_jdbc"
    }
    }
    
    transform { # 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息, # 请转到 [seatunnel.apache.org/docs/transform-v2/sql](https://seatunnel.apache.org/docs/transform-v2/sql)
    }
    
    sink {
    Console {}
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    并行:

    使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

    env {
      # 您可以在此处设置 Flink 配置
      execution.parallelism = 10
      job.mode = "BATCH"
    }
    
    source {
        Jdbc {
            driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
            url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
            user = SA
            password = "Y.sa123456"
            # 根据需要定义查询逻辑
            query = "select * from full_types_jdbc"
            # 并行分片读取字段
            partition_column = "id"
            # 片段数量
            partition_num = 10
        }
    }
    
    transform {
        # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
        # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }
    
    sink {
        Console {}
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    并行:

    使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

    env {
      # 您可以在此处设置 Flink 配置
      execution.parallelism = 10
      job.mode = "BATCH"
    }
    
    source {
        Jdbc {
            driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
            url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
            user = SA
            password = "Y.sa123456"
            # 根据需要定义查询逻辑
            query = "select * from full_types_jdbc"
            # 并行分片读取字段
            partition_column = "id"
            # 片段数量
            partition_num = 10
        }
    }
    
    transform {
        # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
        # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }
    
    sink {
        Console {}
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    并行:

    使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

    env {
      # 您可以在此处设置 Flink 配置
      execution.parallelism = 10
      job.mode = "BATCH"
    }
    
    source {
        Jdbc {
            driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
            url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
            user = SA
            password = "Y.sa123456"
            # 根据需要定义查询逻辑
            query = "select * from full_types_jdbc"
            # 并行分片读取字段
            partition_column = "id"
            # 片段数量
            partition_num = 10
        }
    }
    
    transform {
        # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
        # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }
    
    sink {
        Console {}
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    并行:

    使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

    env {
      # 您可以在此处设置 Flink 配置
      execution.parallelism = 10
      job.mode = "BATCH"
    }
    
    source {
        Jdbc {
            driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
            url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
            user = SA
            password = "Y.sa123456"
            # 根据需要定义查询逻辑
            query = "select * from full_types_jdbc"
            # 并行分片读取字段
            partition_column = "id"
            # 片段数量
            partition_num = 10
        }
    }
    
    transform {
        # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
        # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }
    
    sink {
        Console {}
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    分段并行读取示例:

    这是一个快速并行读取数据的分片示例

    env {
      # 您可以在此处设置引擎配置
      execution.parallelism = 10
    }
    
    source {
      # 这是一个示例源插件,仅用于测试和展示源插件的功能
      Jdbc {
        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        user = SA
        password = "Y.sa123456"
        query = "select * from column_type_test.dbo.full_types_jdbc"
        # 并行分片读取字段
        partition_column = "id"
        # 片段数量
        partition_num = 10
      }
      # 如果您想要获取有关如何配置 Seatunnel 和查看源插件的完整列表的更多信息,
      # 请转到 https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
    }
    
    transform {
      # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
      # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
    }
    
    sink {
      Console {}
      # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
      # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    本文由 白鲸开源科技 提供发布支持!

  • 相关阅读:
    MFC 如何启用/禁用菜单(返灰/不可点击状态)
    Allegro导入导出设计数据操作指导
    三篇论文入选国际顶会SIGMOD,厉害了腾讯云数据库
    AndroidStudio案例——登录页面的切换
    前端常见专有名词
    代码技巧: 类中同一个函数可以同时存在常函数版本和普通函数版本(c++)
    【周赛+双周赛20220904】T4
    create® 3入门教程-简单的动作
    Vue指令
    python切分字符串
  • 原文地址:https://blog.csdn.net/weixin_54625990/article/details/134249140