空載荷和 'Tombstone' 記錄的日誌壓縮
使用日誌壓縮 (Log Compaction) 時,您可以傳送和接收包含 null
載荷的訊息,以標識某個鍵已被刪除。
您也可能由於其他原因收到 null
值,例如反序列化器 (Deserializer) 在無法反序列化值時可能返回 null
。
要使用 KafkaTemplate
傳送 null
載荷,可以將 null
傳遞給 send()
方法的值引數。一個例外是 send(Message>)
變體。由於 spring-messaging
的 Message>
不能包含 null
載荷,您可以使用一個特殊的載荷型別 KafkaNull
,框架會發送 null
。為了方便起見,提供了靜態的 KafkaNull.INSTANCE
。
當您使用訊息監聽器容器時,接收到的 ConsumerRecord
的 value()
為 null
。
要配置 @KafkaListener
來處理 null
載荷,必須使用 @Payload
註解並設定 required = false
。如果它是壓縮日誌的 Tombstone 訊息,通常您還需要獲取鍵,以便您的應用能夠確定哪個鍵被“刪除”了。以下示例展示了這樣的配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
當您在類級別使用 @KafkaListener
並且包含多個 @KafkaHandler
方法時,需要額外的配置。具體來說,您需要一個使用 KafkaNull
載荷的 @KafkaHandler
方法。以下示例展示瞭如何配置:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
請注意,這裡的引數是 null
,而不是 KafkaNull
。
參見 手動分配所有分割槽。 |
此功能需要使用 KafkaNullAwarePayloadArgumentResolver ,框架在使用預設的 MessageHandlerMethodFactory 時會進行配置。如果使用自定義的 MessageHandlerMethodFactory ,請參見 向 @KafkaListener 新增自定義 HandlerMethodArgumentResolver 。 |