• Kafka Streams Study Notes 4: Windows and State Stores


    Windows and time

    Time semantics

    • Event time
    • Ingestion time
    • Processing time

    Timestamp settings on messages

    • log.message.timestamp.type (broker level)
    • message.timestamp.type (topic level)
      Both settings accept CreateTime or LogAppendTime; the topic-level setting overrides the broker-level one.

    Timestamp extractors in Kafka Streams

    The public interface TimestampExtractor is responsible for extracting a timestamp from each record/message.

    The default is FailOnInvalidTimestamp. This extractor throws a StreamsException if the timestamp is invalid; it extracts the timestamp from the consumer record, which is either the event time (when message.timestamp.type is set to CreateTime) or the ingestion time (when message.timestamp.type is set to LogAppendTime).

    Alternatively, LogAndSkipOnInvalidTimestamp logs and skips records with invalid timestamps. The extractor class is specified at initialization via the DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG property.
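As a minimal sketch, the extractor can be switched through the application properties. The string key "default.timestamp.extractor" is the value behind StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; it is used here literally so the example needs no Kafka dependency:

```java
import java.util.Properties;

public class ExtractorConfig {
    public static Properties build() {
        Properties props = new Properties();
        // "default.timestamp.extractor" is the key behind
        // StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
        props.put("default.timestamp.extractor",
                  "org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("default.timestamp.extractor"));
    }
}
```

These properties would then be passed to the KafkaStreams constructor alongside the usual application id and bootstrap servers.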

    Window Types

    Like Flink, Kafka Streams supports three window types.

    • Tumbling windows: fixed-length, contiguous, non-overlapping windows; the grace property allows late messages to be handled.

        TimeWindows tumblingWindow = TimeWindows
            .of(Duration.ofSeconds(60))
            .grace(Duration.ofSeconds(5));
    
    • Hopping windows (the equivalent of Flink's sliding windows): fixed-sized windows that may overlap; advanceBy specifies the hop, i.e. how far each window advances, so a 5-second window advancing by 4 seconds overlaps its neighbor by 1 second.

        TimeWindows hoppingWindow = TimeWindows
            .of(Duration.ofSeconds(5))
            .advanceBy(Duration.ofSeconds(4));
    
    • Session windows: the current window closes once no new message arrives within the inactivity gap.

        SessionWindows sessionWindow = SessionWindows
            .with(Duration.ofSeconds(5));
    
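Putting a window definition to use, here is a minimal sketch of a windowed count. The topic name "events", the String key/value types, and the assumption that default serdes are configured are all illustrative, not from the original notes:

```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedCountExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // "events" is a hypothetical topic; default serdes are assumed
        KStream<String, String> events = builder.stream("events");
        // count records per key in 60-second tumbling windows,
        // accepting messages up to 5 seconds late
        KTable<Windowed<String>, Long> counts = events
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(60))
                                   .grace(Duration.ofSeconds(5)))
            .count();
    }
}
```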

    suppress

    The suppress operator can be used to emit only the final computation of a window, i.e. a single result per window/time span instead of every intermediate update.

    • Suppressed.untilWindowCloses Only emit the final results of a window.
    • Suppressed.untilTimeLimit:Emit the results of a window after a configurable amount of time has elapsed since the last event was received. If another event with the same key arrives before the time limit is up, it replaces the first event in the buffer (note, the timer is not restarted when this happens). This has the effect of rate-limiting updates.
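A sketch of suppress applied to a windowed count. The topic names "events" and "final-counts" are assumptions, and writing the Windowed key to the output topic would additionally need a windowed serde at runtime:

```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("events"); // assumed topic
        events.groupByKey()
              .windowedBy(TimeWindows.of(Duration.ofSeconds(60))
                                     .grace(Duration.ofSeconds(5)))
              .count()
              // emit only the final count per window, buffering without bound
              .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
              .toStream()
              .to("final-counts"); // assumed output topic
    }
}
```

Note that untilWindowCloses requires a strict buffer config (unbounded or shut-down-on-overflow), since dropping a buffered record would lose the final result.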

    State stores

    By default, Kafka Streams uses both in-memory and persistent state stores. Persistent stores are written to disk via RocksDB; the on-disk location is configured by setting the StreamsConfig.STATE_DIR_CONFIG property.
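As a small illustration, the state directory can be set through the properties; "state.dir" is the key behind StreamsConfig.STATE_DIR_CONFIG, and the path below is an example, not a recommendation:

```java
import java.util.Properties;

public class StateDirConfig {
    public static Properties build() {
        Properties props = new Properties();
        // "state.dir" is the key behind StreamsConfig.STATE_DIR_CONFIG;
        // the path is an example value
        props.put("state.dir", "/var/lib/kafka-streams");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("state.dir"));
    }
}
```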

    Changelog topics

    Changelog topics are configurable using the Materialized class in the DSL.
    Unless explicitly disabled, Kafka Streams backs each state store with a changelog topic, i.e. the contents of a state store are preserved in a changelog topic:

    Materialized.as("pulse-counts").withLoggingDisabled();
      
    

    Disabling the changelog is not recommended. Topic configurations can also be supplied when materializing, via topicConfigs:

    .withLoggingEnabled(topicConfigs)

    Alternatively, the changelog topic's configuration can be modified with the kafka-configs CLI.
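A sketch of building such a topicConfigs map; the keys are standard Kafka topic-level configs, and the chosen values are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogTopicConfigs {
    public static Map<String, String> build() {
        Map<String, String> topicConfigs = new HashMap<>();
        // standard topic-level configs, applied to the changelog topic
        topicConfigs.put("min.insync.replicas", "2");
        topicConfigs.put("retention.ms", "604800000"); // 7 days
        return topicConfigs;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

The map would then be passed as Materialized.as("pulse-counts").withLoggingEnabled(topicConfigs).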

    Controlling State Size

    Tombstones mark records/messages as deletable. To create one, return null from the aggregation function: this causes the related key to be removed from the underlying state store.

     stream
            .groupByKey()
            .reduce(
                (value1, value2) -> {
                  if (value2.equals(PATIENT_CHECKED_OUT)) {
                    // create a tombstone
                    System.out.println("Creating tombstone for " + value2);
                    return null;
                  }
                  System.out.println("Returning aggregation for " + value2);
                  return doSomething(value1, value2);
                });
    

    Window retention

    A materialized windowed store retains data for one day by default; a different retention can be specified when materializing, e.g. .withRetention(Duration.ofHours(6)).

    Segment size and cleanup behavior are controlled by the following topic parameters:

    • segment.bytes: the maximum size of a single log segment before a new one is rolled
    • segment.ms: forces the log to roll after this much time, even if the segment file isn't full
    • min.cleanable.dirty.ratio: the minimum ratio of dirty (uncompacted) bytes to total log bytes before the log becomes eligible for compaction
    • max.compaction.lag.ms: the maximum time a message will remain ineligible for compaction in the log; only applicable for logs that are being compacted
    • min.compaction.lag.ms: the minimum time a message will remain uncompacted in the log
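A sketch of setting a shorter retention while materializing a windowed store; the store name "event-counts" and the assumed "events" topic are illustrative:

```java
import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class RetentionExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("events"); // assumed topic
        events.groupByKey()
              .windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
              // keep windowed state for 6 hours instead of the default 1 day
              .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("event-counts")
                                 .withRetention(Duration.ofHours(6)));
    }
}
```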

    Fixed-size LRU cache

    A state cache kept only in memory and never persisted to disk. withLoggingEnabled has no effect on it, but the store is still backed by a changelog topic.
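Such a store can be created with the Stores factory; the store name and capacity below are example values, and the resulting supplier would be wired into the topology via Materialized or the Processor API:

```java
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

public class LruStoreExample {
    public static void main(String[] args) {
        // in-memory LRU store holding at most 10,000 entries;
        // the least recently used entry is evicted when full
        KeyValueBytesStoreSupplier supplier = Stores.lruMap("lru-store", 10_000);
    }
}
```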

    Querying metadata

        KeyQueryMetadata metadata =
            streams.queryMetadataForKey(storeName, key, Serdes.String().serializer());
    

    Rebalancing

    Rebalancing is the enemy of the state (store).

    Mechanisms for preventing state migration:

    • Sticky assignment: the same instance (host) keeps its stateful tasks across rebalances, so stateful consumers are migrated between hosts as little as possible
    • Static membership: a fixed consumer id is assigned via group.instance.id
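A small sketch of enabling static membership through the properties; the instance id value is an example and must be unique per application instance:

```java
import java.util.Properties;

public class StaticMembershipConfig {
    public static Properties build() {
        Properties props = new Properties();
        // "group.instance.id" enables static membership;
        // each instance of the application needs its own unique value
        props.put("group.instance.id", "app-instance-1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("group.instance.id"));
    }
}
```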

    Incremental Cooperative Rebalancing (the default since Kafka Streams 2.4):

    • One global round of rebalancing is replaced with several smaller rounds (incremental).
    • Clients hold on to resources (tasks) that do not need to change ownership, and they only stop processing the tasks that are being migrated (cooperative).

    Standby Replicas

    Standby replicas provide high availability: props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);

    Custom State Stores

    State Store Monitoring: Listeners

    streams.setStateListener

    streams.setGlobalStateRestoreListener
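As a sketch, a state listener can be registered with a lambda to observe transitions such as REBALANCING to RUNNING; the log message wording is illustrative:

```java
import org.apache.kafka.streams.KafkaStreams;

public class ListenerExample {
    // register a listener that logs every state transition,
    // e.g. REBALANCING -> RUNNING after a rebalance completes
    static void register(KafkaStreams streams) {
        streams.setStateListener((newState, oldState) ->
            System.out.println("state changed from " + oldState + " to " + newState));
    }
}
```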

    Parameters controlling when buffered writes are flushed to the state store and committed:

    cache.max.bytes.buffering (CACHE_MAX_BYTES_BUFFERING_CONFIG), default 10 MB

    commit.interval.ms (COMMIT_INTERVAL_MS_CONFIG), default 30 seconds
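A sketch of setting both parameters explicitly to their defaults; the string keys are the values behind the StreamsConfig constants named above:

```java
import java.util.Properties;

public class CommitConfig {
    public static Properties build() {
        Properties props = new Properties();
        // key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
        props.put("cache.max.bytes.buffering", "10485760"); // 10 MB (the default)
        // key behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
        props.put("commit.interval.ms", "30000");           // 30 seconds (the default)
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

A larger cache with a longer commit interval batches more updates per flush, at the cost of higher latency before downstream consumers see results.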

  • Original article: https://blog.csdn.net/weixin_40455124/article/details/126863628