狀態儲存
當使用高階 DSL 並呼叫適當的方法觸發狀態儲存時,Kafka Streams 會自動建立狀態儲存。
如果您想將傳入的 KTable
繫結具體化為一個命名狀態儲存,可以使用以下策略來實現。
假設您有以下函式。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然後透過設定以下屬性,傳入的 KTable
資料將被具體化到命名狀態儲存中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以在應用程式中將自定義狀態儲存定義為 bean,它們將被 Binder 檢測到並新增到 Kafka Streams builder 中。尤其在使用 processor API 時,您需要手動註冊狀態儲存。為此,您可以在應用程式中將 StateStore 建立為一個 bean。以下是定義此類 bean 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
然後應用程式可以直接訪問這些狀態儲存。
在啟動期間,上述 bean 將由 Binder 處理並傳遞給 Streams builder 物件。
訪問狀態儲存
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
在註冊全域性狀態儲存時,這種方法將不起作用。要註冊全域性狀態儲存,請參閱下面關於自定義 StreamsBuilderFactoryBean
的章節。