This post walks through how Hudi's clean operation is implemented on Flink.
On the Flink side the work is done mainly by the CleanFunction class:
@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
  this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
  String instantTime = HoodieActiveTimeline.createNewInstantTime();
  LOG.info(String.format("exec clean with instant time %s...", instantTime));
  executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
}

@Override
public void notifyCheckpointComplete(long l) throws Exception {
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {
    executor.execute(() -> {
      try {
        this.writeClient.waitForCleaningFinish();
      } finally {
        // ensure to switch the isCleaning flag
        this.isCleaning = false;
      }
    }, "wait for cleaning finish");
  }
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
    try {
      this.writeClient.startAsyncCleaning();
      this.isCleaning = true;
    } catch (Throwable throwable) {
      // catch the exception to not affect the normal checkpointing
      LOG.warn("Error while start async cleaning", throwable);
    }
  }
}
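The open function
writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext())
Creates the FlinkWriteClient, which is used to write Hudi data.
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
Creates a thread pool with a single thread; its main job is to run Hudi write operations asynchronously.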
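To make the idea of a single-thread executor that never throws back at the caller more concrete, here is a minimal sketch. It is not Hudi's NonThrownExecutor: the class name SingleThreadSwallowingExecutor, the errors() accessor, and the 10-second wait in close() are all hypothetical choices made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the idea behind NonThrownExecutor; not the Hudi class.
class SingleThreadSwallowingExecutor {
    // One worker thread, so submitted actions run one after another in order.
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    private final List<String> errors = Collections.synchronizedList(new ArrayList<>());

    // Run the action on the worker thread; a failure is recorded, never rethrown.
    void execute(Runnable action, String actionName) {
        pool.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                errors.add(actionName + ": " + t.getMessage());
            }
        });
    }

    // Snapshot of the failures recorded so far.
    List<String> errors() {
        return new ArrayList<>(errors);
    }

    // Stop accepting work and wait for queued tasks (a choice made for this sketch).
    void close() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SingleThreadSwallowingExecutor executor = new SingleThreadSwallowingExecutor();
        executor.execute(() -> { throw new IllegalStateException("boom"); }, "clean");
        executor.execute(() -> System.out.println("later task still runs"), "next");
        executor.close();
        System.out.println(executor.errors());
    }
}
```

The point of the design is that a failing clean should not take the Flink task down with it; the failure is only recorded (Hudi logs it), and later tasks keep running.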
executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
Runs Hudi's clean operation asynchronously. The main code of the clean function is as follows:
if (!tableServicesEnabled(config)) {
  return null;
}
final Timer.Context timerContext = metrics.getCleanCtx();
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
    HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));

HoodieTable table = createTable(config, hadoopConf);
if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
  LOG.info("Cleaner started");
  // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
  if (scheduleInline) {
    scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
    table.getMetaClient().reloadActiveTimeline();
  }
}

// Proceeds to execute any requested or inflight clean instances in the timeline
HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
if (timerContext != null && metadata != null) {
  long durationMs = metrics.getDurationInMs(timerContext.stop());
  metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
  LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
      + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
      + " cleanerElapsedMs" + durationMs);
}
return metadata;
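CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
This is driven by the config hoodie.cleaner.policy.failed.writes, which defaults to EAGER. EAGER means that when a write fails, the data of that failed write is cleaned up immediately. In that case the rollbackFailedWrites operation, which rolls back the files of failed writes, is not executed during clean.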
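The policy check described above can be sketched as follows. This is a simplified illustration, not the Hudi source: the class FailedWritesPolicySketch, the Policy enum, and the method rollbackDuringClean are hypothetical names, and the assumption is that the only other policy value of interest is LAZY, under which the rollback callback does run at clean time.

```java
// Hypothetical sketch of the clean-time decision; names are not Hudi's.
class FailedWritesPolicySketch {
    enum Policy { EAGER, LAZY }

    // Returns true when the rollback callback was invoked during clean.
    static boolean rollbackDuringClean(Policy policy, Runnable rollbackFailedWrites) {
        if (policy == Policy.EAGER) {
            // Failed writes were already cleaned up when they failed; nothing to do here.
            return false;
        }
        // Assumption: under LAZY the cleanup is deferred to the clean action.
        rollbackFailedWrites.run();
        return true;
    }

    public static void main(String[] args) {
        Runnable rollback = () -> System.out.println("rolling back failed writes");
        System.out.println(rollbackDuringClean(Policy.EAGER, rollback));
        System.out.println(rollbackDuringClean(Policy.LAZY, rollback));
    }
}
```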
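HoodieTable table = createTable(config, hadoopConf);
Creates the Hudi table object, of type HoodieFlinkMergeOnReadTable (for a merge-on-read table), which is used for operations such as clean.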
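scheduleTableServiceInternal
If hoodie.clean.allow.multiple is true (the default) or there is no pending (requested or inflight) clean on the timeline, a clean plan is generated.
This ultimately calls FlinkWriteClient.scheduleCleaning, that is, CleanPlanActionExecutor.execute.
The most important part here is the requestClean method: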
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
    .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
    .stream()
    .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
    .collect(Collectors.toMap(Map.Entry::getKey,
        e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));

List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
    .collect(Collectors.toList());

return new HoodieCleanerPlan(earliestInstant
    .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
    planner.getLastCompletedCommitTimestamp(),
    config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
    CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete);
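The shape of the plan built by requestClean can be illustrated with plain JDK collections. In this sketch each partition maps to a pair of (delete-the-whole-partition flag, files to delete), and the two derived structures mirror cleanOps and partitionsToDelete. The class CleanPlanSketch, the helper deletePaths, and the sample partition and file names are hypothetical; Map.Entry stands in for Hudi's Pair.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model of how a clean plan is split into its two parts.
class CleanPlanSketch {
    // Planner output for one partition: (deleteWholePartition, filesToDelete).
    static Map.Entry<Boolean, List<String>> deletePaths(boolean deletePartition, String... files) {
        return new AbstractMap.SimpleEntry<>(deletePartition, Arrays.asList(files));
    }

    // Partition -> files to delete, dropping the boolean flag.
    static Map<String, List<String>> cleanOps(Map<String, Map.Entry<Boolean, List<String>>> meta) {
        return meta.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getValue()));
    }

    // Partitions whose flag says the partition itself should go, sorted for stable output.
    static List<String> partitionsToDelete(Map<String, Map.Entry<Boolean, List<String>>> meta) {
        return meta.entrySet().stream()
            .filter(e -> e.getValue().getKey())
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Map.Entry<Boolean, List<String>>> meta = new LinkedHashMap<>();
        meta.put("dt=2023-01-01", deletePaths(true, "a.parquet", "b.parquet"));
        meta.put("dt=2023-01-02", deletePaths(false, "c.parquet"));
        System.out.println(cleanOps(meta));
        System.out.println(partitionsToDelete(meta));
    }
}
```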
table.getMetaClient().reloadActiveTimeline()
Reloads the timeline so that the xxxxxxxxxxxxxx.clean.requested metadata file just generated by scheduleTableServiceInternal can be picked up.
table.clean(context, cleanInstantTime, skipLocking)
This is where the clean is actually executed. It mainly calls CleanActionExecutor.execute, which ultimately calls runPendingClean(table, hoodieInstant):
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
return runClean(table, cleanInstant, cleanerPlan);
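First the CleanPlan is deserialized, then the cleaning runs. Deletion follows two rules:
1. If a partition is marked in the plan as a partition to delete, the partition itself is deleted.
2. Otherwise, only the files under that partition that meet the cleaning conditions are deleted.
Finally, HoodieCleanStat results are returned, containing information such as the deleted files.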
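As a rough illustration of these two rules, the sketch below applies a plan to an in-memory model of a table (partition name to set of file names). It is not the Hudi implementation: CleanExecutionSketch and runClean are hypothetical names, there is no real file system access, and the order used here (files first, then whole partitions) is a choice made for the sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of executing a clean plan.
class CleanExecutionSketch {
    // Deletes planned files, then drops flagged partitions; returns the number of files deleted.
    static int runClean(Map<String, Set<String>> table,
                        Map<String, List<String>> cleanOps,
                        List<String> partitionsToDelete) {
        int deleted = 0;
        for (Map.Entry<String, List<String>> op : cleanOps.entrySet()) {
            Set<String> files = table.get(op.getKey());
            if (files == null) {
                continue; // partition is unknown or already gone
            }
            for (String file : op.getValue()) {
                if (files.remove(file)) {
                    deleted++;
                }
            }
        }
        for (String partition : partitionsToDelete) {
            table.remove(partition); // remove the partition itself
        }
        return deleted;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> table = new HashMap<>();
        table.put("p1", new HashSet<>(Arrays.asList("a", "b")));
        table.put("p2", new HashSet<>(Arrays.asList("c", "d")));

        Map<String, List<String>> cleanOps = new HashMap<>();
        cleanOps.put("p1", Arrays.asList("a", "b"));
        cleanOps.put("p2", Arrays.asList("c"));

        int deleted = runClean(table, cleanOps, Arrays.asList("p1"));
        System.out.println(deleted + " files deleted, remaining: " + table);
    }
}
```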
The snapshotState method
The notifyCheckpointComplete method
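Both methods are listed in full at the top of this post. They cooperate through the isCleaning flag: snapshotState starts an async clean only when none is in flight, and notifyCheckpointComplete waits for it and resets the flag. The sketch below models that handshake with a CompletableFuture. The class CheckpointDrivenCleanSketch and the cleansStarted counter are hypothetical, and unlike the real code, which hands the wait to its executor, this sketch waits inline for simplicity.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the isCleaning handshake between the two checkpoint hooks.
class CheckpointDrivenCleanSketch {
    private CompletableFuture<Void> pendingClean;
    private boolean isCleaning = false;
    private int cleansStarted = 0;

    // Called while a checkpoint is taken: start a clean unless one is already running.
    void snapshotState() {
        if (!isCleaning) {
            cleansStarted++;
            pendingClean = CompletableFuture.runAsync(() -> {
                // placeholder for the actual cleaning work
            });
            isCleaning = true;
        }
    }

    // Called after the checkpoint completes: wait for the clean, then reset the flag.
    void notifyCheckpointComplete() {
        if (isCleaning) {
            try {
                pendingClean.join();
            } finally {
                // always reset the flag so the next checkpoint can start a new clean
                isCleaning = false;
            }
        }
    }

    boolean isCleaning() {
        return isCleaning;
    }

    int cleansStarted() {
        return cleansStarted;
    }

    public static void main(String[] args) {
        CheckpointDrivenCleanSketch fn = new CheckpointDrivenCleanSketch();
        fn.snapshotState();
        fn.snapshotState(); // ignored, a clean is already in flight
        fn.notifyCheckpointComplete();
        fn.snapshotState(); // a new clean may start now
        System.out.println("cleans started: " + fn.cleansStarted());
    }
}
```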