目录
createReader(SourceReaderContext readerContext)
createEnumerator(SplitEnumeratorContext enumContext)
SimpleVersionedSerializer getSplitSerializer()
SimpleVersionedSerializer getEnumeratorCheckpointSerializer()
Flink的Source主要是由3个核心部分组成:Splits,SplitEnumerator,SourceReader。

(放一下官网的图。。。)
Flink作为批流一体的架构,Data Source API支持数据文件是无界流或者是有界批文件。
对于有界批文件来说,enumerator会产生一系列的split文件,并且每一个split文件明确是有限大小的;而对于无界流来说,则有两种情况 1)splits文件是无限的 2)enumerator不断的产生新的split文件
具体说明一下:
有界文件
Source数据源存在一个URI/Path路径,并且有固定的format去明确如何解析文件。
有界Kafka
同理,只不过每一个split是一个明确的topic分区的end offset。一旦sourceReader达到了end offset,就会完成这个split文件的读取。当所有的split文件完成后,sourceReader就会结束。
无界文件流
无界的情况下,将永远不会产生 NoMoreSplits 的标志,会周期性监控URI/Path路径下是否会产生新的文件。一旦产生了新文件则会生成新的split切片并分发给可用的sourcereaders。
无界Kafka
Source数据源是一个Kafka的Topic文件,或者是一系列Topic/Topic正则。
在1.14-1.15版本的时候source api是一个工厂模式的接口,用于创建以下的组件。
除此之外,Source 还提供了 Boundedness 的特性,从而使得 Flink 可以选择合适的模式来运行 Flink 任务。
Source 实现应该是可序列化的,因为 Source 实例会在运行时被序列化并上传到 Flink 集群。
接下来看看source的源码
- import org.apache.flink.annotation.Public;
- import org.apache.flink.core.io.SimpleVersionedSerializer;
-
- import java.io.Serializable;
-
- /**
- * The interface for Source. It acts like a factory class that helps construct the {@link
- * SplitEnumerator} and {@link SourceReader} and corresponding serializers.
- *
- * @param
The type of records produced by the source. - * @param
The type of splits handled by the source. - * @param
The type of the enumerator checkpoints. - */
- // 在flink1.16之后,source的接口变为public interface Source
extends SourceReaderFactory - @Public
- public interface Source
extends SourceSplit, EnumChkT> extends Serializable { -
- /**
- * Get the boundedness of this source.
- *
- * @return the boundedness of this source.
- */
- Boundedness getBoundedness();
-
- /**
- * Creates a new reader to read data from the splits it gets assigned. The reader starts fresh
- * and does not have any state to resume.
- *
- * @param readerContext The {@link SourceReaderContext context} for the source reader.
- * @return A new SourceReader.
- * @throws Exception The implementor is free to forward all exceptions directly. Exceptions
- * thrown from this method cause task failure/recovery.
- */
- SourceReader
createReader(SourceReaderContext readerContext) throws Exception; -
- /**
- * Creates a new SplitEnumerator for this source, starting a new input.
- *
- * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
- * @return A new SplitEnumerator.
- * @throws Exception The implementor is free to forward all exceptions directly. * Exceptions
- * thrown from this method cause JobManager failure/recovery.
- */
- SplitEnumerator
createEnumerator(SplitEnumeratorContext enumContext) - throws Exception;
-
- /**
- * Restores an enumerator from a checkpoint.
- *
- * @param enumContext The {@link SplitEnumeratorContext context} for the restored split
- * enumerator.
- * @param checkpoint The checkpoint to restore the SplitEnumerator from.
- * @return A SplitEnumerator restored from the given checkpoint.
- * @throws Exception The implementor is free to forward all exceptions directly. * Exceptions
- * thrown from this method cause JobManager failure/recovery.
- */
- SplitEnumerator
restoreEnumerator( - SplitEnumeratorContext
enumContext, EnumChkT checkpoint) throws Exception; -
- // ------------------------------------------------------------------------
- // serializers for the metadata
- // ------------------------------------------------------------------------
-
- /**
- * Creates a serializer for the source splits. Splits are serialized when sending them from
- * enumerator to reader, and when checkpointing the reader's current state.
- *
- * @return The serializer for the split type.
- */
- SimpleVersionedSerializer
getSplitSerializer(); -
- /**
- * Creates the serializer for the {@link SplitEnumerator} checkpoint. The serializer is used for
- * the result of the {@link SplitEnumerator#snapshotState()} method.
- *
- * @return The serializer for the SplitEnumerator checkpoint.
- */
- SimpleVersionedSerializer
getEnumeratorCheckpointSerializer(); - }
我们一个一个函数来看,毕竟一堆看上去确实感觉挺头疼的。。。。
主要是返回数据源是否有界,返回类型是Boundedness的枚举类,值只有两个BOUNDED 和 CONTINUOUS_UNBOUNDED。
具体的接口实现有四类(后面的实现都是有四类,这边只讲fileSource相关的,就不会过多介绍了。。。)
- public abstract class AbstractFileSource
extends FileSourceSplit> - implements Source
>, ResultTypeQueryable { - @Override
- public Boundedness getBoundedness() {
- return continuousEnumerationSettings == null
- ? Boundedness.BOUNDED // 有界
- : Boundedness.CONTINUOUS_UNBOUNDED; // 无界
- }
- }
-
- public class DorisSource
implements Source, ResultTypeQueryable { - public Boundedness getBoundedness() {
- return this.boundedness;
- }
- }
-
- public class HybridSource
implements Source { - public Boundedness getBoundedness() {
- return ((HybridSource.SourceListEntry)this.sources.get(this.sources.size() - 1)).boundedness;
- }
- }
-
- public class NumberSequenceSource
- implements Source<
- Long,
- NumberSequenceSource.NumberSequenceSplit,
- Collection
>, - ResultTypeQueryable
{ -
- @Override
- public Boundedness getBoundedness() {
- return Boundedness.BOUNDED;
- }
- }
其中,continuousEnumerationSettings主要的作用是设置轮询时间,多久去对于无界的文件进行扫描。
创建一个全新的source reader去读取分配给到它的splits文件,不包含任何状态恢复,返回接口SourceReader。在flink1.16的版本中已经放在了SourceReaderFactory接口中实现。
- // abstractFileSource中的实现
- @Override
- public SourceReader
createReader(SourceReaderContext readerContext) { - // fileSourceReader是一种读取方式,从FileSourceSplit中读取记录
- return new FileSourceReader<>(
- readerContext, readerFormat, readerContext.getConfiguration());
- }
其中,readerContext是Flink运行时source的上下文;readerFormat是BulkFormat
为这个source创建新的SplitEnumerator,开始一个新的input。
- @Override
- public SplitEnumerator
> createEnumerator( - SplitEnumeratorContext
enumContext) { -
- final FileEnumerator enumerator = enumeratorFactory.create();
-
- // read the initial set of splits (which is also the total set of splits for bounded
- // sources)
- final Collection
splits; - try {
- // TODO - in the next cleanup pass, we should try to remove the need to "wrap unchecked"
- // here
- splits = enumerator.enumerateSplits(inputPaths, enumContext.currentParallelism());
- } catch (IOException e) {
- throw new FlinkRuntimeException("Could not enumerate file splits", e);
- }
-
- return createSplitEnumerator(enumContext, enumerator, splits, null);
- }
其中,enumerator是由FileEnumerator工厂类产生的,这个类主要任务是找到所有需要读取的文件,切分它们成为FileSourceSplit。并且遍历路径的同时会过滤文件(如果有文件不想要读取可以通过名称进行过滤),决定是否切分文件为多个split,如何去切分的。
splits = enumerator.enumerateSplits(inputPaths, enumContext.currentParallelism());
这里则是进行切分split,里面的函数实现主要是通过递归进行遍历path。顺便提一嘴,具体实现是接口FileEnumerator的具体实现NonSplittingRecursiveEnumerator类。
- @Override
- public Collection
enumerateSplits(Path[] paths, int minDesiredSplits) - throws IOException {
- final ArrayList
splits = new ArrayList<>(); -
- for (Path path : paths) {
- final FileSystem fs = path.getFileSystem();
- final FileStatus status = fs.getFileStatus(path);
- addSplitsForPath(status, fs, splits);
- }
-
- return splits;
- }
-
- private void addSplitsForPath(
- FileStatus fileStatus, FileSystem fs, ArrayList
target) - throws IOException {
- if (!fileFilter.test(fileStatus.getPath())) {
- return;
- }
- // 判断是文件还是目录,如果是文件则转化为source split去读取。
- // 比如hdfs的话,就会去获取datanode的host
- if (!fileStatus.isDir()) {
- convertToSourceSplits(fileStatus, fs, target);
- return;
- }
-
- final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
- for (FileStatus containedStatus : containedFiles) {
- // 递归遍历文件目录
- addSplitsForPath(containedStatus, fs, target);
- }
- }
最后createSplitEnumerator这个函数则是去根据是有界数据还是无界数据进行划分,如果无界数据存在alreadyProcessedPaths也会直接去划分split,如果alreadyProcessedPaths为空,才会去周期性的监控路径是否产生新文件。(后续再讲。。。)
- private SplitEnumerator
> createSplitEnumerator( - SplitEnumeratorContext
context, - FileEnumerator enumerator,
- Collection
splits, - @Nullable Collection
alreadyProcessedPaths) { -
- // cast this to a collection of FileSourceSplit because the enumerator code work
- // non-generically just on that base split type
- @SuppressWarnings("unchecked")
- final SplitEnumeratorContext
fileSplitContext = - (SplitEnumeratorContext
) context; -
- final FileSplitAssigner splitAssigner = assignerFactory.create(splits);
-
- if (continuousEnumerationSettings == null) {
- // bounded case
- return castGeneric(new StaticFileSplitEnumerator(fileSplitContext, splitAssigner));
- } else {
- // unbounded case
- if (alreadyProcessedPaths == null) {
- alreadyProcessedPaths = splitsToPaths(splits);
- }
-
- return castGeneric(
- new ContinuousFileSplitEnumerator(
- fileSplitContext,
- enumerator,
- splitAssigner,
- inputPaths,
- alreadyProcessedPaths,
- continuousEnumerationSettings.getDiscoveryInterval().toMillis()));
- }
- }
-
- @SuppressWarnings("unchecked")
- private SplitEnumerator
> castGeneric( - final SplitEnumerator
> - enumerator) {
-
- // cast arguments away then cast them back. Java Generics Hell :-/
- return (SplitEnumerator
>) - (SplitEnumerator, ?>) enumerator;
- }
-
- private static Collection
splitsToPaths(Collection splits) { - return splits.stream()
- .map(FileSourceSplit::path)
- .collect(Collectors.toCollection(HashSet::new));
- }
主要是通过一个checkpoint去恢复一个枚举器。最后调用的函数与createEnumerator只是多了一个checkpoint.getAlreadyProcessedPaths()参数传递。
- @Override
- public SplitEnumerator
> restoreEnumerator( - SplitEnumeratorContext
enumContext, - PendingSplitsCheckpoint
checkpoint) { -
- final FileEnumerator enumerator = enumeratorFactory.create();
-
- // cast this to a collection of FileSourceSplit because the enumerator code work
- // non-generically just on that base split type
- @SuppressWarnings("unchecked")
- final Collection
splits = - (Collection
) checkpoint.getSplits(); -
- return createSplitEnumerator(
- enumContext, enumerator, splits, checkpoint.getAlreadyProcessedPaths());
- }
主要是为source splits创建一个序列化器,在splits从enumerator到reader的时候或者是当reader进行checkpoint的时候执行。
- @Override
- public SimpleVersionedSerializer
getSplitSerializer() { - return FileSourceSplitSerializer.INSTANCE;
- }
-
-
- @PublicEvolving
- public final class FileSourceSplitSerializer implements SimpleVersionedSerializer
{ -
- public static final FileSourceSplitSerializer INSTANCE = new FileSourceSplitSerializer();
获取SplitEnumerator checkpoint的序列化器,用于处理SplitEnumerator#snapshotState()方法返回的结果
- @Override
- public SimpleVersionedSerializer
> - getEnumeratorCheckpointSerializer() {
- return new PendingSplitsCheckpointSerializer<>(getSplitSerializer());
- }
以上就是Source的接口的所有方法,主要包含创建 SourceReader 、 SplitEnumerator 和对应get序列化器的方法。
目前可以看出,Souce接口的更新,其实是因为Flink在1.12之前将批处理任务与流处理任务分为两种实现模式。
在底层实现中
DataSet API中Source对应的核心借口是InputFormat,功能上主要有三点:
1、3两点主要会被JobManager/JobMaster在调度Exection时使用,而第2点读取数据功能则会在运行时被TaskManager使用。
DataStream API中 Source 对应的核心接口为 SourceFunction 以及 SourceContext。前者直接继承 Function 接口与 Operator 交互,负责通用的状态管理(比如初始化或取消);后者代表运行时的上下文,负责与单条记录级别的数据的交互。此外还有其他一些辅助类型的类或接口。
运行时,Source 主要通过 SourceContext 来控制数据的输出。从 SourceContext 接口的方法即可以看出,Source 在接受到数据后的主要工作有以下几点:
综上所述,之前的 Source 接口并不能很好的满足批流一体的发展,所以在 FLIP-27中选择重构Source接口,新接口的核心是通过 SplitEnumerator 和 SplitReader,前者负责发现和分配 Split、触发 Checkpoint 等管理工作,后者负责 Split 的实际读取处理。此外,新增 Operator 间的通信机制(复用大部分现有的 RPC 机制),让 Source Subtask 之间可以协调完成 Event Time 对齐等新特性。最后, SplitReader 底层封装了通用的线程模型,相比之前的 SourceFunction 大大简化了 Source 的实现。