使用 Reactive Kafka Binder 的基本示例

在本節中,我們將展示一些使用 reactive binder 編寫響應式 Kafka 應用程式的基本程式碼片段及其相關細節。

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

您可以將上述 `uppercase` 函式與基於訊息通道的 Kafka binder (`spring-cloud-stream-binder-kafka`) 以及本文討論的 reactive Kafka binder (`spring-cloud-stream-binder-kafka-reactive`) 一起使用。當與常規 Kafka binder 一起使用此函式時,儘管您在應用程式(即在 `uppercase` 函式中)使用了響應式型別,但您僅在函式執行範圍內獲得了響應式流。在函式執行上下文之外,由於底層 binder 不是基於響應式堆疊構建的,因此沒有響應式優勢。因此,儘管這可能看起來像帶來了完整的端到端響應式堆疊,但此應用程式僅是部分響應式的。

現在假設您正在使用適用於 Kafka 的適當的 reactive binder - `spring-cloud-stream-binder-kafka-reactive` 來構建上述函式的應用程式。此 binder 實現將從鏈頂端的消費到鏈底端的釋出,提供完整的響應式優勢。這是因為底層 binder 構建在 Reactor Kafka 的核心 API 之上。在消費者端,它使用了 KafkaReceiver,這是一個 Kafka 消費者的響應式實現。類似地,在生產者端,它使用了 KafkaSender API,這是一個 Kafka 生產者的響應式實現。由於 reactive Kafka binder 的基礎是建立在適當的響應式 Kafka API 之上的,因此應用程式在使用此 reactive Kafka binder 時可以獲得使用響應式技術的全部好處。諸如自動背壓等響應式能力均內置於應用程式中。

從版本 4.0.2 開始,您可以透過提供一個或多個 `ReceiverOptionsCustomizer` 或 `SenderOptionsCustomizer` Bean 來定製 `ReceiverOptions` 和 `SenderOptions`。它們是 `BiFunction`,接收繫結名稱和初始選項,並返回定製後的選項。這些介面擴充套件了 `Ordered`,因此當存在多個定製器時,它們將按照所需的順序應用。

binder 預設不提交偏移量。從版本 4.0.2 開始,`KafkaHeaders.ACKNOWLEDGMENT` 頭中包含一個 `ReceiverOffset` 物件,您可以透過呼叫其 `acknowledge()` 或 `commit()` 方法來提交偏移量。
@Bean
public Consumer<Flux<Message<String>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

有關更多資訊,請參閱 `reactor-kafka` 文件和 javadocs。

此外,從版本 4.0.3 開始,可以將 Kafka 消費者屬性 `reactiveAtmostOnce` 設定為 `true`,binder 將在處理每個 poll 返回的記錄之前自動提交偏移量。同樣,從版本 4.0.3 開始,您可以將消費者屬性 `reactiveAutoCommit` 設定為 `true`,binder 將在處理每個 poll 返回的記錄之後自動提交偏移量。在這些情況下,不存在確認頭。

4.0.2 也提供了 `reactiveAutoCommit`,但實現不正確,其行為類似於 `reactiveAtMostOnce`。

以下是如何使用 `reactiveAutoCommit` 的示例。

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

請注意,當使用自動提交時,`reactor-kafka` 返回 `Flux<Flux<ConsumerRecord<?, ?>>>`。鑑於 Spring 無法訪問內部 flux 的內容,應用程式必須處理原生的 `ConsumerRecord`;內容沒有應用訊息轉換或轉換服務。這需要使用原生解碼(透過在配置中指定相應型別的 `Deserializer`)來返回所需型別的記錄鍵/值。