類上的 @KafkaListener

當您在類級別使用 @KafkaListener 時,您必須在方法級別指定 @KafkaHandler。如果此類或其子類的任何方法上沒有 @KafkaHandler,框架將拒絕此類配置。@KafkaHandler 註解是方法明確和簡潔目的所必需的。否則,很難在沒有額外限制的情況下決定此方法或其他方法。

當訊息傳遞時,轉換後的訊息載荷型別用於確定要呼叫的方法。以下示例展示瞭如何實現此目的

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

從 2.1.3 版本開始,您可以將 @KafkaHandler 方法指定為預設方法,如果與其他方法不匹配,則呼叫此方法。最多隻能指定一個此類方法。當使用 @KafkaHandler 方法時,載荷必須已經轉換為域物件(以便可以執行匹配)。使用自定義反序列化器、JacksonJsonDeserializer 或將 TypePrecedence 設定為 TYPE_IDJacksonJsonMessageConverter。有關更多資訊,請參閱序列化、反序列化和訊息轉換

由於 Spring 解析方法引數的方式存在一些限制,預設的 @KafkaHandler 無法接收離散的標頭;它必須使用 消費者記錄元資料 中討論的 ConsumerRecordMetadata

例如:

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

如果物件是 String,這將不起作用;topic 引數也將獲得對 object 的引用。

如果您在預設方法中需要有關記錄的元資料,請使用此方法

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}

此外,這也將不起作用。topic 被解析為 payload

@KafkaHandler(isDefault = true)
public void listenDefault(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // payload.equals(topic) is True.
    ...
}

如果存在預設方法中需要離散自定義標頭的用例,請使用此方法

@KafkaHandler(isDefault = true)
void listenDefault(String payload, @Headers Map<String, Object> headers) {
    Object myValue = headers.get("MyCustomHeader");
    ...
}
© . This site is unofficial and not affiliated with VMware.