• Flink—— Data Source 介绍


    Data Source 简介

            Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。

            Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加数据来源。

            Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

    Flink Data Source分类

    Flink的数据源可以根据数据的来源和特性进行分类。以下是常见的Flink数据源分类:

    集合数据源

            集合数据源(Collection Data Source):集合数据源指的是将本地的集合或数组作为输入数据的数据源。在Flink中,可以使用fromCollection、fromElements等方法将Java或Scala中的集合数据转化为数据流进行处理。

    1、fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。

    2、fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。

    3、fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。

    4、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。

    5、generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。

    1. import org.apache.flink.api.java.ExecutionEnvironment;
    2. import org.apache.flink.api.java.DataSet;
    3. import java.util.Arrays;
    4. import java.util.List;
    5. public class CollectionDataSourceExample {
    6. public static void main(String[] args) throws Exception {
    7. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    8. // 创建一个包含整数的集合
    9. List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    10. // 将集合转化为Flink的DataSet
    11. DataSet<Integer> dataset = env.fromCollection(data);
    12. // 打印数据集中的元素
    13. dataset.print();
    14. }
    15. }

    关于使用集合数据源的注意事项:

    1. 数据规模:集合数据源适用于小规模数据集。确保你的数据集在内存中能够合理存放,不至于导致内存溢出。

    2. 内存消耗:集合数据源会将所有数据存储在内存中,因此需要谨慎处理大型数据集,避免对内存资源造成过大压力。

    3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高作业的执行效率。

    4. 调试和测试:集合数据源非常适合用于本地调试和测试,可以快速验证处理逻辑并观察输出结果。

    使用集合数据源时需要注意这些方面,以确保作业能够稳定运行并获得良好的性能表现。

    文件数据源

            文件数据源(File Data Source):文件数据源用于从文件系统中读取数据,可以是本地文件系统或分布式文件系统(如HDFS)。Flink提供了readTextFile、readCsvFile等方法来支持常见文件格式的数据读取。

    1、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。

    2、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。

    3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。

    1. import org.apache.flink.api.java.ExecutionEnvironment;
    2. import org.apache.flink.api.java.DataSet;
    3. public class FileDataSourceExample {
    4. public static void main(String[] args) throws Exception {
    5. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    6. // 从文件创建数据集
    7. String filePath = "path/to/your/file.txt";
    8. DataSet<String> text = env.readTextFile(filePath);
    9. // 打印文件中的内容
    10. text.print();
    11. }
    12. }

    关于使用文件数据源的注意事项:

    1. 文件路径:确保提供的文件路径是正确的,可以是本地文件系统路径,也可以是HDFS路径或其他支持的文件系统路径。

    2. 文件格式:Flink支持多种文件格式,包括文本文件、CSV文件、Parquet文件等。根据实际情况选择合适的文件格式进行读取。

    3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高文件读取的并行处理能力。

    4. 文件分区:对于大型文件,可以考虑文件分区和并行读取,以加速数据的加载和处理过程。

    5. 文件读取性能:尽量避免频繁的小文件读取操作,因为这会增加文件系统的负担并降低整体性能。

    使用文件数据源时需要注意以上方面,以确保能够有效地读取文件数据,并且提高作业的执行效率。

    Socket数据源

            Socket数据源(Socket Data Source):Socket数据源允许通过网络套接字接收数据,通常用于测试和演示目的。Flink可以使用socketTextStream方法从TCP socket接收数据流。

    socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。

    1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    2. public class SocketDataSourceExample {
    3. public static void main(String[] args) throws Exception {
    4. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. // 从socket创建数据流
    6. String hostname = "localhost";
    7. int port = 9999;
    8. env.socketTextStream(hostname, port)
    9. .print();
    10. // 执行作业
    11. env.execute("Socket Data Source Example");
    12. }
    13. }

    关于使用Socket数据源的注意事项:

    1. 主机和端口:确保指定的主机和端口是正确的,并且能够与数据源通信。

    2. 网络延迟:由于Socket数据源涉及网络通信,因此可能受到网络延迟的影响。需要考虑网络性能对作业整体性能的影响。

    3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

    4. 数据格式:需要确保从Socket接收到的数据能够被正确解析和处理,例如按行读取文本数据等。

    5. 容错机制:在使用Socket数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

    使用Socket数据源时需要注意以上方面,以确保能够有效地接收数据并提高作业的执行效率。

    自定义数据源

            自定义数据源(Custom Data Source):除了上述内置的数据源外,Flink还支持自定义数据源。用户可以实现自己的SourceFunction接口来定义特定的数据生成逻辑,例如从消息队列、数据库、传感器等实时数据源中读取数据。

    1. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    2. import org.apache.flink.streaming.api.functions.source.SourceFunction;
    3. public class CustomDataSource extends RichParallelSourceFunction<String> {
    4. private boolean running = true;
    5. @Override
    6. public void run(SourceContext<String> ctx) throws Exception {
    7. while (running) {
    8. // 生成数据
    9. String data = generateData();
    10. // 发射数据
    11. ctx.collect(data);
    12. // 控制数据生成频率
    13. Thread.sleep(1000);
    14. }
    15. }
    16. @Override
    17. public void cancel() {
    18. running = false;
    19. }
    20. private String generateData() {
    21. // 实现自定义的数据生成逻辑
    22. return "some data";
    23. }
    24. }

            在这个示例中,我们创建了一个名为CustomDataSource的类,它继承自RichParallelSourceFunction并指定了数据类型为String。在run方法中,我们使用一个循环来生成数据并通过collect方法将数据发射出去。在cancel方法中,我们设置了一个标志位来控制数据源的运行状态。

    关于使用自定义数据源的注意事项:

    1. 并行度设置:根据数据源的性质和数据量合理地设置并行度,以充分利用集群资源。

    2. 数据生成频率:确保数据生成的频率和速度能够适应作业的处理能力,避免数据源产生过快导致作业无法及时处理。

    3. 容错机制:在自定义数据源中,需要考虑作业的容错机制,例如在发生故障时如何正确处理和恢复。

    4. 数据格式:确保从自定义数据源产生的数据能够被正确解析和处理,符合作业的输入要求。

    5. 资源管理:需要确保自定义数据源的资源占用和生命周期管理,避免资源泄露或过度占用资源。

    使用自定义数据源时需要考虑以上方面,并确保能够有效地产生数据并提高作业的执行效率。

    Apache Kafka数据源

            Apache Kafka数据源(Kafka Data Source):作为流数据处理框架,Flink对Kafka提供了良好的集成支持。可以使用addSource方法结合Flink的Kafka Connector来从Kafka主题中读取数据。

    1. import org.apache.flink.api.common.serialization.SimpleStringSchema;
    2. import org.apache.flink.streaming.api.datastream.DataStream;
    3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    4. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    5. import java.util.Properties;
    6. public class KafkaDataSourceExample {
    7. public static void main(String[] args) throws Exception {
    8. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. // Kafka配置
    10. Properties properties = new Properties();
    11. properties.setProperty("bootstrap.servers", "localhost:9092");
    12. properties.setProperty("group.id", "flink-consumer-group");
    13. // 创建Kafka数据流
    14. FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
    15. DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);
    16. kafkaDataStream.print();
    17. // 执行作业
    18. env.execute("Kafka Data Source Example");
    19. }
    20. }

    在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后设置Kafka的连接配置,包括bootstrap servers和consumer group id等。接下来,我们创建了一个FlinkKafkaConsumer对象,指定了要消费的topic以及数据的序列化方式,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

    关于使用Kafka数据源的注意事项:

    1. Kafka配置:确保指定的Kafka配置正确,并能够与Kafka集群进行通信。

    2. 序列化方式:根据实际情况选择合适的数据序列化方式,例如SimpleStringSchema、JSON、Avro等。

    3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

    4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

    5. 容错机制:在使用Kafka数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

    使用Kafka数据源时需要注意以上方面,以确保能够有效地消费Kafka中的数据并提高作业的执行效率。

    Apache Pulsar数据源

            Apache Pulsar数据源(Pulsar Data Source):类似于Kafka,Flink也集成了对Pulsar的支持,可以直接从Pulsar主题中读取数据。

    1. import org.apache.flink.streaming.api.datastream.DataStream;
    2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    3. import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
    4. import org.apache.pulsar.client.api.Schema;
    5. import org.apache.pulsar.client.api.PulsarClientException;
    6. public class PulsarDataSourceExample {
    7. public static void main(String[] args) throws Exception {
    8. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. String serviceUrl = "pulsar://localhost:6650";
    10. String topic = "my-topic";
    11. FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(
    12. serviceUrl,
    13. topic,
    14. Schema.STRING
    15. );
    16. DataStream<String> pulsarDataStream = env.addSource(pulsarSource);
    17. pulsarDataStream.print();
    18. env.execute("Pulsar Data Source Example");
    19. }
    20. }

            在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后指定了Pulsar的连接信息和要消费的topic。接下来,我们创建了一个FlinkPulsarSource对象,并指定了Pulsar的serviceUrl、topic以及数据的Schema,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

    关于使用Pulsar数据源的注意事项:

    1. Pulsar连接配置:确保指定的Pulsar连接信息正确,并能够与Pulsar集群进行通信。

    2. Schema设置:根据实际情况选择合适的数据Schema,例如STRING、JSON、AVRO等。

    3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

    4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

    5. 容错机制:在使用Pulsar数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

            使用Pulsar数据源时需要注意以上方面,以确保能够有效地消费Pulsar中的数据并提高作业的执行效率。

            这些不同类型的数据源为Flink应用程序提供了灵活的数据接入方式,使得Flink可以轻松地处理不同来源和格式的数据。根据具体的业务需求和场景特点,可以选择合适的数据源类型来构建流处理和批处理应用程序。

    更多消息资讯,请访问昂焱数据(https://www.ayshuju.com)

  • 相关阅读:
    OpenGL原理与实践——核心模式(一):VBO、VAO等原理解析及项目初始设置
    MySQL 自动补全工具
    抖音矩阵系统源码,抖音矩阵系统源码,抖音矩阵系统源码,抖音矩阵系统源码,抖音矩阵系统源码,抖音矩阵系统源码。
    Day10--初始化uni-app项目
    药物研发检测记录模板-0903不溶性微粒检查法检验原始记录
    用Spring Boot 3.2虚拟线程搭建静态文件服务器有多快?
    编译安装MySQL服务(LAMP2)
    大模型培训 AUTOWEBGLM:自动网页导航智能体
    js array数组json去重
    RPA的数据库自动化操作
  • 原文地址:https://blog.csdn.net/tszc95/article/details/134271466