狀態儲存

當使用高階 DSL 並進行適當的呼叫來觸發狀態儲存時,Kafka Streams 會自動建立狀態儲存。

如果您想將傳入的 KTable 繫結具體化為命名狀態儲存,可以使用以下策略。

假設您有以下函式。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然後透過設定以下屬性,傳入的 KTable 資料將被具體化到命名狀態儲存中。

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

您可以在應用程式中將自定義狀態儲存定義為 bean,繫結器將檢測到這些 bean 並將其新增到 Kafka Streams 構建器中。特別是當使用處理器 API 時,您需要手動註冊狀態儲存。為此,您可以在應用程式中將 StateStore 建立為 bean。以下是定義此類 bean 的示例。

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

這些狀態儲存可以由應用程式直接訪問。

在引導期間,上述 bean 將由繫結器處理並傳遞給 Streams 構建器物件。

訪問狀態儲存

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

這在註冊全域性狀態儲存時不起作用。要註冊全域性狀態儲存,請參閱下面關於自定義 StreamsBuilderFactoryBean 的部分。

© . This site is unofficial and not affiliated with VMware.