Setting the timestamp on messages
The public interface TimestampExtractor is responsible for extracting the timestamp from a record/message.
The default is FailOnInvalidTimestamp, which throws a StreamsException if the timestamp is invalid. It extracts the timestamp from the consumer record, which is either the event time (when message.timestamp.type is set to CreateTime) or the ingestion time (when message.timestamp.type is set to LogAppendTime).
LogAndSkipOnInvalidTimestamp can be used instead. The extractor class is specified at initialization through the property StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
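As in Flink, three types are supported. The three window definitions that follow are a tumbling window, a hopping window, and a session window: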
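// Tumbling window: 60-second size, 5-second grace period
TimeWindows tumblingWindow = TimeWindows
    .of(Duration.ofSeconds(60))
    .grace(Duration.ofSeconds(5));
// Hopping window: 5-second size, advancing every 4 seconds
TimeWindows hoppingWindow = TimeWindows
    .of(Duration.ofSeconds(5))
    .advanceBy(Duration.ofSeconds(4));
// Session window: 5-second inactivity gap
SessionWindows sessionWindow = SessionWindows
    .with(Duration.ofSeconds(5));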
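To make the tumbling and hopping definitions concrete, here is a plain-Java sketch (no Kafka dependency) of how a record timestamp maps to epoch-aligned windows of the form [start, start + size). The class and method names are hypothetical, and the arithmetic is my reading of how such windows are assigned, not code taken from Kafka Streams. A tumbling window is treated as the special case where the advance equals the size.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowMath {
    /**
     * Returns the start times of every window [start, start + sizeMs)
     * that contains timestampMs, for windows aligned to the epoch and
     * spaced advanceMs apart. Hypothetical helper for illustration.
     */
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window can still contain the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Tumbling: size 60 s, advance 60 s -> exactly one window per record.
        System.out.println(windowStartsFor(125_000, 60_000, 60_000)); // [120000]
        // Hopping: size 5 s, advance 4 s -> windows overlap by 1 s.
        System.out.println(windowStartsFor(8_500, 5_000, 4_000));     // [4000, 8000]
    }
}
```

With the hopping parameters from the notes (size 5 s, advance 4 s), a record only falls into two windows when its timestamp lands in the one-second overlap; a record at 9000 ms belongs only to the window starting at 8000 ms.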
The suppress operator can be used to emit only the final computation of a window, so each window/time period emits only its last result.
State storage: both in-memory and persistent state stores are available. Persistent stores (the default) use RocksDB to keep state on disk. The disk location is set with the StreamsConfig.STATE_DIR_CONFIG property.
Changelog topics are configurable using the Materialized class in the DSL
Unless logging is explicitly disabled, Kafka Streams backs each state store with a changelog topic, i.e. the contents of the state store are also saved to a changelog topic.
Materialized.as("pulse-counts").withLoggingDisabled();
Disabling is not recommended. The changelog topic can also be configured by passing topicConfigs when materializing:
.withLoggingEnabled(topicConfigs)
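The topicConfigs argument is a plain Map<String, String> of topic-level settings. A minimal sketch of building one is shown here; the class name is hypothetical and the two values are illustrative, not recommendations.

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogTopicConfigs {
    /** Builds topic-level overrides for a changelog topic (illustrative values). */
    public static Map<String, String> topicConfigs() {
        Map<String, String> topicConfigs = new HashMap<>();
        topicConfigs.put("segment.bytes", "536870912");  // 512 MiB log segments
        topicConfigs.put("min.insync.replicas", "2");    // require 2 in-sync replicas for writes
        return topicConfigs;
    }

    public static void main(String[] args) {
        System.out.println(topicConfigs());
    }
}
```

The resulting map is what would be handed to .withLoggingEnabled(topicConfigs) on the Materialized instance.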
The changelog topic configuration can also be changed with the kafka-configs tool.
Tombstones mark a record/message as deletable. To create one:
Return null to generate a tombstone. This causes the related key to be removed from the underlying state store.
stream
    .groupByKey()
    .reduce(
        (value1, value2) -> {
            if (value2.equals(PATIENT_CHECKED_OUT)) {
                // create a tombstone
                System.out.println("Creating tombstone for " + value2);
                return null;
            }
            System.out.println("Returning aggregation for " + value2);
            return doSomething(value1, value2);
        });
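The effect of returning null can be imitated without Kafka using java.util.Map.merge, which also stores the first value as-is, applies the function to later values, and removes the key when the function returns null. This is only an analogy for the state store behaviour described above; the class name, the sentinel value, and the string concatenation standing in for doSomething are all assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class TombstoneSketch {
    // Hypothetical sentinel value marking a check-out event.
    public static final String PATIENT_CHECKED_OUT = "checked-out";

    /** Reducer that returns null (a tombstone) when the new value is the sentinel. */
    public static String reduce(String value1, String value2) {
        if (value2.equals(PATIENT_CHECKED_OUT)) {
            return null; // tombstone: the key will be removed
        }
        return value1 + "," + value2; // stand-in for doSomething(value1, value2)
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>(); // stand-in for the state store
        store.merge("patient-1", "pulse", TombstoneSketch::reduce);
        store.merge("patient-1", "pulse", TombstoneSketch::reduce);
        System.out.println(store); // {patient-1=pulse,pulse}

        store.merge("patient-1", PATIENT_CHECKED_OUT, TombstoneSketch::reduce);
        System.out.println(store.containsKey("patient-1")); // false
    }
}
```

Note that in this sketch a sentinel arriving as the very first value for a key is stored rather than deleted, because the reducer is only invoked once an aggregate already exists.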
A materialized windowed store retains data for one day by default. The retention can be set when materializing, e.g. .withRetention(Duration.ofHours(6)).
One kind of state cache is kept only in memory and is not persisted to disk. withLoggingEnabled has no effect on it, but it still has a changelog topic.
KeyQueryMetadata metadata =
    streams.queryMetadataForKey(storeName, key, Serdes.String().serializer());
Rebalancing is Enemy of the State (Store)
Mechanisms for preventing state migration:
Incremental Cooperative Rebalancing (the default in Kafka Streams 2.4 and later)
Standby replicas provide HA: props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
streams.setStateListener
streams.setGlobalStateRestoreListener
cache.max.bytes.buffering: StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, default 10 MB
commit.interval.ms: StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, default 30 seconds
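The settings mentioned in these notes can be collected in one java.util.Properties object. The sketch below uses the literal property names behind the StreamsConfig constants so that it compiles without the Kafka dependency; the class name and the state directory path are hypothetical, and a real application would additionally need at least application.id and bootstrap.servers.

```java
import java.util.Properties;

public class StreamsSettingsSketch {
    /** Collects the Kafka Streams settings discussed in these notes. */
    public static Properties settings() {
        Properties props = new Properties();
        // StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
        props.setProperty("default.timestamp.extractor",
            "org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");
        // StreamsConfig.STATE_DIR_CONFIG (path is illustrative)
        props.setProperty("state.dir", "/var/lib/kafka-streams");
        // StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG
        props.setProperty("num.standby.replicas", "2");
        // StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, shown at its 10 MB default
        props.setProperty("cache.max.bytes.buffering", String.valueOf(10 * 1024 * 1024));
        // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, shown at its 30-second default
        props.setProperty("commit.interval.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(settings().getProperty("cache.max.bytes.buffering")); // 10485760
    }
}
```

In application code the StreamsConfig constants are preferable to string literals, since typos in a constant name fail at compile time.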