在類上使用 @KafkaListener
當你在類級別使用 @KafkaListener
時,你必須在方法級別指定 @KafkaHandler
。訊息投遞時,會根據轉換後的訊息載荷型別來確定呼叫哪個方法。以下示例展示瞭如何做到這一點:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
從 2.1.3 版本開始,你可以將一個 @KafkaHandler
方法指定為預設方法,如果沒有匹配到其他方法,則會呼叫該方法。最多隻能指定一個預設方法。當使用 @KafkaHandler
方法時,載荷必須已經轉換為領域物件(以便進行匹配)。請使用自定義反序列化器、JsonDeserializer
或將 TypePrecedence
設定為 TYPE_ID
的 JsonMessageConverter
。有關更多資訊,請參閱 序列化、反序列化和訊息轉換。
由於 Spring 解析方法引數的方式存在一些限制,預設的 @KafkaHandler 不能接收離散的訊息頭;它必須使用 ConsumerRecordMetadata ,如 消費者記錄元資料 中所述。 |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果物件是 String
型別,這將不起作用;topic
引數也將獲得對 object
的引用。
如果你在預設方法中需要記錄的元資料,請使用以下方式:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}