類上的 @KafkaListener
當您在類級別使用 @KafkaListener 時,您必須在方法級別指定 @KafkaHandler。如果此類或其子類的任何方法上沒有 @KafkaHandler,框架將拒絕此類配置。@KafkaHandler 註解是方法明確和簡潔目的所必需的。否則,很難在沒有額外限制的情況下決定此方法或其他方法。
當訊息傳遞時,轉換後的訊息載荷型別用於確定要呼叫的方法。以下示例展示瞭如何實現此目的
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
從 2.1.3 版本開始,您可以將 @KafkaHandler 方法指定為預設方法,如果與其他方法不匹配,則呼叫此方法。最多隻能指定一個此類方法。當使用 @KafkaHandler 方法時,載荷必須已經轉換為域物件(以便可以執行匹配)。使用自定義反序列化器、JacksonJsonDeserializer 或將 TypePrecedence 設定為 TYPE_ID 的 JacksonJsonMessageConverter。有關更多資訊,請參閱序列化、反序列化和訊息轉換。
由於 Spring 解析方法引數的方式存在一些限制,預設的 @KafkaHandler 無法接收離散的標頭;它必須使用 消費者記錄元資料 中討論的 ConsumerRecordMetadata。 |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果物件是 String,這將不起作用;topic 引數也將獲得對 object 的引用。
如果您在預設方法中需要有關記錄的元資料,請使用此方法
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
此外,這也將不起作用。topic 被解析為 payload。
@KafkaHandler(isDefault = true)
public void listenDefault(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// payload.equals(topic) is True.
...
}
如果存在預設方法中需要離散自定義標頭的用例,請使用此方法
@KafkaHandler(isDefault = true)
void listenDefault(String payload, @Headers Map<String, Object> headers) {
Object myValue = headers.get("MyCustomHeader");
...
}