A First Look at Apache Hudi (Part 5) (Integration with Flink) -- The Hudi Clean Operation in Flink


    Background

    This article takes a concrete look at how the clean operation is implemented in Flink.

    Discussion

    In Flink, the work is mainly done by the CleanFunction:

      @Override
      public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
        this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        LOG.info(String.format("exec clean with instant time %s...", instantTime));
        executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
      }
    
      @Override
      public void notifyCheckpointComplete(long l) throws Exception {
        if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {
          executor.execute(() -> {
            try {
              this.writeClient.waitForCleaningFinish();
            } finally {
              // ensure to switch the isCleaning flag
              this.isCleaning = false;
            }
          }, "wait for cleaning finish");
        }
      }
    
      @Override
      public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
          try {
            this.writeClient.startAsyncCleaning();
            this.isCleaning = true;
          } catch (Throwable throwable) {
            // catch the exception to not affect the normal checkpointing
            LOG.warn("Error while start async cleaning", throwable);
          }
        }
      }
    
    
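    Before walking through each method, here is a minimal, self-contained sketch of the checkpoint-driven handshake that these three callbacks implement. This is plain Java with no Flink or Hudi dependencies; AsyncCleanCoordinator and its method names are hypothetical stand-ins, not Hudi APIs.

      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;

      // Hypothetical stand-in for CleanFunction's coordination: a clean is kicked
      // off when a checkpoint starts (snapshotState) and awaited when the
      // checkpoint completes (notifyCheckpointComplete).
      public class AsyncCleanCoordinator implements AutoCloseable {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private volatile boolean isCleaning = false;
        private volatile Future<?> pendingClean;

        // Mirrors snapshotState(): start a clean only if none is in flight.
        public void onCheckpointStart(Runnable cleanTask) {
          if (!isCleaning) {
            isCleaning = true;
            pendingClean = executor.submit(cleanTask);
          }
        }

        // Mirrors notifyCheckpointComplete(): wait for the in-flight clean,
        // then reset the flag so the next checkpoint can schedule a new clean.
        public void onCheckpointComplete() throws Exception {
          if (isCleaning && pendingClean != null) {
            try {
              pendingClean.get(); // block until the async clean finishes
            } finally {
              isCleaning = false; // flip the flag even if the clean failed
            }
          }
        }

        @Override
        public void close() {
          executor.shutdown();
        }
      }

    The isCleaning flag is what serializes cleans across checkpoints: at most one clean is in flight, and a new one can only be scheduled after the previous one has been awaited.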
    • open函数

      • writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext())
        Creates the Flink write client (a HoodieFlinkWriteClient), which is used to write Hudi data.

      • this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
        Creates a single-threaded executor whose main purpose is to run Hudi operations (here, the clean) asynchronously.

      • executor.execute(() -> writeClient.clean(instantTime))
        Asynchronously performs the Hudi clean. The core of the clean method is as follows:

           if (!tableServicesEnabled(config)) {
             return null;
           }
           final Timer.Context timerContext = metrics.getCleanCtx();
           CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
               HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
        
           HoodieTable table = createTable(config, hadoopConf);
           if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
             LOG.info("Cleaner started");
             // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
             if (scheduleInline) {
               scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
               table.getMetaClient().reloadActiveTimeline();
             }
           }
        
           // Proceeds to execute any requested or inflight clean instances in the timeline
           HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
           if (timerContext != null && metadata != null) {
             long durationMs = metrics.getDurationInMs(timerContext.stop());
             metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
             LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
                 + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
                 + " cleanerElapsedMs" + durationMs);
           }
           return metadata;
        
        • CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking))
          Driven by the configuration hoodie.cleaner.policy.failed.writes, which defaults to EAGER: failed writes are then cleaned up immediately at the time the write fails, so in that case rollbackFailedWrites (rolling back the files of failed writes) is not executed here.

        • HoodieTable table = createTable(config, hadoopConf)
          Creates a Hudi table of type HoodieFlinkMergeOnReadTable, used for the clean and other table-service operations.

        • scheduleTableServiceInternal
          If hoodie.clean.allow.multiple is true (the default) or there is no clean operation currently pending, a clean plan is generated.
          This ultimately goes through FlinkWriteClient.scheduleCleaning, i.e. the CleanPlanActionExecutor.execute method.

          The most important part here is the requestClean method:

          CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
          Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
          List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
          int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
          Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
              .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
              .stream()
              .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
          Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
              .collect(Collectors.toMap(Map.Entry::getKey,
                  e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));
          List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
              .collect(Collectors.toList());
          return new HoodieCleanerPlan(earliestInstant
              .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
              planner.getLastCompletedCommitTimestamp(),
              config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
              CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete);
          
          • planner.getEarliestCommitToRetain()
            Based on the retention policy, computes the HoodieInstant of the earliest commit that must be retained. This takes into account hoodie.cleaner.commits.retained (default 10), hoodie.cleaner.hours.retained (default 24 hours), and hoodie.cleaner.policy (default KEEP_LATEST_COMMITS).
          • planner.getPartitionPathsToClean(earliestInstant)
            Based on the earliest retained commit's HoodieInstant, computes the partitions to clean. The configuration hoodie.cleaner.incremental.mode (default true) enables incremental cleaning:
            using the metadata of the last completed clean, only the delta of partitions since then needs to be examined.
          • cleanOpsWithPartitionMeta = context ...
            For the partitions obtained above, computes the files to delete; see CleanPlanner.getFilesToCleanKeepingLatestCommits for the details.
            It first fetches all FileGroups of a partition through the fileSystemView, then all FileSlices of each FileGroup (a FileSlice carries a version, i.e. a commit), and compares them against the timestamp of the earliest retained commit to decide which files to delete (see the sketch after this list).
          • new HoodieCleanerPlan
            Finally assembles the HoodieCleanerPlan. The caller then invokes table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)) to persist the clean-requested state under the .hoodie directory, creating a xxxx.clean.requested metadata file.
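          To make the file-selection step concrete, here is a hypothetical, much-simplified sketch of the "keep latest commits" idea. FileSlice, filesToDelete, and the sample paths are illustrative only; the real CleanPlanner additionally honors hoodie.cleaner.hours.retained, savepoints, and always keeps the slices still needed by the earliest retained commit.

            import java.util.ArrayList;
            import java.util.List;

            // Simplified illustration of CleanPlanner.getFilesToCleanKeepingLatestCommits:
            // a file slice whose commit time is strictly older than the earliest commit
            // to retain becomes a delete candidate.
            public class KeepLatestCommitsSketch {

              record FileSlice(String fileId, String commitTime, String path) {}

              static List<String> filesToDelete(List<FileSlice> slices, String earliestCommitToRetain) {
                List<String> deletePaths = new ArrayList<>();
                for (FileSlice slice : slices) {
                  // Hudi instant times are fixed-width timestamps, so plain string
                  // comparison reflects the commit ordering.
                  if (slice.commitTime().compareTo(earliestCommitToRetain) < 0) {
                    deletePaths.add(slice.path());
                  }
                }
                return deletePaths;
              }

              public static void main(String[] args) {
                List<FileSlice> slices = List.of(
                    new FileSlice("f1", "20230101080000", "part=1/f1_20230101080000.parquet"),
                    new FileSlice("f1", "20230102080000", "part=1/f1_20230102080000.parquet"),
                    new FileSlice("f1", "20230103080000", "part=1/f1_20230103080000.parquet"));
                // Retain commits from 20230102080000 onwards: only the 0101 slice is cleaned.
                System.out.println(filesToDelete(slices, "20230102080000"));
              }
            }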
        • table.getMetaClient().reloadActiveTimeline()
          Reloads the timeline so that the xxxxxxxxxxxxxx.clean.requested metadata file just generated by scheduleTableServiceInternal can be picked up.

        • table.clean(context, cleanInstantTime, skipLocking)
          The part that actually performs the clean. It mainly goes through CleanActionExecutor.execute, which ultimately calls runPendingClean(table, hoodieInstant):

           HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
           return runClean(table, cleanInstant, cleanerPlan);
          
          It first deserializes the HoodieCleanerPlan and then performs the deletion: (1) if a partition has no files left to retain, the partition itself is deleted; (2) otherwise, only the qualifying files under the partition are deleted. Finally it returns a HoodieCleanStat containing the deleted-file information and related statistics.
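          As a rough illustration of these two branches, here is a hypothetical sketch using plain java.nio instead of Hudi's Hadoop FileSystem abstraction; CleanExecutionSketch, cleanPartition, and deleteWholePartition are made-up names, not Hudi APIs.

            import java.io.IOException;
            import java.nio.file.Files;
            import java.nio.file.Path;
            import java.util.Comparator;
            import java.util.List;

            // Simplified mirror of the two deletion branches: either the whole partition
            // directory is removed, or only the selected files inside it.
            public class CleanExecutionSketch {
              static void cleanPartition(Path partitionDir, List<Path> filesToDelete,
                                         boolean deleteWholePartition) throws IOException {
                if (deleteWholePartition) {
                  // Branch 1: nothing in the partition needs to be kept, drop the directory.
                  try (var entries = Files.walk(partitionDir)) {
                    entries.sorted(Comparator.reverseOrder()) // delete children before parents
                           .forEach(p -> p.toFile().delete());
                  }
                } else {
                  // Branch 2: remove only the qualifying file slices.
                  for (Path file : filesToDelete) {
                    Files.deleteIfExists(file);
                  }
                }
                // The real executor records the deletions in a HoodieCleanStat here.
              }
            }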

    • The snapshotState method

      • If clean.async.enabled is true (the default) and no clean is currently in progress, asynchronous cleaning is started:
        this.writeClient.startAsyncCleaning(), which ultimately also calls writeClient.clean.
      • this.isCleaning = true;
        Sets the flag that keeps clean operations ordered (at most one in flight).
    • The notifyCheckpointComplete method

      • If clean.async.enabled is true (the default) and a clean is in progress, the method waits for the clean to finish
        and then resets the flag, pairing with snapshotState to keep clean operations ordered.
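    For reference, the clean-related options mentioned throughout this walkthrough can be collected in one place. This is a minimal sketch using Flink's generic Configuration with the option keys written as plain strings (the values shown are the defaults named above); in a real job you would typically set them through FlinkOptions or the table's WITH clause.

      import org.apache.flink.configuration.Configuration;

      // Clean-related options discussed above, with their default values.
      public class CleanOptionsSketch {
        public static void main(String[] args) {
          Configuration conf = new Configuration();
          conf.setString("clean.async.enabled", "true");                   // clean asynchronously on checkpoints
          conf.setString("hoodie.clean.allow.multiple", "true");           // allow scheduling with pending cleans
          conf.setString("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");  // retention policy
          conf.setString("hoodie.cleaner.commits.retained", "10");         // commits to keep
          conf.setString("hoodie.cleaner.hours.retained", "24");           // hours to keep
          conf.setString("hoodie.cleaner.incremental.mode", "true");       // incremental partition discovery
          conf.setString("hoodie.cleaner.policy.failed.writes", "EAGER");  // roll back failed writes eagerly
          System.out.println(conf);
        }
      }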