• 6.1、Flink数据写入到文件


    1、前言

    Flink API 提供了FileSink连接器,来帮助我们将数据写出到文件系统中去

    版本说明:java1.8、flink1.17

    官网链接:官网


    2、Format Types - 指定文件格式

    FileSink 支持 Row-encoded 、Bulk-encoded 两种格式写入文件系统

            Row-encoded:文本格式

            Bulk-encoded:Parquet、Avro、SequenceFile、Compress、Orc

    1. Row-encoded sink: FileSink.forRowFormat(basePath, rowEncoder)
    2. Bulk-encoded sink: FileSink.forBulkFormat(basePath, bulkWriterFactory)

    3、桶分配 - 文件分区策略(分目录)

    桶的逻辑定义了如何将数据分配到基本输出目录内的子目录中。(好比Hive中的分区)

    Flink 内置了两种同分配策略:

    • DateTimeBucketAssigner :默认的基于时间的分配器
    • BasePathBucketAssigner :分配所有文件存储在基础路径上(单个全局桶)
    BasePathBucketAssigner - 不会生成子目录
    DateTimeBucketAssigner - 根据时间进行分桶

    代码示例:

    1. // TODO 按照时间进行分桶,每分钟生成一个子目录,目录名称为 yyyy-MM-dd HH-mm
    2. .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm", ZoneId.systemDefault()))
    3. // TODO 当个全局桶,不生成子目录
    4. .withBucketAssigner(new BasePathBucketAssigner())

    4、滚动策略 - 分文件

    滚动策略定义`何时生成新的文件`,可以指定 文件创建时间和文件大小 进行配置

    1. // TODO 文件滚动策略: 文件创建后1分钟 或 大小超过1m 时生成新的文件
    2. .withRollingPolicy(
    3. DefaultRollingPolicy.builder()
    4. .withRolloverInterval(Duration.ofMinutes(1)) // 指定文件持续时间
    5. .withMaxPartSize(new MemorySize(1024 * 1024)) // 指定文件大小
    6. .build()
    7. )

    5、文件命名&生命周期

    Part 文件可以处于以下三种状态中的任意一种:

    1. In-progress :当前正在写入的 Part 文件处于 in-progress 状态
    2. Pending :由于指定的滚动策略)关闭 in-progress 状态文件,并且等待提交
    3. Finished :流模式(STREAMING)下的成功的 Checkpoint 或者批模式(BATCH)下输入结束,文件的 Pending 状态转换为 Finished 状态

    注意:在 STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能。 Finished状态的文件只在 Checkpoint 成功时生成。如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。

    文件命名策略:

    • In-progress / Pending:prefix-part--.ext.inprogress.uid
    • Finished:prefix-part--.ext

        prefix : 文件名称前缀(默认为空)

      ext :文件名称后缀(默认为空)

      uid :uid 是一个分配给 Subtask 的随机 ID 值

    1. └── 2019-08-25--12
    2. ├── prefix-4005733d-a830-4323-8291-8866de98b582-0.ext
    3. ├── prefix-4005733d-a830-4323-8291-8866de98b582-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    4. ├── prefix-81fc4980-a6af-41c8-9937-9939408a734b-0.ext
    5. └── prefix-81fc4980-a6af-41c8-9937-9939408a734b-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

    代码示例:

    1. // TODO 指定输出文件的名称配置 前缀、后缀
    2. .withOutputFileConfig(
    3. OutputFileConfig.builder()
    4. .withPartPrefix("flink") // 指定前缀
    5. .withPartSuffix("txt") // 指定后缀
    6. .build()
    7. )

    6、这是一个完整的例子

    1. package com.baidu.datastream.sink;
    2. import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    3. import org.apache.flink.configuration.MemorySize;
    4. import org.apache.flink.connector.file.sink.FileSink;
    5. import org.apache.flink.core.fs.Path;
    6. import org.apache.flink.streaming.api.CheckpointingMode;
    7. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    9. import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
    10. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    11. import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    12. import java.time.Duration;
    13. import java.time.ZoneId;
    14. // TODO flink 数据输出到文件系统
    15. public class SinkFiles {
    16. public static void main(String[] args) throws Exception {
    17. // 1.获取执行环境
    18. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    19. env.setParallelism(2);
    20. // STREAMING 模式时,必须开启checkpoint,否则文件一直都是 .inprogress
    21. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    22. // 2.指定数据源
    23. DataStreamSource streamSource = env.socketTextStream("localhost", 9999);
    24. // 3.初始化 FileSink 实例
    25. FileSink fileSink = FileSink
    26. // TODO 指定输出方式 行式输出、文件路径、编码
    27. .forRowFormat(new Path("data/output"), new SimpleStringEncoder("UTF-8"))
    28. // TODO 指定输出文件的名称配置 前缀、后缀
    29. .withOutputFileConfig(
    30. OutputFileConfig.builder()
    31. .withPartPrefix("flink") // 指定前缀
    32. .withPartSuffix(".txt") // 指定后缀
    33. .build()
    34. )
    35. // TODO 按照时间进行目录分桶:每分钟生成一个目录,目录格式为 yyyy-MM-dd HH-mm
    36. .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm", ZoneId.systemDefault()))
    37. // TODO 文件滚动策略: 1分钟 或 1m 生成新的文件
    38. .withRollingPolicy(
    39. DefaultRollingPolicy.builder()
    40. .withRolloverInterval(Duration.ofMinutes(1))
    41. .withMaxPartSize(new MemorySize(1024 * 1024))
    42. .build()
    43. )
    44. .build();
    45. streamSource.sinkTo(fileSink);
    46. // 3.触发程序执行
    47. env.execute();
    48. }
    49. }

  • 相关阅读:
    史上最全webpack带你深入了解webpack
    跨境电商做什么产品好?2022速卖通热销品类榜单来袭!
    C盘扩容好帮手——傲梅分区助手
    day02_运算符_if
    从贝叶斯网络到SLAM
    centos7下docker设置新的下载镜像源并调整存放docker下载镜像的仓库位置
    为react项目添加开发/提交规范(前端工程化、eslint、prettier、husky、commitlint、stylelint)
    金仓数据库KingbaseES物理备份恢复命令选项(stanza-upgrade命令)
    【SpringCloud微服务项目实战-mall4cloud项目(5)】——mall4cloud-leaf
    【金九银十必问面试题】站在架构师角度分析问题,如何解决TCC中的悬挂问题
  • 原文地址:https://blog.csdn.net/weixin_42845827/article/details/132835352