空載荷和“墓碑”記錄的日誌壓縮
當您使用日誌壓縮時,可以傳送和接收帶有null載荷的訊息,以標識鍵的刪除。
您也可能因為其他原因收到null值,例如Deserializer在無法反序列化值時可能會返回null。
要使用KafkaTemplate傳送null載荷,您可以將null傳遞給send()方法的value引數。其中一個例外是send(Message<?> message)變體。由於spring-messaging的Message<?>不能有null載荷,您可以使用一種特殊的載荷型別,稱為KafkaNull,框架會發送null。為方便起見,提供了靜態的KafkaNull.INSTANCE。
當您使用訊息監聽器容器時,收到的ConsumerRecord的value()為null。
要配置@KafkaListener來處理null載荷,您必須使用@Payload註解並設定required = false。如果它是用於壓縮日誌的墓碑訊息,您通常還需要鍵,以便您的應用程式可以確定哪個鍵被“刪除”了。以下示例展示了這樣的配置。
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
當您使用類級別的@KafkaListener和多個@KafkaHandler方法時,需要一些額外的配置。具體來說,您需要一個帶有KafkaNull載荷的@KafkaHandler方法。以下示例展示瞭如何配置一個。
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
請注意,引數是null,而不是KafkaNull。
| 請參閱手動分配所有分割槽。 |
此功能需要使用KafkaNullAwarePayloadArgumentResolver,框架在使用預設MessageHandlerMethodFactory時將配置它。當使用自定義MessageHandlerMethodFactory時,請參閱向@KafkaListener新增自定義HandlerMethodArgumentResolver。 |