空載荷和 'Tombstone' 記錄的日誌壓縮

使用日誌壓縮 (Log Compaction) 時,您可以傳送和接收包含 null 載荷的訊息,以標識某個鍵已被刪除。

您也可能由於其他原因收到 null 值,例如反序列化器 (Deserializer) 在無法反序列化值時可能返回 null

要使用 KafkaTemplate 傳送 null 載荷,可以將 null 傳遞給 send() 方法的值引數。一個例外是 send(Message) 變體。由於 spring-messagingMessage 不能包含 null 載荷,您可以使用一個特殊的載荷型別 KafkaNull,框架會發送 null。為了方便起見,提供了靜態的 KafkaNull.INSTANCE

當您使用訊息監聽器容器時,接收到的 ConsumerRecordvalue()null

要配置 @KafkaListener 來處理 null 載荷,必須使用 @Payload 註解並設定 required = false。如果它是壓縮日誌的 Tombstone 訊息,通常您還需要獲取鍵,以便您的應用能夠確定哪個鍵被“刪除”了。以下示例展示了這樣的配置:

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

當您在類級別使用 @KafkaListener 並且包含多個 @KafkaHandler 方法時,需要額外的配置。具體來說,您需要一個使用 KafkaNull 載荷的 @KafkaHandler 方法。以下示例展示瞭如何配置:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

請注意,這裡的引數是 null,而不是 KafkaNull

此功能需要使用 KafkaNullAwarePayloadArgumentResolver,框架在使用預設的 MessageHandlerMethodFactory 時會進行配置。如果使用自定義的 MessageHandlerMethodFactory,請參見 @KafkaListener 新增自定義 HandlerMethodArgumentResolver