• Kafka Streams Study Notes 5: the Processor API


    Summary

    Kafka Streams allows us to mix the DSL and the Processor API in the same
    application.

    Advantages

    • Access to record metadata (topic, partition, offset information, record headers, and so on)
    • Ability to schedule periodic functions (not supported by the DSL)
    • More fine-grained control over when records get forwarded to downstream processors
    • More granular access to state stores
    • Ability to circumvent any limitations you come across in the DSL

    Disadvantages

    • More verbose code, which can lead to higher maintenance costs and impair readability
    • A higher barrier to entry for other project maintainers
    • More footguns, including accidental reinvention of DSL features or abstractions, exotic problem-framing, and performance traps

    Main Topology methods

    • addSource: creates a source processor.
    • addProcessor: adds a processor, which must be attached to one or more parents (the source processor or an upstream processor) named in parentNames.
     public <KIn, VIn, KOut, VOut> Topology addProcessor(
         String name,
         org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
         String... parentNames)
    
    • Stateless Processors
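     // org.apache.kafka.streams.processor.api.Processor (the older
     // org.apache.kafka.streams.processor.Processor<K, V> used process(K key, V value) instead)
     public interface Processor<KIn, VIn, KOut, VOut> {
         default void init(ProcessorContext<KOut, VOut> context) {}
         void process(Record<KIn, VIn> record);
         default void close() {}
     }
         context.forward(newRecord); // sends the record to the downstream (child) processors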
    
    • Stateful Processors

    Main difference: how state stores are materialized (DSL vs. Processor API)

    // DSL: an aggregation whose state lives in an explicitly supplied store
    KeyValueBytesStoreSupplier storeSupplier =
        Stores.persistentTimestampedKeyValueStore("my-store");
    grouped.aggregate(
        initializer,
        adder,
        Materialized.<String, String>as(storeSupplier));
    
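    In the DSL, stateful operators such as aggregate create a state store implicitly; Materialized only lets you name or customize it. In the Processor API nothing is created for you: you register the state store on the Topology, connect it to the processors that use it, and look it up from the ProcessorContext.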

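      // register the store on the Topology and connect it to the processor named "Digital Twin Processor"
      builder.addStateStore(storeBuilder, "Digital Twin Processor");

      // in the processor's init(): look the store up by its store name
      this.kvStore = (KeyValueStore<String, DigitalTwin>) context.getStateStore("digital-twin-store");

      // in process(): read and write the store by key
      kvStore.get(key);
      kvStore.put(key, digitalTwin);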
    
    • Sink (output)

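      addSink: adds a sink processor that writes records to an output topic; like addProcessor, it takes the names of its parent processors.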

    Periodic Functions with Punctuate

    The DSL cannot do this; periodic functions are only available in the Processor API.

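      // in init(): call enforceTtl every 10 seconds of wall-clock time and keep the returned Cancellable
      punctuator = this.context.schedule(
            Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);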
    
    schedule returns a Cancellable object, which can be used later to cancel the punctuation, for example when the processor is closed:

      @Override
      public void close() {
          // cancel the punctuator
          punctuator.cancel();
      }
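The enforceTtl callback passed to schedule is not shown in these notes. As a rough, stdlib-only sketch of what such a punctuator might do, the hypothetical TtlSweep.enforceTtl below removes every entry whose last-update timestamp is older than a TTL. A plain Map stands in for the state store (an assumption for illustration); with a real KeyValueStore you would iterate kvStore.all(), close the returned KeyValueIterator afterwards, and call kvStore.delete(key). Because a Kafka Punctuator receives only a single long timestamp, the store and the TTL would normally be fields of the processor rather than parameters.

```java
import java.util.Iterator;
import java.util.Map;

/** Hypothetical TTL sweep; a Map stands in for the processor's state store. */
class TtlSweep {
    /** Removes entries last updated more than ttlMs before nowMs; returns how many were removed. */
    static int enforceTtl(Map<String, Long> lastUpdated, long nowMs, long ttlMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdated.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > ttlMs) {
                it.remove(); // removing through the iterator is safe while iterating
                removed++;
            }
        }
        return removed;
    }
}
```

With now = 10000 and a TTL of 4000, entries last updated at 1000 and 5000 are expired, while one updated at 9000 survives.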
    
    Types of punctuations (trigger modes)

    Stream time: advanced only by the timestamps of incoming records, so the punctuation does not execute unless data arrives on a continuous basis; a later record is needed to trigger it.

    Wall clock time: driven by system time, so periodic functions continue to execute regardless of whether or not new messages arrive.
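To make the stream-time behaviour concrete, here is a simplified, single-partition model in plain Java. It is an illustration, not Kafka Streams code, and the class name is hypothetical. In this model stream time is the largest record timestamp seen so far, the schedule is aligned to timestamp 0, and a record that moves stream time past the next due time fires the punctuator once, skipping any intervals that were jumped over. Out-of-order records never move stream time backwards, and if no records arrive nothing fires at all, which is the key difference from WALL_CLOCK_TIME.

```java
import java.util.function.LongConsumer;

/** Simplified single-partition model of STREAM_TIME punctuation (illustration only). */
class StreamTimePunctuationModel {
    private final long intervalMs;
    private final LongConsumer punctuator;    // receives the stream time at which it fired
    private long streamTime = Long.MIN_VALUE; // no record seen yet
    private long nextDue = 0L;                // schedule aligned to timestamp 0

    StreamTimePunctuationModel(long intervalMs, LongConsumer punctuator) {
        if (intervalMs <= 0) throw new IllegalArgumentException("interval must be positive");
        this.intervalMs = intervalMs;
        this.punctuator = punctuator;
    }

    /** Called once per record; without records, nothing here ever runs. */
    void onRecord(long recordTimestamp) {
        // out-of-order records never move stream time backwards
        streamTime = Math.max(streamTime, recordTimestamp);
        if (streamTime >= nextDue) {
            punctuator.accept(streamTime);
            // skip the intervals that were jumped over so this record fires only once
            long missed = (streamTime - nextDue) / intervalMs;
            nextDue += (missed + 1) * intervalMs;
        }
    }
}
```

Feeding timestamps 0, 5, 3, 12, 45 with a 10 ms interval fires at stream times 0, 12 and 45; the records with timestamps 5 and the out-of-order 3 fire nothing.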

    Accessing Record Metadata

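    Record headers: context.headers()

    Offset: context.offset()

    Partition: context.partition()

    Timestamp: context.timestamp()

    Topic: context.topic()

    These are methods of the older org.apache.kafka.streams.processor.ProcessorContext. With the newer processor.api, headers and timestamp come from the Record (record.headers(), record.timestamp()), and topic, partition, and offset come from context.recordMetadata(), which returns an Optional<RecordMetadata>.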

    Combining the Processor API with the DSL

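    Processors: a processor is a terminal operation (meaning it returns void and downstream operators cannot be chained). It applies a Processor to one record at a time. (This describes older releases; since Kafka 3.3, KStream#process accepts the newer api.ProcessorSupplier and returns a KStream, and the transform methods are deprecated.)

    Transformers (there are several variants, such as Transformer, ValueTransformer, and ValueTransformerWithKey) can return one or more records (depending on which variation you use), and are therefore the better choice if you need to chain a downstream operator.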

  • Original article: https://blog.csdn.net/weixin_40455124/article/details/126863690