非同步 @KafkaListener 返回型別

從 3.2 版本開始,可以為 @KafkaListener(和 @KafkaHandler)方法指定非同步返回型別,從而允許非同步傳送回覆。返回型別包括 CompletableFuture<?>Mono<?> 和 Kotlin suspend 函式。

@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
    ...
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("done");
    return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
    ...
    return Mono.empty();
}
檢測到非同步返回型別時,AckMode 將自動設定為 MANUAL 並啟用亂序提交;取而代之的是,非同步操作完成後將進行 ack。當非同步結果以錯誤完成時,訊息是否恢復取決於容器錯誤處理器。如果在監聽器方法中發生阻止建立非同步結果物件的異常,您必須捕獲該異常並返回適當的返回物件,該物件將導致訊息被 ack 或恢復。

如果在具有非同步返回型別(包括 Kotlin suspend 函式)的監聽器上配置了 KafkaListenerErrorHandler,則在失敗後會呼叫該錯誤處理器。有關此錯誤處理器及其用途的更多資訊,請參閱異常處理