• Kafka Streams Study Notes 3: DSL stateless and stateful operators


    Topology

    The processing logic that a Streams application executes is called a Topology.

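    Main KStream functions

    • merge: merges this stream with another one (chain the call to merge more than two streams)
    • mapValues/flatMapValues: processes the value of each record and returns exactly one result (mapValues) or zero or more results (flatMapValues)
    • peek: a stateless per-record side effect; the action returns nothing and the stream passes through unchanged. It may execute multiple times for a single record in failure cases
    • Filtering and branching
      • filter/filterNot: keeps the records that match (filter) or do not match (filterNot) a predicate
      • branch/predicate: each record is tested against the predicates in order and goes to the branch of the first predicate that returns true; a record for which every predicate returns false is dropped, so it appears in no branch
    • Output
      • through: materializes this stream to a topic and creates a new KStream from that topic (deprecated since Kafka 2.6 in favor of repartition())
      • to (does not return a stream): materializes this stream to a topic
      • transform: transforms each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily)
      • suppress: emits only the final result of a window (untilWindowCloses) or of a time limit (untilTimeLimit) instead of every intermediate update. suppress is defined on KTable, so it is applied to the result of a windowed aggregation rather than to the KStream itself: KStream pulseEvents; KTable pulseCounts = pulseEvents.groupByKey().windowedBy(...).count().suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded().shutDownWhenFull()));
    • build(): returns the Topology that represents the specified processing logic.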
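
    The branch rule in the list above (first matching predicate wins, unmatched records are dropped) can be illustrated with a small plain-Java model. This is only a sketch of the semantics, not the Kafka Streams API: the class BranchSketch and its branch method are hypothetical names, and ordinary lists stand in for streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of branch semantics (hypothetical helper, not Kafka Streams code).
class BranchSketch {

    // Routes each record to the first branch whose predicate returns true.
    // Records that match no predicate are dropped.
    static <T> List<List<T>> branch(List<T> records, List<Predicate<T>> predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    branches.get(i).add(record);
                    break; // first match wins, later predicates are not evaluated
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<Predicate<Integer>> predicates = List.of(n -> n > 100, n -> n > 10);
        // 500 matches the first predicate, 50 only the second, 5 matches none and is dropped.
        System.out.println(branch(List.of(5, 50, 500), predicates)); // [[500], [50]]
    }
}
```

    Note that 500 also satisfies the second predicate but still lands only in the first branch, which is why predicate order matters.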

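    Serdes

    A Serde bundles the serializer and the deserializer for one data type. Serdes can be provided through

    • a Schema Registry

    • a local Avro/JSON (or similar) implementation

      For Avro without a Schema Registry, 'com.mitchseymour:kafka-registryless-avro-serdes' can be used.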

    Buffer: memory usage rules

    For windowed streams, how much data is held in memory and what happens when that memory fills up can be configured through the Buffer settings (BufferConfig).

    • Size limits
      • BufferConfig.maxBytes()
      • BufferConfig.maxRecords()
      • BufferConfig.unbounded()
    • Buffer Full Strategies
      • shutDownWhenFull: Gracefully shut down the application when the buffer is full
      • emitEarlyWhenFull: Emit the oldest results when the buffer is full instead of shutting down the application.
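
    To make the two buffer-full strategies concrete, here is a simplified plain-Java model of a record-count-limited buffer. It is a sketch under stated assumptions, not Kafka Streams code: SuppressBufferSketch and FullStrategy are hypothetical names, the limit plays the role of maxRecords, and an exception stands in for the application shutdown.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a bounded suppression buffer (hypothetical, not the Kafka Streams API).
class SuppressBufferSketch {
    enum FullStrategy { EMIT_EARLY, SHUT_DOWN }

    private final int maxRecords;
    private final FullStrategy strategy;
    // Insertion-ordered: the first entry is the key that has been buffered the longest.
    private final Map<String, Long> buffer = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();

    SuppressBufferSketch(int maxRecords, FullStrategy strategy) {
        this.maxRecords = maxRecords;
        this.strategy = strategy;
    }

    // Buffers the latest value per key; a newer value replaces the buffered one.
    SuppressBufferSketch put(String key, long value) {
        buffer.put(key, value);
        if (buffer.size() > maxRecords) {
            if (strategy == FullStrategy.SHUT_DOWN) {
                // Stand-in for shutting the application down.
                throw new IllegalStateException("buffer full, shutting down");
            }
            // EMIT_EARLY: release the oldest buffered result to make room.
            Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
            Map.Entry<String, Long> oldest = it.next();
            emitted.add(oldest.getKey() + "=" + oldest.getValue());
            it.remove();
        }
        return this;
    }

    public static void main(String[] args) {
        SuppressBufferSketch b = new SuppressBufferSketch(2, FullStrategy.EMIT_EARLY);
        b.put("a", 1).put("a", 2).put("b", 1).put("c", 1);
        System.out.println(b.emitted); // [a=2]
    }
}
```

    With SHUT_DOWN the same sequence of puts throws on the third distinct key, which mirrors the trade-off: emitEarlyWhenFull keeps the application running but can emit a result before it is final, while shutDownWhenFull never emits an intermediate result.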

    Stateful operators / KTable and GlobalKTable

    Joining data: both sides must be keyed by the same key, so only one key Serde is needed

    • join (inner join)
      public static <K, V, VO> Joined<K, V, VO> with(
          Serde<K> keySerde,
          Serde<V> valueSerde,
          Serde<VO> otherValueSerde)
    • leftJoin
    • outerJoin
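
    The difference between the three join types can be sketched with two in-memory maps keyed by the same key. This is a plain-Java model of the result sets only; it ignores the windowing and timing that real stream-stream joins involve. JoinSketch, JoinType and the "left|right" joined value are hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of inner/left/outer join results (hypothetical, not Kafka Streams code).
class JoinSketch {
    enum JoinType { INNER, LEFT, OUTER }

    // Joins two keyed data sets; a missing side is passed to the joiner as null.
    static Map<String, String> join(Map<String, String> left,
                                    Map<String, String> right,
                                    JoinType type) {
        Set<String> keys = new TreeSet<>(left.keySet());
        if (type == JoinType.OUTER) {
            keys.addAll(right.keySet()); // outer join also keeps keys that exist only on the right
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : keys) {
            String l = left.get(key);
            String r = right.get(key);
            if (type == JoinType.INNER && r == null) {
                continue; // inner join needs a match on both sides
            }
            result.put(key, l + "|" + r);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> left = Map.of("a", "L1", "b", "L2");
        Map<String, String> right = Map.of("b", "R2", "c", "R3");
        System.out.println(join(left, right, JoinType.INNER)); // {b=L2|R2}
        System.out.println(join(left, right, JoinType.LEFT));  // {a=L1|null, b=L2|R2}
        System.out.println(join(left, right, JoinType.OUTER)); // {a=L1|null, b=L2|R2, c=null|R3}
    }
}
```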

    Aggregating data

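    • aggregate: method and parameters
      • initializer
      • adder/subtractor (the subtractor applies only when aggregating a grouped KTable; a grouped KStream takes just the initializer and an aggregator)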
    • count
    • reduce
    • Windowing data
      • windowedBy
    • group
      • groupByKey
      • groupBy
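
    Why a table aggregation needs a subtractor as well as an adder can be seen in a small plain-Java model: when a row is updated, its old value must first be removed from the old group's aggregate before the new value is added. This is a sketch of the idea only, not the Kafka Streams API; TableAggregateSketch, upsert and the sum aggregate are hypothetical choices.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of initializer/adder/subtractor on a grouped table (hypothetical).
class TableAggregateSketch {
    private static final long INITIAL = 0L; // initializer: starting aggregate for a new group

    private final Map<String, String> groupOf = new HashMap<>(); // row key -> current group
    private final Map<String, Long> valueOf = new HashMap<>();   // row key -> current value
    final Map<String, Long> totals = new TreeMap<>();            // group -> aggregate

    private static long add(long aggregate, long value) { return aggregate + value; }      // adder
    private static long subtract(long aggregate, long value) { return aggregate - value; } // subtractor

    // Inserts or updates one table row and keeps the per-group totals consistent.
    TableAggregateSketch upsert(String rowKey, String group, long value) {
        String oldGroup = groupOf.put(rowKey, group);
        Long oldValue = valueOf.put(rowKey, value);
        if (oldGroup != null) {
            // The row existed before: remove its old contribution first.
            totals.put(oldGroup, subtract(totals.get(oldGroup), oldValue));
        }
        totals.put(group, add(totals.getOrDefault(group, INITIAL), value));
        return this;
    }

    public static void main(String[] args) {
        TableAggregateSketch t = new TableAggregateSketch();
        t.upsert("p1", "red", 10).upsert("p2", "red", 5);
        t.upsert("p1", "blue", 7); // p1 moves from red to blue
        System.out.println(t.totals); // {blue=7, red=5}
    }
}
```

    A stream aggregation has no subtractor because every stream record is treated as a new, independent event rather than as an update to an earlier row.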

    Materialized

      Writes the result to a state store that can be queried from outside the topology.

      public abstract <VR> KTable<K, VR> aggregate(
          Initializer<VR> initializer,
          Aggregator<? super K, ? super V, VR> aggregator,
          Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)

    toStream

    Converts a KTable into a KStream.

    State Stores

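    Embedded by default: The default state store implementations that are included in Kafka Streams are embedded within your Kafka Streams application at the task level

    Advantages of embedded stores: low latency, no connection to an external system, and no exposure to that system's outages.
    The default stores are persistent: they are backed by RocksDB, which keeps recent writes in memory and spills to disk. Purely in-memory stores are also available.

    Basic requirements for a state store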

    • Fault tolerant
    • Key-based
    • Multiple access modes
  • Original article: https://blog.csdn.net/weixin_40455124/article/details/126821956