Kafka Streams allows us to mix both the DSL and Processor API in an
application.
Each approach has its own advantages and disadvantages.
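A minimal sketch of mixing the two (the topic names "events" and "enriched-events" are assumptions, and UppercaseProcessor is the hypothetical Processor implementation sketched a little further below). Note that in Kafka Streams 3.3+ KStream#process returns a KStream and can be chained; in older versions it is a terminal operation and transform()/transformValues() is the chainable alternative:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class HybridTopologyExample {
  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();

    // DSL: consume and filter
    KStream<String, String> events =
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .filter((key, value) -> value != null);

    // Processor API: drop into a hand-written processor for low-level control
    KStream<String, String> enriched = events.process(UppercaseProcessor::new);

    // DSL again: write the result out
    enriched.to("enriched-events", Produced.with(Serdes.String(), Serdes.String()));

    return builder.build();
  }
}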
public <KIn, VIn, KOut, VOut> Topology addProcessor(
    String name,
    org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
    String... parentNames)
The corresponding interface is the newer org.apache.kafka.streams.processor.api.Processor (matching the ProcessorSupplier above), which supersedes the older Processor<K, V> with process(K key, V value):
public interface Processor<KIn, VIn, KOut, VOut> {
    default void init(ProcessorContext<KOut, VOut> context) {}
    void process(Record<KIn, VIn> record);
    default void close() {}
}
context.forward(newRecord); // sends the record to the next (downstream) processor
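For example, a minimal (hypothetical) processor that upper-cases values and forwards them downstream, using the newer processor.api types:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class UppercaseProcessor implements Processor<String, String, String, String> {

  private ProcessorContext<String, String> context;

  @Override
  public void init(ProcessorContext<String, String> context) {
    // keep the context so we can forward records (and schedule punctuators if needed)
    this.context = context;
  }

  @Override
  public void process(Record<String, String> record) {
    // transform the value and forward the new record to downstream processors
    Record<String, String> newRecord = record.withValue(record.value().toUpperCase());
    context.forward(newRecord);
  }

  @Override
  public void close() {
    // nothing to clean up in this example
  }
}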
Key differences in how state stores are materialized:
KeyValueBytesStoreSupplier storeSupplier =
Stores.persistentTimestampedKeyValueStore("my-store");
grouped.aggregate(
initializer,
adder,
Materialized.<String, String>as(storeSupplier));
In the DSL, aggregate-type operators are backed by a custom state store by passing a Materialized (i.e., the state store configuration), as above; in the Processor API, the state store is registered directly on the stream/topology, as the addStateStore lines below show.
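Expanding the DSL snippet above into a fuller sketch (the topic name "events", the String serdes, and the concatenating aggregator are assumptions):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

public class AggregateWithCustomStore {
  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();

    KGroupedStream<String, String> grouped =
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));

    // persistent, timestamped key-value store that will back the aggregation
    KeyValueBytesStoreSupplier storeSupplier =
        Stores.persistentTimestampedKeyValueStore("my-store");

    // the Materialized wraps the store supplier and configures the serdes
    KTable<String, String> aggregated = grouped.aggregate(
        () -> "",                          // initializer
        (key, value, agg) -> agg + value,  // adder
        Materialized.<String, String>as(storeSupplier)
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()));
  }
}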
builder.addStateStore(storeBuilder, "Digital Twin Processor");
this.kvStore = (KeyValueStore) context.getStateStore("digital-twin-store");
kvStore.get(key)/kvStore.put(key, digitalTwin);
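A sketch of the pieces around those lines: building the StoreBuilder that gets passed to addStateStore, and reading/writing the store from inside a processor. String serdes and a String value stand in for whatever digital-twin value type the original code uses, and the "merge" logic is a placeholder:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class DigitalTwinProcessor implements Processor<String, String, String, String> {

  private ProcessorContext<String, String> context;
  private KeyValueStore<String, String> kvStore;

  @Override
  public void init(ProcessorContext<String, String> context) {
    this.context = context;
    // look up the store by the name it was registered under
    this.kvStore = context.getStateStore("digital-twin-store");
  }

  @Override
  public void process(Record<String, String> record) {
    // read the current state for this key, update it, and write it back
    String current = kvStore.get(record.key());
    String digitalTwin = (current == null) ? record.value() : current + "," + record.value();
    kvStore.put(record.key(), digitalTwin);
    context.forward(record.withValue(digitalTwin));
  }

  // the StoreBuilder that is passed to builder.addStateStore(...) above
  public static StoreBuilder<KeyValueStore<String, String>> storeBuilder() {
    return Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("digital-twin-store"),
        Serdes.String(), Serdes.String());
  }
}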
Sink (output): addSink
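A sketch of wiring a Processor API topology end to end, reusing the DigitalTwinProcessor and store builder from above (the topic names "sensor-data" and "digital-twins" and the node names are assumptions):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;

public class DigitalTwinTopology {
  public static Topology build() {
    Topology builder = new Topology();

    // source node: read raw records from the input topic
    builder.addSource(
        "Sensor Data",
        Serdes.String().deserializer(),
        Serdes.String().deserializer(),
        "sensor-data");

    // processor node: the stateful processor sketched earlier, parented on the source
    builder.addProcessor(
        "Digital Twin Processor",
        DigitalTwinProcessor::new,
        "Sensor Data");

    // state store: register it and connect it to the processor that uses it
    builder.addStateStore(
        DigitalTwinProcessor.storeBuilder(), "Digital Twin Processor");

    // sink node: write the processor's output to the output topic
    builder.addSink(
        "Digital Twin Sink",
        "digital-twins",
        Serdes.String().serializer(),
        Serdes.String().serializer(),
        "Digital Twin Processor");

    return builder;
  }
}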
Scheduling periodic functions (punctuators) cannot be done in the DSL; it is only possible with the Processor API.
this.context.schedule(
Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);
schedule() returns a Cancellable object when the punctuator is created, which can be used to cancel it later:
@Override
public void close() {
  // cancel the punctuator
  punctuator.cancel();
}
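A sketch of how these pieces fit together in one processor: a hypothetical TTL enforcer that tracks the last-seen timestamp per key (the store name "last-seen-store", the TTL, and the 10-second interval are assumptions):

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class TtlProcessor implements Processor<String, String, String, String> {

  private static final Duration TTL = Duration.ofMinutes(5);

  private ProcessorContext<String, String> context;
  private KeyValueStore<String, Long> lastSeenStore;
  private Cancellable punctuator;

  @Override
  public void init(ProcessorContext<String, String> context) {
    this.context = context;
    this.lastSeenStore = context.getStateStore("last-seen-store");
    // run enforceTtl every 10 seconds of wall clock time; keep the handle for cancellation
    this.punctuator = context.schedule(
        Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);
  }

  @Override
  public void process(Record<String, String> record) {
    // remember when we last saw this key, then pass the record along unchanged
    lastSeenStore.put(record.key(), record.timestamp());
    context.forward(record);
  }

  private void enforceTtl(long timestamp) {
    // delete entries that have not been seen within the TTL
    try (KeyValueIterator<String, Long> iter = lastSeenStore.all()) {
      while (iter.hasNext()) {
        KeyValue<String, Long> entry = iter.next();
        if (timestamp - entry.value > TTL.toMillis()) {
          lastSeenStore.delete(entry.key);
        }
      }
    }
  }

  @Override
  public void close() {
    // cancel the punctuator when the processor is closed
    punctuator.cancel();
  }
}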
Stream time: punctuations do not execute unless data arrives on a continuous basis; subsequent records are required to advance stream time.
Wall clock time: punctuations execute whether or not new records arrive. This means periodic functions will continue to execute regardless of whether or not new messages arrive.
Record headers: context.headers()
Offset: context.offset()
Partition: context.partition()
Timestamp: context.timestamp()
Topic: context.topic()
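These accessors live on the legacy org.apache.kafka.streams.processor.ProcessorContext used by the older Processor/Transformer interfaces; in the newer processor.api, topic/partition/offset are exposed via context.recordMetadata(), and the timestamp and headers are carried on the Record itself. A sketch, assuming a legacy-API context saved in init():

// e.g. inside process()/transform(), where `context` is the legacy ProcessorContext
org.apache.kafka.common.header.Headers headers = context.headers();
long offset    = context.offset();
int partition  = context.partition();
long timestamp = context.timestamp();
String topic   = context.topic();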
Processors: a processor is a terminal operation (it returns void, so downstream operators cannot be chained; this describes KStream#process before Kafka 3.3). A Processor is applied to one record at a time.
Transformers (the Transformer family of interfaces: Transformer, ValueTransformer, ValueTransformerWithKey, etc.) can return one or more records (depending on which variation you use), and are therefore the better fit if you need to chain a downstream operator.
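A sketch of a chainable transformer (the class name and the downstream filter in the usage comment are assumptions; note that recent Kafka Streams releases deprecate transform()/transformValues() in favor of the unified process()/processValues()):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class UppercaseTransformer
    implements Transformer<String, String, KeyValue<String, String>> {

  private ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    // the legacy ProcessorContext (exposes topic()/partition()/offset()/timestamp()/headers())
    this.context = context;
  }

  @Override
  public KeyValue<String, String> transform(String key, String value) {
    // returning a KeyValue emits one record downstream; returning null emits nothing
    return KeyValue.pair(key, value.toUpperCase());
  }

  @Override
  public void close() {}
}

// usage: transform() returns a KStream, so downstream operators chain naturally
// KStream<String, String> shouting = stream
//     .transform(UppercaseTransformer::new)
//     .filter((key, value) -> !value.isEmpty());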