Apache Kafka Streams 支援

從 1.1.4 版本開始,Spring for Apache Kafka 為 Kafka Streams 提供了第一類支援。要在 Spring 應用中使用它,classpath 中必須包含 kafka-streams jar。它是 Spring for Apache Kafka 專案的可選依賴項,不會被傳遞下載。

基本概念

Apache Kafka Streams 參考文件建議以下使用 API 的方式

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我們有兩個主要元件

  • StreamsBuilder:提供用於構建 KStream(或 KTable)例項的 API。

  • KafkaStreams:用於管理這些例項的生命週期。

單個 StreamsBuilderKafkaStreams 例項公開的所有 KStream 例項會同時啟動和停止,即使它們的邏輯不同。換句話說,由 StreamsBuilder 定義的所有流都繫結到同一個生命週期控制。一旦 KafkaStreams 例項透過 streams.close() 關閉,就無法重新啟動。相反,必須建立一個新的 KafkaStreams 例項來重新啟動流處理。

Spring 管理

為了簡化從 Spring 應用上下文角度使用 Kafka Streams 並透過容器進行生命週期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。這是一個 AbstractFactoryBean 實現,用於將 StreamsBuilder 單例例項作為 bean 公開。以下示例建立了這樣一個 bean

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
從 2.2 版本開始,流配置現在以 KafkaStreamsConfiguration 物件而不是 StreamsConfig 提供。

StreamsBuilderFactoryBean 還實現了 SmartLifecycle 來管理內部 KafkaStreams 例項的生命週期。與 Kafka Streams API 類似,你必須在啟動 KafkaStreams 之前定義 KStream 例項。這同樣適用於 Spring for Kafka Streams API。因此,當你在 StreamsBuilderFactoryBean 上使用預設的 autoStartup = true 時,你必須在應用上下文重新整理之前在 StreamsBuilder 上宣告 KStream 例項。例如,KStream 可以是一個常規的 bean 定義,而 Kafka Streams API 的使用不受任何影響。以下示例展示瞭如何做到這一點

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果你想手動控制生命週期(例如,根據某些條件停止和啟動),你可以使用 factory bean (&) 字首直接引用 StreamsBuilderFactoryBean bean。由於 StreamsBuilderFactoryBean 使用其內部 KafkaStreams 例項,因此可以安全地再次停止和重新啟動它。每次 start() 時都會建立一個新的 KafkaStreams 例項。如果你想分別控制 KStream 例項的生命週期,也可以考慮使用不同的 StreamsBuilderFactoryBean 例項。

你還可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener 選項,這些選項會被委託給內部 KafkaStreams 例項。

此外,除了在 StreamsBuilderFactoryBean 上間接設定這些選項之外,你還可以使用 KafkaStreamsCustomizer 回撥介面來

  1. (從 版本 2.1.5 開始)使用 customize(KafkaStreams) 配置內部 KafkaStreams 例項

  2. (從 版本 3.3.0 開始)使用 initKafkaStreams(Topology, Properties, KafkaClientSupplier) 例項化 KafkaStreams 的自定義實現

請注意,KafkaStreamsCustomizer 會覆蓋 StreamsBuilderFactoryBean 提供的選項。

如果你需要直接執行一些 KafkaStreams 操作,可以透過 StreamsBuilderFactoryBean.getKafkaStreams() 訪問該內部 KafkaStreams 例項。

你可以按型別自動裝配 StreamsBuilderFactoryBean bean,但必須確保在 bean 定義中使用完整型別,如下例所示

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

另外,如果使用介面 bean 定義,可以新增 @Qualifier 按名稱注入。以下示例展示瞭如何做到這一點

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

從 2.4.1 版本開始,factory bean 有一個新屬性 infrastructureCustomizer,型別為 KafkaStreamsInfrastructureCustomizer;這允許在建立流之前自定義 StreamsBuilder(例如,新增狀態儲存)和/或 Topology

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

提供了預設的無操作(no-op)實現,以避免在不需要實現某個方法時必須實現兩者。

提供了 CompositeKafkaStreamsInfrastructureCustomizer,用於需要應用多個自定義器(customizers)的情況。

Kafka Streams Micrometer 支援

從 2.5.3 版本開始引入,你可以配置 KafkaStreamsMicrometerListener 來為 factory bean 管理的 KafkaStreams 物件自動註冊 micrometer 指標。

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

Streams JSON 序列化和反序列化

為了在以 JSON 格式讀寫主題或狀態儲存時進行資料序列化和反序列化,Spring for Apache Kafka 提供了一個使用 JSON 的 JsonSerde 實現,它委託給序列化、反序列化和訊息轉換中描述的 JsonSerializerJsonDeserializerJsonSerde 實現透過其建構函式(目標型別或 ObjectMapper)提供了相同的配置選項。在以下示例中,我們使用 JsonSerde 對 Kafka 流的 Cat 載荷進行序列化和反序列化(JsonSerde 可以在任何需要例項的地方以類似的方式使用)

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

自 2.3 版本起,在以程式設計方式構建序列化器/反序列化器用於生產者/消費者工廠時,你可以使用流式 API,這簡化了配置。

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

使用 KafkaStreamBrancher

KafkaStreamBrancher 類引入了一種更方便的方式來在 KStream 之上構建條件分支。

考慮以下不使用 KafkaStreamBrancher 的示例

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用 KafkaStreamBrancher

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

配置

要配置 Kafka Streams 環境,StreamsBuilderFactoryBean 需要一個 KafkaStreamsConfiguration 例項。有關所有可能的選項,請參閱 Apache Kafka 文件

從 2.2 版本開始,流配置現在以 KafkaStreamsConfiguration 物件而不是 StreamsConfig 提供。

為了避免大多數情況下的樣板程式碼,尤其是在開發微服務時,Spring for Apache Kafka 提供了 @EnableKafkaStreams 註解,你應該將其放在 @Configuration 類上。你只需要宣告一個名為 defaultKafkaStreamsConfigKafkaStreamsConfiguration bean。一個名為 defaultKafkaStreamsBuilderStreamsBuilderFactoryBean bean 會自動在應用上下文中宣告。你也可以宣告和使用任何額外的 StreamsBuilderFactoryBean bean。透過提供一個實現 StreamsBuilderFactoryBeanConfigurer 的 bean,你可以對該 bean 進行額外的自定義。如果存在多個這樣的 bean,它們將按照其 Ordered.order 屬性進行應用。

清理 & 停止配置

當 factory 停止時,會呼叫 KafkaStreams.close() 方法,帶有 2 個引數

  • closeTimeout:等待執行緒關閉的時間(預設為 DEFAULT_CLOSE_TIMEOUT,設定為 10 秒)。可以透過 StreamsBuilderFactoryBean.setCloseTimeout() 進行配置。

  • leaveGroupOnClose:是否觸發消費者離開組的呼叫(預設為 false)。可以透過 StreamsBuilderFactoryBean.setLeaveGroupOnClose() 進行配置。

預設情況下,當 factory bean 停止時,會呼叫 KafkaStreams.cleanUp() 方法。從 2.1.2 版本開始,factory bean 提供了額外的建構函式,接受一個 CleanupConfig 物件,該物件包含屬性,允許你控制 cleanUp() 方法是在 start()stop() 期間呼叫,還是都不呼叫。從 2.7 版本開始,預設情況下永遠不清理本地狀態。

頭部豐富器

3.0 版本添加了 ContextualProcessor 的擴充套件 HeaderEnricherProcessor;提供了與已廢棄的 HeaderEnricher 相同的功能,後者實現了已廢棄的 Transformer 介面。這可用於在流處理中新增頭部;頭部值是 SpEL 表示式;表示式評估的根物件具有 3 個屬性

  • record - org.apache.kafka.streams.processor.api.Recordkeyvaluetimestampheaders

  • key - 當前記錄的鍵

  • value - 當前記錄的值

  • context - ProcessorContext,允許訪問當前記錄元資料

表示式必須返回 byte[]String(後者將使用 UTF-8 轉換為 byte[])。

要在流中使用 enricher

.process(() -> new HeaderEnricherProcessor(expressions))

該處理器不會改變 keyvalue;它僅新增頭部。

你需要為每條記錄建立一個新例項。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

這裡有一個簡單示例,新增一個字面量頭部和一個變數

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

3.0 版本添加了 ContextualProcessor 的擴充套件 MessagingProcessor,提供了與已廢棄的 MessagingTransformer 相同的功能,後者實現了已廢棄的 Transformer 介面。這允許 Kafka Streams 拓撲與 Spring Messaging 元件(例如 Spring Integration 流)進行互動。該 transformer 需要 MessagingFunction 的實現。

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration 自動使用其 GatewayProxyFactoryBean 提供一個實現。它還需要一個 MessagingMessageConverter 將 key、value 和元資料(包括頭部)與 Spring Messaging Message 相互轉換。有關更多資訊,請參閱[從 KStream 呼叫 Spring Integration 流]

從反序列化異常中恢復

2.3 版本引入了 RecoveringDeserializationExceptionHandler,它可以在發生反序列化異常時採取一些行動。請參考 Kafka 關於 DeserializationExceptionHandler 的文件,RecoveringDeserializationExceptionHandler 是它的一個實現。RecoveringDeserializationExceptionHandler 配置了一個 ConsumerRecordRecoverer 實現。框架提供了 DeadLetterPublishingRecoverer,它將失敗的記錄傳送到死信主題。有關此 recoverer 的更多資訊,請參閱釋出死信記錄

要配置 recoverer,請將以下屬性新增到你的流配置中

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

當然,recoverer() bean 可以是你自己的 ConsumerRecordRecoverer 實現。

互動式查詢支援

從 3.2 版本開始,Spring for Apache Kafka 提供了 Kafka Streams 互動式查詢所需的基本功能。互動式查詢在有狀態的 Kafka Streams 應用中非常有用,因為它們提供了一種持續查詢應用中狀態儲存的方式。因此,如果應用想要實現系統中當前檢視的實體化,互動式查詢提供了一種實現方式。要了解更多關於互動式查詢的資訊,請參閱這篇文章。Spring for Apache Kafka 中的支援圍繞一個名為 KafkaStreamsInteractiveQueryService 的 API,它是 Kafka Streams 庫中互動式查詢 API 的一個外觀(facade)。應用可以將此服務例項建立為一個 bean,然後使用它按名稱檢索狀態儲存。

以下程式碼片段展示了一個示例。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

假設 Kafka Streams 應用有一個名為 app-store 的狀態儲存,那麼可以透過如下所示的 KafkStreamsInteractiveQuery API 檢索該儲存。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

一旦應用訪問到狀態儲存,就可以從中查詢鍵值資訊。

在這種情況下,應用使用的狀態儲存是一個只讀的鍵值儲存。Kafka Streams 應用還可以使用其他型別的狀態儲存。例如,如果應用希望查詢基於視窗的儲存,可以在 Kafka Streams 應用業務邏輯中構建該儲存,然後稍後檢索它。因此,KafkaStreamsInteractiveQueryService 中檢索可查詢儲存的 API 具有通用儲存型別簽名,以便終端使用者可以指定正確的型別。

這是 API 中的型別簽名。

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

呼叫此方法時,使用者可以像我們在上面的示例中那樣,明確請求正確的狀態儲存型別。

重試狀態儲存檢索

嘗試使用 KafkaStreamsInteractiveQueryService 檢索狀態儲存時,狀態儲存可能因各種原因找不到。如果這些原因是暫時的,KafkaStreamsInteractiveQueryService 提供了重試檢索狀態儲存的選項,允許注入自定義的 RetryTemplate。預設情況下,KafkaStreamsInteractiveQueryService 中使用的 RetryTemplate 最大嘗試次數為三次,固定退避時間為一秒。

以下是如何將最大嘗試次數設定為十的自定義 RetryTemplate 注入到 KafkaStreamsInteractiveQueryService 中。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

查詢遠端狀態儲存

上面展示的用於檢索狀態儲存的 API - retrieveQueryableStore,主要用於本地可用的鍵值狀態儲存。在生產環境中,Kafka Streams 應用很可能根據分割槽數量進行分散式部署。如果一個主題有四個分割槽,並且有四個相同的 Kafka Streams 處理器例項正在執行,那麼每個例項可能負責處理該主題的一個分割槽。在這種情況下,呼叫 retrieveQueryableStore 可能不會得到例項正在尋找的正確結果,即使它可能返回一個有效的儲存。假設具有四個分割槽的主題包含關於各種鍵的資料,並且一個分割槽總是負責特定的鍵。如果呼叫 retrieveQueryableStore 的例項正在查詢該例項未託管的鍵的資訊,那麼它將不會收到任何資料。這是因為當前的 Kafka Streams 例項對此鍵一無所知。為了解決這個問題,呼叫例項首先需要確保它們擁有託管特定鍵的 Kafka Streams 處理器例項的主機資訊。這可以從同一 application.id 下的任何 Kafka Streams 例項檢索,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

在上面的示例程式碼中,呼叫例項正在從名為 app-store 的狀態儲存中查詢特定鍵 12345。該 API 還需要相應的鍵序列化器,在本例中是 IntegerSerializer。Kafka Streams 會在其同一 application.id 下的所有例項中查詢哪個例項託管了此特定鍵,找到後,它將該主機資訊作為 HostInfo 物件返回。

該 API 如下所示

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

在這種分散式方式下使用相同 application.id 的多個 Kafka Streams 處理器例項時,應用應提供一個 RPC 層,以便可以透過 RPC 端點(例如 REST)查詢狀態儲存。有關此內容的更多詳細資訊,請參閱這篇文章。使用 Spring for Apache Kafka 時,利用 spring-web 技術新增基於 Spring 的 REST 端點非常容易。一旦有了 REST 端點,就可以從任何 Kafka Streams 例項查詢狀態儲存,前提是該例項知道託管該鍵的 HostInfo

如果託管鍵的例項是當前例項,那麼應用無需呼叫 RPC 機制,而是進行 JVM 內部呼叫。然而,問題在於應用可能不知道發起呼叫的例項就是託管鍵的例項,因為特定伺服器可能由於消費者再平衡而失去一個分割槽。為了解決這個問題,KafkaStreamsInteractiveQueryService 提供了一個方便的 API,透過 API 方法 getCurrentKafkaStreamsApplicationHostInfo() 查詢當前主機資訊,該方法返回當前的 HostInfo。其思想是應用可以先獲取關於鍵所在位置的資訊,然後將該 HostInfo 與當前例項的 HostInfo 進行比較。如果 HostInfo 資料匹配,則可以透過 retrieveQueryableStore 進行簡單的 JVM 呼叫,否則選擇 RPC 選項。

Kafka Streams 示例

以下示例結合了我們在本章中介紹的各種主題

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}