• 必看!S3File Sink Connector 使用文档


    file

    S3File 是一个用于管理 Amazon S3(Simple Storage Service)的 Python 模块。当前,Apache SeaTunnel 已经支持 S3File Sink Connector,为了更好地使用这个 Connector,有必要看一下这篇使用文档指南。

    描述

    将数据输出到 AWS S3 文件系统。

    提示:

    如果您使用的是 Spark/Flink,在使用此连接器之前,必须确保您的 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。

    如果您使用的是 SeaTunnel Engine,它会在您下载和安装 SeaTunnel Engine 时自动集成 Hadoop JAR 包。您可以在 ${SEATUNNEL_HOME}/lib 目录下确认这个 JAR 包是否存在。

    主要特性

    默认情况下,我们使用 2PC 提交来确保 "仅一次语义"。

    • 文件格式类型
      • 文本 (text)
      • CSV
      • Parquet
      • ORC
      • JSON
      • Excel

    选项

    名称类型必需默认值备注
    pathstring-
    bucketstring-
    fs.s3a.endpointstring-
    fs.s3a.aws.credentials.providerstringcom.amazonaws.auth.InstanceProfileCredentialsProvider
    access_keystring-仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
    access_secretstring-仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
    custom_filenamebooleanfalse是否需要自定义文件名
    file_name_expressionstring"${transactionId}"仅在 custom_filename 为 true 时使用
    filename_time_formatstring"yyyy.MM.dd"仅在 custom_filename 为 true 时使用
    file_format_typestring"csv"
    field_delimiterstring'\001'仅在 file_format 为 text 时使用
    row_delimiterstring"\n"仅在 file_format 为 text 时使用
    have_partitionbooleanfalse是否需要处理分区
    partition_byarray-仅在 have_partition 为 true 时使用
    partition_dir_expressionstring"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"仅在 have_partition 为 true 时使用
    is_partition_field_write_in_filebooleanfalse仅在 have_partition 为 true 时使用
    sink_columnsarray当此参数为空时,将写入所有从 "Transform" 或 "Source" 获取的字段
    is_enable_transactionbooleantrue
    batch_sizeint1000000
    compress_codecstringnone
    common-optionsobject-
    max_rows_in_memoryint-仅在 file_format 为 Excel 时使用
    sheet_namestringSheet${Random number}仅在 file_format 为 Excel 时使用

    path [string]

    目标目录路径是必需的。

    bucket [string]

    S3 文件系统的bucket地址,例如:s3n://seatunnel-test,如果您使用的是 s3a 协议,此参数应为 s3a://seatunnel-test

    fs.s3a.endpoint [string]

    fs s3a 端点

    fs.s3a.aws.credentials.provider [string]

    认证 s3a 的方式。目前我们仅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvidercom.amazonaws.auth.InstanceProfileCredentialsProvider

    关于凭证提供程序的更多信息,您可以参考 Hadoop AWS 文档

    access_key [string]

    S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws

    access_secret [string]

    S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws

    hadoop_s3_properties [map]

    如果需要添加其他选项,可以在这里添加并参考此 链接

    hadoop_s3_properties {
          "fs.s3a.buffer.dir" = "/data/st_test/s3a"
          "fs.s3a.fast.upload.buffer" = "disk"
       }
    • 1
    • 2
    • 3

    custom_filename [boolean]

    是否自定义文件名。

    file_name_expression [string]

    仅在 custom_filenametrue 时使用

    file_name_expression 描述了将创建到 path 中的文件表达式。我们可以在 file_name_expression 中添加变量 ${now}${uuid},例如 test_${uuid}_${now}${now} 代表当前时间,其格式可以通过指定选项 filename_time_format 来定义。

    请注意,如果 is_enable_transactiontrue,我们将在文件名的开头自动添加${transactionId}_

    filename_time_format [string]

    仅在 custom_filenametrue 时使用

    file_name_expression 参数中的格式为 xxxx-${now} 时,filename_time_format 可以指定路径的时间格式,默认值为 yyyy.MM.dd。常用的时间格式列于下表中:

    符号描述
    y
    M
    d月中的天数
    H一天中的小时 (0-23)
    m小时中的分钟
    s分钟中的秒数

    file_format_type [string]

    我们支持以下文件类型:

    • 文本 (text)
    • JSON
    • CSV
    • ORC
    • Parquet
    • Excel

    请注意,最终文件名将以文件格式的后缀结尾,文本文件的后缀是 txt

    field_delimiter [string]

    数据行中列之间的分隔符。仅在 file_format 为 text 时需要。

    row_delimiter [string]

    文件中行之间的分隔符。仅在 file_format 为 text 时需要。

    have_partition [boolean]

    是否需要处理分区。

    partition_by [array]

    仅在 have_partitiontrue 时使用。

    基于选定字段对分区数据进行分区。

    partition_dir_expression [string]

    仅在 have_partitiontrue 时使用。

    如果指定了 partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放在分区目录中。

    默认的 partition_dir_expression${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/k0 是第一个分区字段,v0 是第一个分区字段的值。

    is_partition_field_write_in_file [boolean]

    仅在 have_partitiontrue 时使用。

    如果 is_partition_field_write_in_filetrue,分区字段及其值将写入数据文件中。

    例如,如果您想要写入 Hive 数据文件,其值应为 false

    sink_columns [array]

    需要写入文件的哪些列,默认值为从 "Transform" 或 "Source" 获取的所有列。 字段的顺序决定了实际写入文件的顺序。

    is_enable_transaction [boolean]

    如果 is_enable_transaction 为 true,我们将确保在写入目标目录时数据不会丢失或重复。

    请注意,如果 is_enable_transactiontrue,我们将在文件头部自动添加 ${transactionId}_

    目前仅支持 true

    batch_size [int]

    文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_sizecheckpoint.interval 共同决定。如果 checkpoint.interval 的值足够大,当文件中的行数大于 batch_size 时,写入器将写入文件。如果 checkpoint.interval 较小,则在新的检查点触发时,写入器将创建一个新文件。

    compress_codec [string]

    文件的压缩编解码器及其支持的详细信息如下:

    • txt: lzo none
    • JSON: lzo none
    • CSV: lzo none
    • ORC: lzo snappy lz4 zlib none
    • Parquet: lzo snappy lz4 gzip brotli zstd none

    提示:Excel 类型不支持任何压缩格式。

    常见选项

    请参考 Sink Common Options 获取 Sink 插件的常见参数详细信息。

    max_rows_in_memory [int]

    当文件格式为 Excel 时,可以缓存在内存中的数据项的最大数量。

    sheet_name [string]

    工作簿的工作表名称。

    示例

    对于文本文件格式,具有 have_partitioncustom_filenamesink_columnscom.amazonaws.auth.InstanceProfileCredentialsProvider 的配置示例:

      S3File {
        bucket = "s3a://seatunnel-test"
        tmp_path = "/tmp/seatunnel"
        path="/seatunnel/text"
        fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
        fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
        file_format_type = "text"
        field_delimiter = "\t"
        row_delimiter = "\n"
        have_partition = true
        partition_by = ["age"]
        partition_dir_expression = "${k0}=${v0}"
        is_partition_field_write_in_file = true
        custom_filename = true
        file_name_expression = "${transactionId}_${now}"
        filename_time_format = "yyyy.MM.dd"
        sink_columns = ["name","age"]
        is_enable_transaction=true
        hadoop_s3_properties {
          "fs.s3a.buffer.dir" = "/data/st_test/s3a"
          "fs.s3a.fast.upload.buffer" = "disk"
        }
      }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    对于 Parquet 文件格式,仅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider进行配置:

      S3File {
        bucket = "s3a://seatunnel-test"
        tmp_path = "/tmp/seatunnel"
        path="/seatunnel/parquet"
        fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
        fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        access_key = "xxxxxxxxxxxxxxxxx"
        secret_key = "xxxxxxxxxxxxxxxxx"
        file_format_type = "parquet"
        hadoop_s3_properties {
          "fs.s3a.buffer.dir" = "/data/st_test/s3a"
          "fs.s3a.fast.upload.buffer" = "disk"
        }
      }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    对于 orc 文件仅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

      S3File {
        bucket = "s3a://seatunnel-test"
        tmp_path = "/tmp/seatunnel"
        path="/seatunnel/orc"
        fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
        fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        access_key = "xxxxxxxxxxxxxxxxx"
        secret_key = "xxxxxxxxxxxxxxxxx"
        file_format_type = "orc"
      }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    更新日志

    2.3.0-beta 2022-10-20

    • 添加 S3File Sink 连接器

      2.3.0 2022-12-30

    • Bug修复

      • 修复了以下导致数据写入文件失败的错误:
        • 当上游字段为空时会抛出 NullPointerException
        • Sink 列映射失败
        • 从状态中恢复写入器时直接获取事务失败 (3258)
    • 功能

      • 支持 S3A 协议 (3632)
        • 允许用户添加额外的 Hadoop-S3 参数
        • 允许使用 S3A 协议
        • 解耦 Hadoop-AWS 依赖
      • 支持设置每个文件的批处理大小 (3625)
      • 设置 S3 AK 为可选项 (3688)

    下一版本

  • 相关阅读:
    【虹科技术】OPC UA技术,实现设备控制与互连未来
    windows 平台下编译openssl 最新版本-3.0.5
    Python计算目标检测中的IoU
    三西格玛和六西格玛区别是什么?优思学院用一幅图告诉你
    【模板】差分
    5.vue3项目(五):实现顶部导航栏功能:导航栏静态搭建,菜单折叠功能实现,面包屑动态展示路径,刷新页面功能,全屏功能
    视图、储存过程、函数 e3
    Writesonic:博客和内容创作者的终极写作助手
    SQL 的优化
    Flask学习(二):flask模板渲染
  • 原文地址:https://blog.csdn.net/weixin_54625990/article/details/133135535