在 Flink 中,数据源(Source)是其中一个核心组件,负责从各种来源读取数据供 Flink 程序处理。
一个数据 source 包括三个核心组件:分片(Splits)、分片枚举器(SplitEnumerator) 以及 源阅读器(SourceReader)。
Flink中的数据源(Data Source)是Flink作业的起点,它可以从各种数据来源获取数据,例如文件系统、消息队列、数据库等。数据源可以是内置的(如基于集合、文件、Socket等),也可以是自定义的,或者是通过第三方连接器(如Kafka、RabbitMQ等)获取的。
Flink Data Source的原理主要涉及三个核心组件:分片、分片枚举器和源阅读器。数据源通过这三个组件的协作,实现了从各种数据源中读取数据,并将数据分配给相应的处理任务进行处理。通过这种方式,Flink可以高效地处理来自不同来源的大量实时数据。
Source API 是一个工厂模式的接口,用于创建以下组件。
除此之外,Source 还提供了 Boundedness 的特性,从而使得 Flink 可以选择合适的模式来运行 Flink 任务。
Source 实现应该是可序列化的,因为 Source 实例会在运行时被序列化并上传到 Flink 集群。
SplitEnumerator是数据源(Source)架构中的一个关键组件,它负责生成和管理数据分片(Splits),并将这些分片分配给SourceReader进行并行处理。以下是对SplitEnumerator的详细解释,基于参考文章的内容进行组织:
Flink的SourceReader是数据源架构中的一个关键组件,主要负责从SplitEnumerator获取分片(Split),并读取和处理这些分片中的数据。SourceReader是并行运行在Task Managers的source算子中,并且产出并行的时间流/记录流。
SourceReader提供了一个拉动式(pull-based)处理接口,Flink任务会在循环中不断调用pollNext(ReaderOutput)轮询来自SourceReader的记录。pollNext(ReaderOutput)方法的返回值指示SourceReader的状态,可能包括以下几种:
为了提高性能,SourceReader可以在一次pollNext()调用中返回多条记录。然而,除非有必要,SourceReader的实现应该避免在一次pollNext(ReaderOutput)的调用中发送多个记录,因为对SourceReader轮询的任务线程工作在一个事件循环(event-loop)中,且不能阻塞。
Flink的SourceReader是数据源架构中的核心组件,负责从SplitEnumerator获取分片,读取并处理这些分片中的数据。通过拉动式处理接口和与Task Managers的交互,SourceReader能够高效地处理大量实时数据,并产出并行的时间流/记录流供后续处理使用。
核心的 SourceReader API 是完全异步的, 但实际上,大多数 Sources 都会使用阻塞的操作,例如客户端(如 KafkaConsumer)的 poll() 阻塞调用,或者分布式文件系统(HDFS, S3等)的阻塞I/O操作。为了使其与异步 Source API 兼容,这些阻塞(同步)操作需要在单独的线程中进行,并在之后将数据提交给 reader 的异步线程。
在 Flink 的数据源实现中,数据读取通常涉及以下组件和概念:
核心是上面提到的 SourceReaderBase 类,其使用 SplitReader 并创建提取器(fetcher)线程来运行 SplitReader,该实现支持不同的线程处理模型。
SplitReader API 只有以下三个方法:
SplitReader 仅需要关注从外部系统读取记录,因此比 SourceReader 简单得多。 请查看这个类的 Java 文档以获得更多细节。
常见的 SourceReader 实现方式如下:
为了减少开发新的 SourceReader 所需的工作,Flink 提供了 SourceReaderBase 类作为 SourceReader 的基本实现。 SourceReaderBase 已经实现了上述需求。要重新编写新的 SourceReader,只需要让 SourceReader 继承 SourceReaderBase,而后完善一些方法并实现 SplitReader 。
SourceReaderBase 支持几个开箱即用(out-of-the-box)的线程模型,取决于 SplitFetcherManager 的行为模式。 SplitFetcherManager 创建和维护一个分片提取器(SplitFetchers)池,同时每个分片提取器使用一个 SplitReader 进行提取。它还决定如何分配分片给分片提取器。
例如,如下所示,一个 SplitFetcherManager 可能有固定数量的线程,每个线程对分配给 SourceReader 的一些分片进行抓取。
// 这不是 Flink API 的一部分,只是示例
class FileSplit {
private String filePath;
private long startOffset;
private long endOffset;
// 构造函数、getter 和 setter 等...
}
// 这不是 Flink API 的一部分,只是示例
class FileSplitEnumerator implements SplitEnumerator<FileSplit, Void> {
// ... 省略其他实现细节 ...
@Override
public void start() {
// 启动时初始化分片,例如从文件系统中扫描文件
}
@Override
public CompletableFuture<SplitEnumeration<FileSplit>> getSplitsForSubtask(int subtaskId) {
// 根据 subtaskId 分配分片
// 这里只是一个简单的示例,实际中可能需要根据并行度等条件分配
List<FileSplit> splits = ...; // 从某处获取分片列表
return CompletableFuture.completedFuture(new ListSplitEnumeration<>(splits));
}
// ... 其他必要的方法实现 ...
}
// 这不是 Flink API 的一部分,只是示例
class FileSourceReader implements Runnable {
private final FileSplit split;
private final SourceOutput<String> sourceOutput; // 假设这是 Flink 提供的输出接口
public FileSourceReader(FileSplit split, SourceOutput<String> sourceOutput) {
this.split = split;
this.sourceOutput = sourceOutput;
}
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new FileReader(split.getFilePath()))) {
reader.skip(split.getStartOffset()); // 跳过之前的数据
String line;
while ((line = reader.readLine()) != null && reader.skipBytes(0) < split.getEndOffset()) {
// 将读取到的数据发送到 Flink 的 SourceOutput
sourceOutput.collect(line);
}
} catch (IOException e) {
// 处理异常
}
}
}
// 这才是 Flink 的 API 部分
public class FileSourceFunction implements SourceFunction<String> {
private transient FileSplitEnumerator splitEnumerator;
private transient List<FileSourceReader> readers = new ArrayList<>();
@Override
public void run(SourceContext<String> ctx) throws Exception {
splitEnumerator = new FileSplitEnumerator();
splitEnumerator.start();
// 假设我们只有一个并行度,简单起见
SplitEnumeration<FileSplit> splits = splitEnumerator.getSplitsForSubtask(0).get();
for (FileSplit split : splits) {
FileSourceReader reader = new FileSourceReader(split, ctx::collect);
new Thread(reader).start(); // 注意:这里仅为示例,实际中应使用 Flink 的执行模型
readers.add(reader);
}
// 等待所有 reader 完成(这里只是概念性示例,实际中可能更复杂)
for (FileSourceReader reader : readers) {
// 假设有某种机制可以等待 reader 完成
}
// 通知 Flink 数据源已经完成
ctx.close();
}
// ... 其他必要的方法实现 ...
}
提示:上述代码只是为了解释如何在 Flink 数据源内部实现类似于 SplitReader 的逻辑。在真实的 Flink 数据源实现中,需要使用 Flink 提供的 SourceFunction API 和相关的上下文(如 SourceContext)来正确地与 Flink 运行时集成。