在類上使用 @KafkaListener

當你在類級別使用 @KafkaListener 時,你必須在方法級別指定 @KafkaHandler。訊息投遞時,會根據轉換後的訊息載荷型別來確定呼叫哪個方法。以下示例展示瞭如何做到這一點:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

從 2.1.3 版本開始,你可以將一個 @KafkaHandler 方法指定為預設方法,如果沒有匹配到其他方法,則會呼叫該方法。最多隻能指定一個預設方法。當使用 @KafkaHandler 方法時,載荷必須已經轉換為領域物件(以便進行匹配)。請使用自定義反序列化器、JsonDeserializer 或將 TypePrecedence 設定為 TYPE_IDJsonMessageConverter。有關更多資訊,請參閱 序列化、反序列化和訊息轉換

由於 Spring 解析方法引數的方式存在一些限制,預設的 @KafkaHandler 不能接收離散的訊息頭;它必須使用 ConsumerRecordMetadata,如 消費者記錄元資料 中所述。

例如:

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

如果物件是 String 型別,這將不起作用;topic 引數也將獲得對 object 的引用。

如果你在預設方法中需要記錄的元資料,請使用以下方式:

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}