非同步 @KafkaListener 返回型別
從 3.2 版本開始,@KafkaListener(和 @KafkaHandler)方法可以指定非同步返回型別,允許非同步傳送回覆。返回型別包括 CompletableFuture<?>、Mono<?> 和 Kotlin suspend 函式。
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
...
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("done");
return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
...
return Mono.empty();
}
當檢測到非同步返回型別時,AckMode 將自動設定為 MANUAL 並啟用亂序提交;相反,非同步完成將在非同步操作完成時進行確認。當非同步結果以錯誤完成時,訊息是否恢復取決於容器錯誤處理器。如果在監聽器方法中發生阻止建立非同步結果物件的異常,您必須捕獲該異常並返回適當的返回物件,該物件將導致訊息被確認或恢復。 |
如果在具有非同步返回型別(包括 Kotlin suspend 函式)的監聽器上配置了 KafkaListenerErrorHandler,則錯誤處理程式在失敗後被呼叫。有關此錯誤處理程式及其用途的更多資訊,請參閱處理異常。