• 【天衍系列 01】深入理解Flink的 FileSource 组件:实现大规模数据文件处理


    01 基本概念

    Apache Flink 是一个流式处理框架,被广泛应用于大数据领域的实时数据处理和分析任务中。在 Flink 中,FileSource 是一个重要的组件,用于从文件系统中读取数据并将其转换为 Flink 的数据流。本文将深入探讨 FileSource 的工作原理、用法以及与其他数据源的比较。

    02 工作原理

    FileSource 是 Flink 提供的一种用于从文件系统中读取数据的源。它能够处理各种类型的文件,包括文本文件、压缩文件、序列文件等。FileSource 的工作原理可以概括为以下几个步骤:

    1.文件分配(File Assignment)

    在 Flink 集群中,每个任务都会负责读取文件的一个分片。FileSource 会根据文件的大小和数量将文件分配给不同的任务进行处理。

    2.并行读取(Parallel Reading)

    每个任务会并行地读取分配给它的文件分片。这意味着文件中的数据会被同时读取,从而提高了整体的读取速度和处理效率。

    3.数据解析(Data Parsing)

    读取的数据会经过解析器进行解析,将其转换为 Flink 中的数据结构,如 DataSet 或 DataStream。

    4.数据分发(Data Distribution)

    解析后的数据会被分发到后续的算子中进行进一步的处理和分析。

    03 数据流实现

    • 有界流(Bounded Streams)

      有界流是指具有明确结束点的数据流,即数据流在某个时刻会结束,数据量是有限的。例如,从静态文件、数据库或有限数据集中读取的数据流就是有界流。有界流的特点包括:

      • 数据量是有限的,流的结束点是已知的。
      • 可以对整个数据流进行批处理式的分析和处理,因为所有数据都可用且有限。
      • 可以使用批处理算法和优化技术,例如排序、分组聚合等。
    • 无界流(Unbounded Streams)

      无界流是指没有明确结束点的数据流,即数据流会持续不断地产生,数据量可能是无限的。例如,实时传感器数据、日志流、消息队列中的数据等都是无界流。无界流的特点包括:

      • 数据源持续不断地产生数据,流没有明确的结束点。
      • 通常用于实时流式处理,要求系统能够实时处理数据并在流中进行持续的分析和计算。
      • 需要采用流式处理的技术和算法,例如窗口计算、流式聚合、事件时间处理等。
    • 不同数据流实现

      • 创建一个 File Source 时, 默认情况下,Source 为有界/批的模式;

        //创建一个FileSource数据源,并设置为批模式,读取完文件后结束
        final FileSource source = FileSource.forRecordStreamFormat(...)
                .build();
        
        • 1
        • 2
        • 3
      • 设置参数monitorContinuously(Duration.ofMillis(5)) 可以把 Source 设置为持续的流模式

        //创建一个FileSource数据源,并设置为流模式,每隔5分钟检查路径新文件,并读取
        final FileSource source = FileSource.forRecordStreamFormat(...)
                .monitorContinuously(Duration.ofMillis(5))  
                .build();   
        
        • 1
        • 2
        • 3
        • 4

    04 项目实战

    1.FileSource支持多种数据格式数据读取与解析,本期以Text File文件为例展开。
    2.jdk版本11
    3.Flink版本1.18.0
    4.下面是两个简单的示例代码,演示如何在 Flink 中使用 FileSource 读取文件数据

    4.1 项目结构

    在这里插入图片描述

    4.2 maven依赖

    
    
    	org.apache.flink
    	flink-connector-files
    	1.18.0
    
    
    
    
    
        org.apache.flink
        flink-java
        1.18.0
    
    
    
        org.apache.flink
        flink-streaming-scala_2.12
        1.18.0
    
    
    
        org.apache.flink
        flink-clients
        1.18.0
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    4.3 StreamFormat读取文件数据

    • StreamFormat从文件流中读取文件内容。它是最简单的格式实现, 并且提供了许多拆箱即用的特性(如 Checkpoint 逻辑),但是限制了可应用的优化(例如对象重用,批处理等等)。
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import java.time.Duration;
    
    /**
     * 描述:
     * flink集成FileSource & forRecordStreamFormat使用 & 流模式
     * StreamFormat:从文件流中读取文件内容。它是最简单的格式实现,
     * 并且提供了许多拆箱即用的特性(如 Checkpoint 逻辑),
     * 但是限制了可应用的优化(例如对象重用,批处理等等)。
     *
     * @author 浅夏的猫
     * @version 1.0.0
     * @date 2024-02-07 15:30:22
     */
    public class FileSourceRecordStreamingJob {
    
        public static void main(String[] args) throws Exception {
    
            // 创建 需要读取的文件路径Path
            Path path = new Path("D:\\flink\\file_source.txt");
    
            // 创建 读取文件的格式函数
            TextLineInputFormat textLineInputFormat = new TextLineInputFormat();
    
            // 创建 FileSource
            FileSource fileSource = FileSource.
                    forRecordStreamFormat(textLineInputFormat, path)
                    //放开注释则使用流模式,每隔5分钟检查是否有新文件否则默认使用批模式
    //                .monitorContinuously(Duration.ofMillis(5))
                    .build();
    
            // 创建 执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 添加 FileSource 到数据流
            env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();
    
            // 执行任务
            env.execute("FileSourceRecordStreamingJob");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    4.4 BulkFormat读取文件数据

    • BulkFormat从文件中一次读取一批记录,虽然是最 “底层” 的格式实现,但是提供了优化实现的最大灵活性。
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.FileSourceSplit;
    import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.time.Duration;
    
    
    /**
     * 描述:flink集成FileSource & forBulkFileFormat使用 & 流模式
     * BulkFormat:从文件中一次读取一批记录。 它虽然是最 “底层” 的格式实现,但是提供了优化实现的最大灵活性。
     *
     * @author 浅夏的猫
     * @version 1.0.0
     * @date 2024-02-07 15:30:22
     */
    public class FileSourceBulkStreamingJob {
    
        public static void main(String[] args) throws Exception {
    
            //创建 批量读取文件的格式函数,其实底层还是通过对单行文件读取
            BulkFormat bulkFormat = new StreamFormatAdapter<>(new TextLineInputFormat());
    
            // 创建 FileSource
            FileSource fileSource = FileSource.
                    forBulkFileFormat(bulkFormat, new Path("D:\\flink\\file_source.txt"))
                    //放开注释则使用流模式,每隔5分钟检查是否有新文件,否则默认使用批模式
    //                .monitorContinuously(Duration.ofMillis(5))
                    .build();
    
            // 创建 执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 添加 FileSource 到数据流
            env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();
    
            // 执行任务
            env.execute("FileSourceBulkStreamingJob");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    4.5 使用小结

    在上面的示例中,我们使用FileSource方法从指定路径读取文本文件,并将其转换为一个数据流,选择不同的输入格式和解析方式,然后我们调用 print 方法将数据流中的数据打印出来。

    05 数据源比较

    FileSource 是 Flink 中常用的数据源之一,与其他数据源相比,它具有一些优势和劣势,根据实际情况和需求,可以选择不同的数据源来满足任务的要求。

    • 优势

      • 支持读取大规模的文件数据,适用于大数据处理场景。
      • 支持并行读取和处理,能够充分利用集群资源,提高处理效率。
      • 支持多种文件格式和压缩方式,灵活性强。
    • 劣势

      • 对于小文件的处理效率较低,可能会导致资源浪费和性能下降。
      • 无法实时监控文件的变化,需要手动触发重新读取。

    06 总结

    FileSource 是 Apache Flink 中用于读取文件数据的重要组件,它能够高效地处理大规模的文件数据,并提供丰富的功能和灵活的用法。通过深入了解 FileSource 的工作原理和用法,可以更好地利用 Flink 来实现大规模数据文件的处理和分析任务。

    通过以上详细介绍,可以对 Apache Flink 中的 FileSource 有一个全面的了解,从而更好地应用于实际的数据处理项目中

  • 相关阅读:
    Java这些最基础的知识,你还记得多少?
    大屏经典组件:“无限滚动” 从分析到开发
    科技的成就(三十二)
    跨端开发方案之桌面应用小程序
    汇舟问卷:想要挣钱?海外问卷调查不容错过!
    MySQL(五) 数据恢复
    [b01lers2020]Welcome to Earth
    大数据之Set/Map集合
    思维题练习部分
    关于el-upload看这一篇就够了
  • 原文地址:https://blog.csdn.net/weixin_40736233/article/details/136075161