Apache Kafka Streams 支援

從 1.1.4 版本開始,Spring for Apache Kafka 提供了對 Kafka Streams 的一流支援。要從 Spring 應用程式中使用它,kafka-streams jar 必須存在於類路徑中。它是 Spring for Apache Kafka 專案的一個可選依賴項,不會被傳遞下載。

基礎

參考 Apache Kafka Streams 文件建議以下使用 API 的方式

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我們有兩個主要元件

  • StreamsBuilder:具有構建 KStream(或 KTable)例項的 API。

  • KafkaStreams:管理這些例項的生命週期。

由單個 StreamsBuilder 暴露給 KafkaStreams 例項的所有 KStream 例項,即使它們具有不同的邏輯,也會同時啟動和停止。換句話說,由 StreamsBuilder 定義的所有流都與單個生命週期控制繫結。一旦 KafkaStreams 例項透過 streams.close() 關閉,它就不能重新啟動。相反,必須建立一個新的 KafkaStreams 例項來重新啟動流處理。

Spring 管理

為了簡化從 Spring 應用程式上下文角度使用 Kafka Streams 並透過容器使用生命週期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。這是一個 AbstractFactoryBean 實現,用於將 StreamsBuilder 單例例項公開為 bean。以下示例建立了這樣一個 bean

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
從 2.2 版本開始,流配置現在以 KafkaStreamsConfiguration 物件而不是 StreamsConfig 的形式提供。

StreamsBuilderFactoryBean 也實現了 SmartLifecycle 來管理內部 KafkaStreams 例項的生命週期。與 Kafka Streams API 類似,您必須在啟動 KafkaStreams 之前定義 KStream 例項。這也適用於 Kafka Streams 的 Spring API。因此,當您在 StreamsBuilderFactoryBean 上使用預設的 autoStartup = true 時,您必須在應用程式上下文重新整理之前在 StreamsBuilder 上宣告 KStream 例項。例如,KStream 可以是一個常規的 bean 定義,而 Kafka Streams API 的使用沒有任何影響。以下示例展示瞭如何實現

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果您想手動控制生命週期(例如,根據某些條件停止和啟動),您可以使用工廠 bean (&) 字首 直接引用 StreamsBuilderFactoryBean bean。由於 StreamsBuilderFactoryBean 使用其內部 KafkaStreams 例項,因此可以安全地停止並重新啟動它。每次 start() 都會建立一個新的 KafkaStreams。如果您希望單獨控制 KStream 例項的生命週期,您還可以考慮使用不同的 StreamsBuilderFactoryBean 例項。

您還可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener 選項,這些選項將委託給內部 KafkaStreams 例項。

此外,除了間接在 StreamsBuilderFactoryBean 上設定這些選項之外,您還可以使用 KafkaStreamsCustomizer 回撥介面來

  1. (從 2.1.5 版本開始)使用 customize(KafkaStreams) 配置內部 KafkaStreams 例項

  2. (從 3.3.0 版本開始)使用 initKafkaStreams(Topology, Properties, KafkaClientSupplier) 例項化 KafkaStreams 的自定義實現

請注意,KafkaStreamsCustomizer 會覆蓋 StreamsBuilderFactoryBean 提供的選項。

如果您需要直接執行某些 KafkaStreams 操作,您可以透過使用 StreamsBuilderFactoryBean.getKafkaStreams() 訪問該內部 KafkaStreams 例項。

您可以透過型別自動裝配 StreamsBuilderFactoryBean bean,但您應該確保在 bean 定義中使用完整型別,如下例所示

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,如果您使用介面 bean 定義,您可以新增 @Qualifier 以透過名稱進行注入。以下示例展示瞭如何實現

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

從 2.4.1 版本開始,工廠 bean 有一個名為 infrastructureCustomizer 的新屬性,型別為 KafkaStreamsInfrastructureCustomizer;這允許在流建立之前自定義 StreamsBuilder(例如,新增狀態儲存)和/或 Topology

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

提供了預設的無操作實現,以避免在不需要時必須實現這兩個方法。

提供了一個 CompositeKafkaStreamsInfrastructureCustomizer,用於在您需要應用多個自定義器時使用。

KafkaStreams Micrometer 支援

在 2.5.3 版本中引入,您可以配置一個 KafkaStreamsMicrometerListener,以自動為工廠 bean 管理的 KafkaStreams 物件註冊 Micrometer 計量器

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

流 JSON 序列化和反序列化

為了在以 JSON 格式讀寫主題或狀態儲存時序列化和反序列化資料,Spring for Apache Kafka 提供了一個 JacksonJsonSerde 實現,它使用 JSON,委託給 序列化、反序列化和訊息轉換 中描述的 JacksonJsonSerializerJacksonJsonDeserializerJacksonJsonSerde 實現透過其建構函式(目標型別或 ObjectMapper)提供相同的配置選項。在以下示例中,我們使用 JacksonJsonSerde 序列化和反序列化 Kafka 流的 Cat 有效負載(JacksonJsonSerde 可以在需要例項的任何地方以類似方式使用)

stream.through(Serdes.Integer(), new JacksonJsonSerde<>(Cat.class), "cats");

在以程式設計方式構建序列化器/反序列化器以在生產者/消費者工廠中使用時,從 2.3 版本開始,您可以使用流式 API,這簡化了配置。

stream.through(
    new JacksonJsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JacksonJsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

使用 KafkaStreamBrancher

KafkaStreamBrancher 類引入了一種更方便的方式來在 KStream 之上構建條件分支。

考慮以下不使用 KafkaStreamBrancher 的示例

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用 KafkaStreamBrancher

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

配置

為了配置 Kafka Streams 環境,StreamsBuilderFactoryBean 需要一個 KafkaStreamsConfiguration 例項。有關所有可能的選項,請參閱 Apache Kafka 文件

從 2.2 版本開始,流配置現在以 KafkaStreamsConfiguration 物件而不是 StreamsConfig 的形式提供。

為了避免大多數情況下的樣板程式碼,尤其是在您開發微服務時,Spring for Apache Kafka 提供了 @EnableKafkaStreams 註解,您應該將其放置在 @Configuration 類上。您所需要做的就是宣告一個名為 defaultKafkaStreamsConfigKafkaStreamsConfiguration bean。一個名為 defaultKafkaStreamsBuilderStreamsBuilderFactoryBean bean 會自動在應用程式上下文中宣告。您還可以宣告和使用任何額外的 StreamsBuilderFactoryBean bean。您可以透過提供一個實現 StreamsBuilderFactoryBeanConfigurer 的 bean 來對該 bean 執行額外的自定義。如果有多個這樣的 bean,它們將根據其 Ordered.order 屬性應用。

清理和停止配置

當工廠停止時,KafkaStreams.close() 會帶兩個引數呼叫

  • closeTimeout:等待執行緒關閉的時間(預設為 DEFAULT_CLOSE_TIMEOUT,設定為 10 秒)。可以使用 StreamsBuilderFactoryBean.setCloseTimeout() 進行配置。

  • leaveGroupOnClose:觸發消費者離開組的呼叫(預設為 false)。可以使用 StreamsBuilderFactoryBean.setLeaveGroupOnClose() 進行配置。

預設情況下,當工廠 bean 停止時,會呼叫 KafkaStreams.cleanUp() 方法。從 2.1.2 版本開始,工廠 bean 具有額外的建構函式,接受一個 CleanupConfig 物件,該物件具有屬性,允許您控制在 start()stop() 期間是否呼叫 cleanUp() 方法,或者兩者都不呼叫。從 2.7 版本開始,預設情況下永不清理本地狀態。

頭部增強器

版本 3.0 添加了 ContextualProcessorHeaderEnricherProcessor 擴充套件;提供了與已棄用的 HeaderEnricher 相同的功能,後者實現了已棄用的 Transformer 介面。這可用於在流處理中新增頭部;頭部值是 SpEL 表示式;表示式評估的根物件有 3 個屬性

  • record - org.apache.kafka.streams.processor.api.Record (key, value, timestamp, headers)

  • key - 當前記錄的鍵

  • value - 當前記錄的值

  • context - ProcessorContext,允許訪問當前記錄元資料

表示式必須返回 byte[]String(將使用 UTF-8 轉換為 byte[])。

要在流中使用增強器

.process(() -> new HeaderEnricherProcessor(expressions))

處理器不改變 keyvalue;它只是新增頭部。

每個記錄都需要一個新的例項。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

這是一個簡單的示例,新增一個字面量頭部和一個變數

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

版本 3.0 添加了 ContextualProcessorMessagingProcessor 擴充套件,提供了與已棄用的 MessagingTransformer 相同的功能,後者實現了已棄用的 Transformer 介面。這允許 Kafka Streams 拓撲與 Spring Messaging 元件(例如 Spring Integration 流)進行互動。轉換器需要 MessagingFunction 的實現。

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration 自動使用其 GatewayProxyFactoryBean 提供了一個實現。它還需要一個 MessagingMessageConverter 來將鍵、值和元資料(包括頭部)轉換為 Spring Messaging Message<?>。有關更多資訊,請參閱 KStream 呼叫 Spring Integration 流

從反序列化異常中恢復

版本 2.3 引入了 RecoveringDeserializationExceptionHandler,它可以在發生反序列化異常時採取一些操作。請參閱 Kafka 文件中關於 DeserializationExceptionHandler 的內容,RecoveringDeserializationExceptionHandler 是其實現之一。RecoveringDeserializationExceptionHandler 配置了一個 ConsumerRecordRecoverer 實現。框架提供了 DeadLetterPublishingRecoverer,它將失敗的記錄傳送到死信主題。有關此恢復器的更多資訊,請參閱 釋出死信記錄

要配置恢復器,請將以下屬性新增到您的流配置中

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

當然,recoverer() bean 可以是您自己的 ConsumerRecordRecoverer 實現。

互動式查詢支援

從 3.2 版本開始,Spring for Apache Kafka 提供了 Kafka Streams 中互動式查詢所需的基本設施。互動式查詢在有狀態的 Kafka Streams 應用程式中非常有用,因為它們提供了一種持續查詢應用程式中有狀態儲存的方法。因此,如果應用程式想要具體化所考慮系統的當前檢視,互動式查詢提供了一種實現方式。要了解有關互動式查詢的更多資訊,請參閱這篇文章。Spring for Apache Kafka 中的支援以一個名為 KafkaStreamsInteractiveQueryService 的 API 為中心,該 API 是 Kafka Streams 庫中互動式查詢 API 的一個外觀。應用程式可以建立此服務的一個例項作為 bean,然後使用它透過名稱檢索狀態儲存。

以下程式碼片段顯示了一個示例。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

假設一個 Kafka Streams 應用程式有一個名為 app-store 的狀態儲存,那麼該儲存可以透過 KafkaStreamsInteractiveQuery API 檢索,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

一旦應用程式獲得了對狀態儲存的訪問許可權,它就可以從中查詢鍵值資訊。

在這種情況下,應用程式使用的狀態儲存是一個只讀的鍵值儲存。Kafka Streams 應用程式可以使用其他型別的狀態儲存。例如,如果應用程式更喜歡查詢基於視窗的儲存,它可以在 Kafka Streams 應用程式業務邏輯中構建該儲存,然後稍後檢索它。因此,KafkaStreamsInteractiveQueryService 中檢索可查詢儲存的 API 具有通用儲存型別簽名,以便終端使用者可以分配適當的型別。

這是 API 的型別簽名。

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

呼叫此方法時,使用者可以明確請求適當的狀態儲存型別,就像我們在上面的示例中所做的那樣。

重試狀態儲存檢索

嘗試使用 KafkaStreamsInteractiveQueryService 檢索狀態儲存時,狀態儲存可能會因各種原因而未找到。如果這些原因是暫時的,KafkaStreamsInteractiveQueryService 提供了一個選項,透過允許注入自定義的 RetryTemplate 來重試檢索狀態儲存。預設情況下,KafkaStreamsInteractiveQueryService 中使用的 RetryTemplate 使用最大三次嘗試,並具有一秒的固定回退。

您可以像這樣將自定義的 RetryTemplate 注入到 KafkaStreamsInteractiveQueryService 中,最大嘗試次數為十次。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

查詢遠端狀態儲存

上面顯示用於檢索狀態儲存的 API - retrieveQueryableStore 用於本地可用的鍵值狀態儲存。在生產環境中,Kafka Streams 應用程式很可能根據分割槽數量進行分散式。如果一個主題有四個分割槽,並且有四個相同 Kafka Streams 處理器例項正在執行,那麼每個例項可能負責處理主題中的單個分割槽。在這種情況下,呼叫 retrieveQueryableStore 可能無法提供例項正在尋找的正確結果,儘管它可能返回一個有效的儲存。讓我們假設一個有四個分割槽的主題包含各種鍵的資料,並且單個分割槽始終負責特定的鍵。如果呼叫 retrieveQueryableStore 的例項正在查詢此例項不託管的鍵的資訊,那麼它將不會收到任何資料。這是因為當前的 Kafka Streams 例項對該鍵一無所知。要解決此問題,呼叫例項首先需要確保它們擁有 Kafka Streams 處理器例項的主機資訊,該例項託管特定鍵。這可以從同一 application.id 下的任何 Kafka Streams 例項中檢索,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

在上面的示例程式碼中,呼叫例項正在從名為 app-store 的狀態儲存中查詢特定鍵 12345。API 還需要一個相應的鍵序列化器,在本例中為 IntegerSerializer。Kafka Streams 會遍歷同一 application.id 下的所有例項,並嘗試查詢哪個例項託管此特定鍵。一旦找到,它會以 HostInfo 物件的形式返回該主機資訊。

API 如下所示

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

當以這種分散式方式使用同一 application.id 的多個 Kafka Streams 處理器例項時,應用程式應該提供一個 RPC 層,狀態儲存可以透過 RPC 端點(例如 REST 端點)進行查詢。有關此內容的更多詳細資訊,請參閱這篇文章。當使用 Spring for Apache Kafka 時,透過使用 spring-web 技術新增基於 Spring 的 REST 端點非常容易。一旦有了 REST 端點,就可以使用它從任何 Kafka Streams 例項查詢狀態儲存,前提是例項知道託管鍵的 HostInfo

如果託管金鑰的例項是當前例項,則應用程式無需呼叫 RPC 機制,而是進行 JVM 內部呼叫。然而,問題是應用程式可能不知道進行呼叫的例項就是託管金鑰的例項,因為特定伺服器可能會由於消費者重新平衡而丟失分割槽。為了解決這個問題,KafkaStreamsInteractiveQueryService 提供了一個方便的 API,透過 API 方法 getCurrentKafkaStreamsApplicationHostInfo() 查詢當前主機資訊,該方法返回當前的 HostInfo。其思想是應用程式可以首先獲取有關金鑰所在位置的資訊,然後將 HostInfo 與當前例項的 HostInfo 進行比較。如果 HostInfo 資料匹配,則可以透過 retrieveQueryableStore 進行簡單的 JVM 呼叫,否則選擇 RPC 選項。

Kafka Streams 示例

以下示例結合了本章中涵蓋的各種主題

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}
© . This site is unofficial and not affiliated with VMware.