使用響應式 Kafka 繫結器的基本示例

在本節中,我們將展示一些使用響應式繫結器編寫響應式 Kafka 應用程式的基本程式碼片段及其詳細資訊。

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

您可以將上述 uppercase 函式與基於訊息通道的 Kafka 繫結器(spring-cloud-stream-binder-kafka)以及本節討論的響應式 Kafka 繫結器(spring-cloud-stream-binder-kafka-reactive)一起使用。當將此函式與常規 Kafka 繫結器一起使用時,儘管您在應用程式中(即在 uppercase 函式中)使用了響應式型別,但您只在函式執行期間獲得響應式流。在函式執行上下文之外,沒有響應式的好處,因為底層繫結器不是基於響應式堆疊的。因此,儘管這可能看起來像帶來了完整的端到端響應式堆疊,但此應用程式只是部分響應式的。

現在假設您正在將適用於 Kafka 的正確響應式繫結器(spring-cloud-stream-binder-kafka-reactive)與上述函式應用程式一起使用。此繫結器實現將從鏈的頂端消費到末端釋出,提供完整的響應式優勢。這是因為底層繫結器基於 Reactor Kafka 的核心 API 構建。在消費者端,它使用 KafkaReceiver,這是一個 Kafka 消費者的響應式實現。同樣,在生產者端,它使用 KafkaSender API,這是一個 Kafka 生產者的響應式實現。由於響應式 Kafka 繫結器的基礎是建立在適當的響應式 Kafka API 之上的,因此應用程式可以充分利用響應式技術的優勢。使用此響應式 Kafka 繫結器時,應用程式內建了諸如自動背壓等響應式功能。

從版本 4.0.2 開始,您可以透過分別提供一個或多個 ReceiverOptionsCustomizerSenderOptionsCustomizer bean 來定製 ReceiverOptionsSenderOptions。它們是 BiFunction,接收繫結名稱和初始選項,並返回定製的選項。這些介面擴充套件了 Ordered,因此當存在多個定製器時,它們將按所需的順序應用。

繫結器預設不提交偏移量。從版本 4.0.2 開始,KafkaHeaders.ACKNOWLEDGMENT 頭包含一個 ReceiverOffset 物件,您可以透過呼叫其 acknowledge()commit() 方法來提交偏移量。
@Bean
public Consumer<Flux<Message<String>>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

有關更多資訊,請參閱 reactor-kafka 文件和 javadoc。

此外,從版本 4.0.3 開始,Kafka 消費者屬性 reactiveAtmostOnce 可以設定為 true,繫結器將在處理每個輪詢返回的記錄之前自動提交偏移量。此外,從版本 4.0.3 開始,您可以將消費者屬性 reactiveAutoCommit 設定為 true,繫結器將在處理每個輪詢返回的記錄之後自動提交偏移量。在這些情況下,不存在確認頭。

4.0.2 版本也提供了 reactiveAutoCommit,但實現不正確,它的行為類似於 reactiveAtMostOnce

以下是如何使用 reactiveAutoCommit 的示例。

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

請注意,當使用自動提交時,reactor-kafka 返回一個 Flux<Flux<ConsumerRecord<?, ?>>>。鑑於 Spring 無法訪問內部 Flux 的內容,應用程式必須處理原生的 ConsumerRecord;對內容不應用訊息轉換或轉換服務。這需要使用原生解碼(透過在配置中指定適當型別的 Deserializer)來返回所需型別的記錄鍵/值。

© . This site is unofficial and not affiliated with VMware.