• Apache Hudi 流转批 场景实践


    背景

    在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批

    EventTime计算原理

    1.jpg

    图中Flink Sink包含了两个算子。第一个writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。

    案例使用

    我们的方案是将这个进度值(EventTime)存储为 hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。

    下图是一个flink 1分钟级别入库到HUDI ODS表, 然后通过流转批计算写入HUDI DWD表的一个执行过程。

    2.jpg

    US调度系统轮询逻辑

    3.jpg

    如何解决乱序到来问题,  我们可以通过设置spedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。

    Maven pom 依赖

    针对此功能特性的Hudi依赖版本如下

    
    <dependencies>
      <dependency>
        <groupId>org.apache.hudigroupId>
        <artifactId>hudi-flink1.13-bundleartifactId>
        <version>0.12.1version>
      dependency>
    dependencies>
    
    <dependencies>
      <dependency>
        <groupId>org.apache.hudigroupId>
        <artifactId>hudi-flink1.15-bundleartifactId>
        <version>0.12.1version>
      dependency>
    dependencies>
    

    如何设置EventTime

    能够解析的字段类型及格式如下:

    类型 示例
    TIMESTAMP(3) 2012-12-12T12:12:12
    TIMESTAMP(3) 2012-12-12 12:12:12
    DATE 2012-12-12
    BIGINT 100L
    INT 100

    用户只需要设置flink conf指定时间字段作为时间推进字段

    Map options = new HashMap<>();
    // 这里省略其他表字段
    options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts");
    HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
         .column("id int not null")
         .column("ts string")
         .column("dt string")
         .pk("id")
         .partition("dt")
         .options(options);
    

    通过设置hoodie.payload.event.time.field指定需要计算的eventtime的字段

    create table hudi_cow_01(\n" +
    "  uuid varchar(20),\n" +
    "  name varchar(10),\n" +
    "  age int,\n" +
    "  ts timestamp(3),\n" +
    "  PRIMARY KEY(uuid) NOT ENFORCED\n" +
    ")\n" +
    " with (\n" +
     // 这里省略其他参数
    "  'hoodie.payload.event.time.field' = 'ts'\n"
    ")
    

    如何读取EventTime

    Spark SQL

    call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');
    

    4.png

    Java API

    代码获取片段如下

    Option commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant);
    if (!commitMetadataOption.isPresent()) {
        throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline));
    }
    // 获取到当前版本的时间进度
    String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key());
    System.out.println("current eventTime: " + eventTime);
    

    输出结果如下

    current eventTime: 1667971364742
    
  • 相关阅读:
    STM32F4VGT6-DISCOVERY:uart1驱动
    7-29 删除字符串中的子串
    低代码平台的核心价值与优势
    setattr()函数与getattr()函数用法
    【网络爬虫 | Python】数字货币ok链上bitcoin大额交易实时爬取,存入 mysql 数据库
    第一次笔试【面试】
    计算机毕设项目论文介绍(Java智慧物业管理系统为例)
    CSS单位px、em、rem、vh、vw、vmin、vmax
    喜讯 | 智安零信任安全项目入选信通院“安全守卫者计划”优秀案例
    主机安全防护五大难点攻克
  • 原文地址:https://www.cnblogs.com/leesf456/p/17134219.html