接着上一部分,本节分析在读取过程中,StreamingMonitorFunction 的 monitorAndForwardSplits() 方法是如何获取增量 FlinkInputSplit 的。入口代码如下:
// Call made from StreamingMonitorFunction#monitorAndForwardSplits(): plan the input splits for the
// scan described by newScanContext (presumably the snapshot range committed since the previous
// monitoring cycle — TODO confirm against monitorAndForwardSplits()).
FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(this.table, newScanContext, this.workerPool);
该语句调用了 FlinkSplitPlanner 的 planInputSplits 方法:
/**
 * Plans the Flink input splits for one scan of {@code table}.
 *
 * <p>Every {@link CombinedScanTask} returned by {@code planTasks} becomes one
 * {@link FlinkInputSplit}, numbered by its position in the planned task list. Host names are
 * looked up (via {@code Util.blockLocations}) only when {@code context.exposeLocality()} is true;
 * in that case the per-task work runs on {@code workerPool}, otherwise {@code executeWith(null)}
 * is used (presumably serial execution on the calling thread).
 *
 * @param table the table to scan
 * @param context the scan configuration (snapshot range, locality, ...)
 * @param workerPool executor used for the per-task work when locality is exposed
 * @return one split per combined scan task
 * @throws UncheckedIOException if closing the planned task iterable fails
 */
static FlinkInputSplit[] planInputSplits(Table table, ScanContext context, ExecutorService workerPool) {
  // try-with-resources closes the planned task iterable even when building a split fails; the
  // decompiled try/finally + $closeResource form was the compiler's expansion of exactly this.
  try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context, workerPool)) {
    List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
    boolean exposeLocality = context.exposeLocality();

    Tasks.range(tasks.size())
        .stopOnFailure()
        .executeWith(exposeLocality ? workerPool : null)
        .run(index -> {
          CombinedScanTask task = tasks.get(index);
          // Host names are resolved only when locality is exposed.
          String[] hostnames = exposeLocality ? Util.blockLocations(table.io(), task) : null;
          splits[index] = new FlinkInputSplit(index, task, hostnames);
        });
    return splits;
  } catch (IOException e) {
    throw new UncheckedIOException("Failed to process tasks iterable", e);
  }
}
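planInputSplits 先通过 planTasks 方法生成 CombinedScanTask。planTasks 根据扫描模式,构造增量追加扫描(IncrementalAppendScan)或普通表扫描(TableScan):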
/**
 * Builds the scan described by {@code context} and returns its combined scan tasks.
 *
 * <p>In {@code INCREMENTAL_APPEND_SCAN} mode an {@link IncrementalAppendScan} is used, bounded by
 * the optional start snapshot (exclusive) and end snapshot. In every other mode a regular
 * {@link TableScan} is used, optionally pinned to a snapshot id and/or an as-of timestamp.
 */
static CloseableIterable planTasks(Table table, ScanContext context, ExecutorService workerPool) {
  FlinkSplitPlanner.ScanMode mode = checkScanMode(context);

  if (mode != FlinkSplitPlanner.ScanMode.INCREMENTAL_APPEND_SCAN) {
    // Regular (non-incremental) table scan.
    TableScan tableScan =
        (TableScan) refineScanWithBaseConfigs(table.newScan(), context, workerPool);
    if (context.snapshotId() != null) {
      tableScan = tableScan.useSnapshot(context.snapshotId());
    }
    if (context.asOfTimestamp() != null) {
      tableScan = tableScan.asOfTime(context.asOfTimestamp());
    }
    return tableScan.planTasks();
  }

  // Incremental scan over the appends between the configured snapshots.
  IncrementalAppendScan appendScan =
      (IncrementalAppendScan) refineScanWithBaseConfigs(table.newIncrementalAppendScan(), context, workerPool);
  if (context.startSnapshotId() != null) {
    appendScan = (IncrementalAppendScan) appendScan.fromSnapshotExclusive(context.startSnapshotId());
  }
  if (context.endSnapshotId() != null) {
    appendScan = (IncrementalAppendScan) appendScan.toSnapshot(context.endSnapshotId());
  }
  return appendScan.planTasks();
}
以下是核心内容:增量模式下调用的是 BaseIncrementalAppendScan 的 planTasks() 方法:
/**
 * Plans the combined scan tasks of this incremental append scan.
 *
 * <p>The files listed by {@link #planFiles()} are split by {@code TableScanUtil.splitFiles} using
 * the target split size, and the pieces are then bin-packed into combined tasks by
 * {@code TableScanUtil.planTasks} using the target split size, split lookback and open-file cost.
 */
public CloseableIterable planTasks() {
  // Java evaluates arguments left to right, so planFiles() still runs before the split settings are read.
  return TableScanUtil.planTasks(
      TableScanUtil.splitFiles(this.planFiles(), this.targetSplitSize()),
      this.targetSplitSize(),
      this.splitLookback(),
      this.splitOpenFileCost());
}
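输入分片按以下流程生成:planFiles() 获取待读取的数据文件 → TableScanUtil.splitFiles() 拆分大文件 → TableScanUtil.planTasks() 将多个小文件合并为 CombinedScanTask。
其中:
FileScanTask 表示一个输入文件或一个文件的一部分,这里的实现类是 BaseFileScanTask;
CombinedScanTask 表示组合在一起的多个 FileScanTask,实现类是 BaseCombinedScanTask。
下面先看 BaseIncrementalAppendScan 的 planFiles() 方法: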
/**
 * Lists the data files appended by the snapshots in this scan's snapshot range.
 *
 * <p>Returns an empty iterable when neither bound is configured and the table has no current
 * snapshot, or when no append snapshot lies between the start (exclusive) and the end (inclusive).
 * Listeners are first notified with an {@code IncrementalScanEvent}. When there is no start
 * snapshot, that event starts at the oldest ancestor of the end snapshot and its last argument is
 * {@code true}.
 */
public CloseableIterable planFiles() {
  Long requestedFrom = this.context().fromSnapshotId();
  Long requestedTo = this.context().toSnapshotId();
  boolean nothingToScan =
      requestedFrom == null && requestedTo == null && this.table().currentSnapshot() == null;
  if (nothingToScan) {
    return CloseableIterable.empty();
  }

  long endInclusive = this.toSnapshotIdInclusive();
  Long startExclusive = this.fromSnapshotIdExclusive(requestedFrom, endInclusive);
  if (startExclusive == null) {
    // No lower bound: report the scan as starting at the oldest ancestor of the end snapshot.
    // A bound method reference null-checks its receiver, just like the decompiled requireNonNull.
    Snapshot oldestAncestor = SnapshotUtil.oldestAncestorOf(endInclusive, this.table()::snapshot);
    Listeners.notifyAll(new IncrementalScanEvent(this.table().name(), oldestAncestor.snapshotId(),
        endInclusive, this.context().rowFilter(), this.table().schema(), true));
  } else {
    Listeners.notifyAll(new IncrementalScanEvent(this.table().name(), startExclusive,
        endInclusive, this.context().rowFilter(), this.table().schema(), false));
  }

  List appendSnapshots = appendsBetween(this.table(), startExclusive, endInclusive);
  if (appendSnapshots.isEmpty()) {
    return CloseableIterable.empty();
  }
  return this.appendFilesFromSnapshots(appendSnapshots);
}
planFiles() 先确定起始快照(不包含)和结束快照(包含),通过 appendsBetween 得到这一区间内的追加快照,然后调用 appendFilesFromSnapshots 方法:
/**
 * Plans a file scan task for every data file that one of {@code snapshots} ADDED.
 *
 * <p>Only data manifests written by one of the given snapshots are read, and within them only
 * entries whose snapshot id is one of the given snapshots and whose status is {@code ADDED} are
 * kept. Deleted entries are ignored, and residual filtering and the executor follow this scan's
 * context.
 *
 * @param snapshots the append snapshots in the incremental range
 * @return the planned file scan tasks
 */
private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot> snapshots) {
  Set<Long> snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, Snapshot::snapshotId));

  // A manifest written by an unrelated (older) snapshot cannot contain files added by these snapshots.
  Set<ManifestFile> manifests = FluentIterable.from(snapshots)
      .transformAndConcat(Snapshot::dataManifests)
      .filter(manifestFile -> snapshotIds.contains(manifestFile.snapshotId()))
      .toSet();

  ManifestGroup manifestGroup = new ManifestGroup(this.tableOps().io(), manifests)
      .caseSensitive(this.context().caseSensitive())
      .select(this.context().returnColumnStats()
          ? DataTableScan.SCAN_WITH_STATS_COLUMNS
          : DataTableScan.SCAN_COLUMNS)
      .filterData(this.context().rowFilter())
      // Keep only entries ADDED by the selected snapshots; EXISTING entries are not new data.
      .filterManifestEntries(manifestEntry ->
          snapshotIds.contains(manifestEntry.snapshotId()) && manifestEntry.status() == Status.ADDED)
      .specsById(this.tableOps().current().specsById())
      .ignoreDeleted();

  if (this.context().ignoreResiduals()) {
    manifestGroup = manifestGroup.ignoreResiduals();
  }

  // Parallel planning is only enabled when there is more than one manifest to read.
  if (manifests.size() > 1
      && (DataTableScan.PLAN_SCANS_WITH_WORKER_POOL || this.context().planWithCustomizedExecutor())) {
    manifestGroup = manifestGroup.planWith(this.context().planExecutor());
  }

  return manifestGroup.planFiles();
}
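appendFilesFromSnapshots 根据 snapshotIds,从这些快照的 manifest list 中筛选出由它们写入的数据 manifest 文件,再据此构造一个 ManifestGroup(只保留这些快照中状态为 ADDED 的条目),由 ManifestGroup 根据 manifest 文件获取输入的数据文件。ManifestGroup 的 planFiles() 方法如下: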
/**
 * Plans one {@code FileScanTask} for every manifest entry produced by {@code entries(...)}.
 *
 * <p>Each task carries a copy of the entry's data file (with stats unless {@code dropStats}), the
 * delete files that apply to the entry, the JSON form of the entry's schema and partition spec, and
 * the residual evaluator for that spec. When an executor service is configured, the per-manifest
 * iterables are consumed through a {@code ParallelIterable}; otherwise they are concatenated.
 *
 * @return the planned file scan tasks
 */
public CloseableIterable<FileScanTask> planFiles() {
  // One residual evaluator per partition spec id.
  LoadingCache<Integer, ResidualEvaluator> residualCache = Caffeine.newBuilder().build(specId -> {
    PartitionSpec spec = this.specsById.get(specId);
    Expression filter = this.ignoreResiduals ? Expressions.alwaysTrue() : this.dataFilter;
    return ResidualEvaluator.of(spec, filter, this.caseSensitive);
  });

  DeleteFileIndex deleteFiles = this.deleteIndexBuilder.build();

  // Computed from the caller's column selection, BEFORE it may be widened below.
  boolean dropStats = ManifestReader.dropStats(this.dataFilter, this.columns);
  if (!deleteFiles.isEmpty()) {
    // Read the stats columns when there are delete files (presumably needed to match deletes).
    this.select(ManifestReader.withStatsColumns(this.columns));
  }

  Iterable<CloseableIterable<FileScanTask>> tasks = this.entries((manifest, entries) -> {
    int specId = manifest.partitionSpecId();
    PartitionSpec spec = this.specsById.get(specId);
    String schemaString = SchemaParser.toJson(spec.schema());
    String specString = PartitionSpecParser.toJson(spec);
    ResidualEvaluator residuals = residualCache.get(specId);
    // Wrap each entry's DataFile in a BaseFileScanTask.
    return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
        ((DataFile) e.file()).copy(!dropStats), deleteFiles.forEntry(e), schemaString, specString, residuals));
  });

  if (this.executorService != null) {
    return new ParallelIterable<>(
        tasks, this.executorService, DATA_FILE_SCAN_LIMIT_ENABLE, DATA_FILE_SCAN_LIMIT_SIZE_PROP);
  }
  return CloseableIterable.concat(tasks);
}
其中,读取 manifest 并获取文件信息的流程在 entries() 方法中:
private Iterable> entries(BiFunction>, CloseableIterable> entryFn) {
LoadingCache evalCache = this.specsById == null ? null : Caffeine.newBuilder().build((specId) -> {
PartitionSpec spec = (PartitionSpec)this.specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(Expressions.and(this.partitionFilter, Projections.inclusive(spec, this.caseSensitive).project(this.dataFilter)), spec, this.caseSensitive);
});
Evaluator evaluator;
if (this.fileFilter != null && this.fileFilter != Expressions.alwaysTrue()) {
evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), this.fileFilter, this.caseSensitive);
} else {
evaluator = null;
}
// 先通过表达是过滤 ManifestFile 文件
Iterable matchingManifests = evalCache == null ? this.dataManifests : Iterables.filter(this.dataManifests, (manifest) -> {
return ((ManifestEvaluator)evalCache.get(manifest.partitionSpecId())).eval(manifest);
});
if (this.ignoreDeleted) {
matchingManifests = Iterables.filter((Iterable)matchingManifests, (manifest) -> {
return manifest.hasAddedFiles() || manifest.hasExistingFiles();
});
}
if (this.ignoreExisting) {
matchingManifests = Iterables.filter((Iterable)matchingManifests, (manifest) -> {
return manifest.hasAddedFiles() || manifest.hasDeletedFiles();
});
}
Predicate var10001 = this.manifestPredicate;
Objects.requireNonNull(var10001);
Iterable matchingManifests = Iterables.filter((Iterable)matchingManifests, var10001::test);
return Iterables.transform(matchingManifests, (manifest) -> {
return new CloseableIterable() {
private CloseableIterable iterable;
public CloseableIterator iterator() {
//读取 ManifestFile
ManifestReader reader = ManifestFiles.read(manifest, ManifestGroup.this.io, ManifestGroup.this.specsById).filterRows(ManifestGroup.this.dataFilter).filterPartitions(ManifestGroup.this.partitionFilter).caseSensitive(ManifestGroup.this.caseSensitive).select(ManifestGroup.this.columns);
//解析出 DataFile 对象
CloseableIterable entries = reader.entries();
if (ManifestGroup.this.ignoreDeleted) {
entries = reader.liveEntries();
}
if (ManifestGroup.this.ignoreExisting) {
entries = CloseableIterable.filter(entries, (entry) -> {
return entry.status() != Status.EXISTING;
});
}
//对 DataFile 做裁剪
if (evaluator != null) {
entries = CloseableIterable.filter(entries, (entry) -> {
return evaluator.eval((GenericDataFile)entry.file());
});
}
entries = CloseableIterable.filter(entries, ManifestGroup.this.manifestEntryPredicate);
this.iterable = (CloseableIterable)entryFn.apply(manifest, entries);
return this.iterable.iterator();
}
public void close() throws IOException {
if (this.iterable != null) {
this.iterable.close();
}
}
};
});
}
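通过上述一系列过滤,获取到包含数据文件信息的 DataFile 对象,并在 planFiles() 中将其封装为 BaseFileScanTask 返回;之后再由 TableScanUtil.splitFiles 拆分,由 TableScanUtil.planTasks 合并为 CombinedScanTask。splitFiles 方法如下: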
/**
 * Expands every task in {@code tasks} via {@code FileScanTask#split(splitSize)}.
 *
 * <p>The expansion is lazy. {@code tasks} is passed to {@code CloseableIterable.combine} as the
 * closeable of the result, presumably so that closing the returned iterable also closes
 * {@code tasks}.
 *
 * @param tasks the file scan tasks to split
 * @param splitSize the target split size in bytes; must be positive
 * @return the split tasks
 * @throws IllegalArgumentException if {@code splitSize <= 0}
 */
public static CloseableIterable<FileScanTask> splitFiles(CloseableIterable<FileScanTask> tasks, long splitSize) {
  Preconditions.checkArgument(splitSize > 0L, "Invalid split size (negative or 0): %s", splitSize);
  Iterable<FileScanTask> splitTasks = FluentIterable.from(tasks)
      .transformAndConcat(input -> input.split(splitSize));
  return CloseableIterable.combine(splitTasks, tasks);
}
splitFiles 对每个 FileScanTask 调用 BaseFileScanTask 的 split 方法:
/**
 * Splits this file scan task into smaller scan tasks.
 *
 * <p>If the file format is not splittable, this task is returned unchanged. For splittable formats,
 * the file's split offsets are used as split boundaries when they are usable; that path does not
 * use {@code targetSplitSize}. Otherwise the file is cut into fixed-size pieces of
 * {@code targetSplitSize}.
 *
 * <p>Split offsets are trusted only when they are non-empty, strictly ascending and inside
 * {@code [0, length())}. The offset-based iterator derives each split's size from the difference of
 * adjacent offsets (the last split runs to {@code length()}). Unordered or out-of-range offsets
 * would therefore produce negative sizes, and an empty list would produce no splits at all,
 * silently skipping the file.
 *
 * @param targetSplitSize target size in bytes for the fixed-size splitting
 * @return the split tasks
 */
public Iterable split(long targetSplitSize) {
  if (!this.file.format().isSplittable()) {
    return ImmutableList.of(this);
  }
  List<Long> offsets = this.file.splitOffsets();
  if (hasUsableSplitOffsets(offsets, this.length())) {
    return () -> new BaseFileScanTask.OffsetsAwareTargetSplitSizeScanTaskIterator(offsets, this);
  }
  return () -> new BaseFileScanTask.FixedSizeSplitScanTaskIterator(targetSplitSize, this);
}

/** Returns true if {@code offsets} is non-empty, strictly ascending and within {@code [0, length)}. */
private static boolean hasUsableSplitOffsets(List<Long> offsets, long length) {
  if (offsets == null || offsets.isEmpty()) {
    return false;
  }
  long previous = -1L;
  for (Long offset : offsets) {
    if (offset == null || offset <= previous || offset >= length) {
      return false;
    }
    previous = offset;
  }
  return true;
}
对于可拆分且带有 split offsets 的文件,生成 OffsetsAwareTargetSplitSizeScanTaskIterator 对象:
OffsetsAwareTargetSplitSizeScanTaskIterator(List offsetList, FileScanTask parentScanTask) {
this.offsets = ImmutableList.copyOf(offsetList);
this.parentScanTask = parentScanTask;
this.splitSizes = Lists.newArrayListWithCapacity(this.offsets.size());
if (this.offsets.size() > 0) {
int lastIndex = this.offsets.size() - 1;
for(int index = 0; index < lastIndex; ++index) {
this.splitSizes.add((Long)this.offsets.get(index + 1) - (Long)this.offsets.get(index));
}
this.splitSizes.add(parentScanTask.length() - (Long)this.offsets.get(lastIndex));
}
}
构造函数根据 offsets 计算每个分片的大小 splitSizes(相邻 offset 之差,最后一个分片为文件长度减去最后一个 offset),迭代时再据此生成 SplitScanTask 对象:
/**
 * Returns the next split, which starts at the current offset and spans the matching split size.
 *
 * @throws NoSuchElementException if every split has already been returned
 */
public FileScanTask next() {
  if (this.hasNext()) {
    int current = this.sizeIdx;
    long splitLength = (Long) this.splitSizes.get(current);
    this.sizeIdx = current + 1;
    return new BaseFileScanTask.SplitScanTask((Long) this.offsets.get(current), splitLength, this.parentScanTask);
  }
  throw new NoSuchElementException();
}
/**
 * Bin-packs file scan tasks into combined scan tasks.
 *
 * <p>A task's weight is its own length plus the size of its delete files, but at least
 * {@code (1 + number of delete files) * openFileCost}. Tasks are packed with a target weight of
 * {@code splitSize}, {@code lookback} open bins, and the largest bin emitted first.
 *
 * @param splitFiles the (already split) file scan tasks
 * @param splitSize target weight of one combined task; must be positive
 * @param lookback number of bins kept open while packing; must be positive
 * @param openFileCost minimum weight charged per file opened; must not be negative
 * @return the combined scan tasks
 * @throws IllegalArgumentException if an argument is out of range
 */
public static CloseableIterable<CombinedScanTask> planTasks(CloseableIterable<FileScanTask> splitFiles, long splitSize, int lookback, long openFileCost) {
  Preconditions.checkArgument(splitSize > 0L, "Invalid split size (negative or 0): %s", splitSize);
  Preconditions.checkArgument(lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
  Preconditions.checkArgument(openFileCost >= 0L, "Invalid file open cost (negative): %s", openFileCost);

  // The open-file floor presumably keeps many tiny files from being packed into one task.
  Function<FileScanTask, Long> weightFunc = file -> Math.max(
      file.length() + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
      (long) (1 + file.deletes().size()) * openFileCost);

  return CloseableIterable.transform(
      CloseableIterable.combine(new PackingIterable<>(splitFiles, splitSize, lookback, weightFunc, true), splitFiles),
      BaseCombinedScanTask::new);
}
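上面的 TableScanUtil.planTasks 按 weightFunc 计算每个任务的权重(文件长度与其 delete 文件大小之和,且不低于打开文件的成本),再通过 PackingIterable 进行装箱合并。PackingIterable 的定义如下: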
public static class PackingIterable implements Iterable> {
private final Iterable iterable;
private final long targetWeight;
private final int lookback;
private final Function weightFunc;
private final boolean largestBinFirst;
public PackingIterable(Iterable iterable, long targetWeight, int lookback, Function weightFunc) {
this(iterable, targetWeight, lookback, weightFunc, false);
}
public PackingIterable(Iterable iterable, long targetWeight, int lookback, Function weightFunc, boolean largestBinFirst) {
Preconditions.checkArgument(lookback > 0, "Bin look-back size must be greater than 0: %s", lookback);
this.iterable = iterable;
this.targetWeight = targetWeight;
this.lookback = lookback;
this.weightFunc = weightFunc;
this.largestBinFirst = largestBinFirst;
}
public Iterator> iterator() {
return new BinPacking.PackingIterator(this.iterable.iterator(), this.targetWeight, this.lookback, this.weightFunc, this.largestBinFirst);
}
}
迭代 PackingIterable 时会创建 PackingIterator 对象,由它执行小文件合并(装箱):
private static class PackingIterator implements Iterator> {
private final Deque> bins;
private final Iterator items;
private final long targetWeight;
private final int lookback;
private final Function weightFunc;
private final boolean largestBinFirst;
private PackingIterator(Iterator items, long targetWeight, int lookback, Function weightFunc, boolean largestBinFirst) {
this.bins = Lists.newLinkedList();
this.items = items;
this.targetWeight = targetWeight;
this.lookback = lookback;
this.weightFunc = weightFunc;
this.largestBinFirst = largestBinFirst;
}
public boolean hasNext() {
return this.items.hasNext() || !this.bins.isEmpty();
}
public List next() {
while(true) {
if (this.items.hasNext()) {
T item = this.items.next();
// 得出权重,可以认为文件size,小文件为4MB
long weight = (Long)this.weightFunc.apply(item);
// 遍历,找到可以放置该文件的箱子,箱子最大为128MB
BinPacking.Bin bin = this.findBin(weight);
// 若能找到,将该文件放置到该箱子中
if (bin != null) {
bin.add(item, weight);
continue;
}
// 若不能找到合适的箱子,创建一个新的箱子
bin = this.newBin();
bin.add(item, weight);
this.bins.addLast(bin);
// 判断当前箱子的总数,若小于10个,继续;若大于10个,找到最大的箱子,将箱子输出
if (this.bins.size() <= this.lookback) {
continue;
}
BinPacking.Bin binToRemove;
if (this.largestBinFirst) {
binToRemove = removeLargestBin(this.bins);
} else {
binToRemove = (BinPacking.Bin)this.bins.removeFirst();
}
return ImmutableList.copyOf(binToRemove.items());
}
if (this.bins.isEmpty()) {
throw new NoSuchElementException();
}
// 输出第一个箱子
return ImmutableList.copyOf(((BinPacking.Bin)this.bins.removeFirst()).items());
}
}
private BinPacking.Bin findBin(long weight) {
Iterator var3 = this.bins.iterator();
BinPacking.Bin bin;
do {
if (!var3.hasNext()) {
return null;
}
bin = (BinPacking.Bin)var3.next();
} while(!bin.canAdd(weight));
return bin;
}
private BinPacking.Bin newBin() {
return new BinPacking.Bin(this.targetWeight);
}
private static BinPacking.Bin removeLargestBin(Collection> bins) {
BinPacking.Bin maxBin = (BinPacking.Bin)Collections.max(bins, Comparator.comparingLong(BinPacking.Bin::weight));
if (bins.remove(maxBin)) {
return maxBin;
} else {
throw new NoSuchElementException();
}
}
}
参考: