使用響應式 Kafka 繫結器的基本示例
在本節中,我們將展示一些使用響應式繫結器編寫響應式 Kafka 應用程式的基本程式碼片段及其詳細資訊。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以將上述 uppercase 函式與基於訊息通道的 Kafka 繫結器(spring-cloud-stream-binder-kafka)以及本節討論的響應式 Kafka 繫結器(spring-cloud-stream-binder-kafka-reactive)一起使用。當將此函式與常規 Kafka 繫結器一起使用時,儘管您在應用程式中(即在 uppercase 函式中)使用了響應式型別,但您只在函式執行期間獲得響應式流。在函式執行上下文之外,沒有響應式的好處,因為底層繫結器不是基於響應式堆疊的。因此,儘管這可能看起來像帶來了完整的端到端響應式堆疊,但此應用程式只是部分響應式的。
現在假設您正在將適用於 Kafka 的正確響應式繫結器(spring-cloud-stream-binder-kafka-reactive)與上述函式應用程式一起使用。此繫結器實現將從鏈的頂端消費到末端釋出,提供完整的響應式優勢。這是因為底層繫結器基於 Reactor Kafka 的核心 API 構建。在消費者端,它使用 KafkaReceiver,這是一個 Kafka 消費者的響應式實現。同樣,在生產者端,它使用 KafkaSender API,這是一個 Kafka 生產者的響應式實現。由於響應式 Kafka 繫結器的基礎是建立在適當的響應式 Kafka API 之上的,因此應用程式可以充分利用響應式技術的優勢。使用此響應式 Kafka 繫結器時,應用程式內建了諸如自動背壓等響應式功能。
從版本 4.0.2 開始,您可以透過分別提供一個或多個 ReceiverOptionsCustomizer 或 SenderOptionsCustomizer bean 來定製 ReceiverOptions 和 SenderOptions。它們是 BiFunction,接收繫結名稱和初始選項,並返回定製的選項。這些介面擴充套件了 Ordered,因此當存在多個定製器時,它們將按所需的順序應用。
繫結器預設不提交偏移量。從版本 4.0.2 開始,KafkaHeaders.ACKNOWLEDGMENT 頭包含一個 ReceiverOffset 物件,您可以透過呼叫其 acknowledge() 或 commit() 方法來提交偏移量。 |
@Bean
public Consumer<Flux<Message<String>>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
有關更多資訊,請參閱 reactor-kafka 文件和 javadoc。
此外,從版本 4.0.3 開始,Kafka 消費者屬性 reactiveAtmostOnce 可以設定為 true,繫結器將在處理每個輪詢返回的記錄之前自動提交偏移量。此外,從版本 4.0.3 開始,您可以將消費者屬性 reactiveAutoCommit 設定為 true,繫結器將在處理每個輪詢返回的記錄之後自動提交偏移量。在這些情況下,不存在確認頭。
4.0.2 版本也提供了 reactiveAutoCommit,但實現不正確,它的行為類似於 reactiveAtMostOnce。 |
以下是如何使用 reactiveAutoCommit 的示例。
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
請注意,當使用自動提交時,reactor-kafka 返回一個 Flux<Flux<ConsumerRecord<?, ?>>>。鑑於 Spring 無法訪問內部 Flux 的內容,應用程式必須處理原生的 ConsumerRecord;對內容不應用訊息轉換或轉換服務。這需要使用原生解碼(透過在配置中指定適當型別的 Deserializer)來返回所需型別的記錄鍵/值。