空載荷和“墓碑”記錄的日誌壓縮
使用日誌壓縮時,您可以傳送和接收帶有 null
載荷的訊息來標識鍵的刪除。您也可能由於其他原因接收到 null
值,例如反序列化器在無法反序列化值時可能返回 null
。
生產空載荷
您可以透過將 null
訊息引數值傳遞給 ReactivePulsarTemplate
的其中一個 send
方法來發送 null
值,例如
reactiveTemplate
.send(null, Schema.STRING)
.subscribe();
傳送 null 值時,必須指定 Schema 型別,因為系統無法從 null 載荷確定訊息的型別。 |
消費空載荷
對於 `@ReactivePulsarListener`,`null` 載荷根據其訊息引數的型別傳遞到監聽器方法中,具體如下
引數型別 | 傳入的值 |
---|---|
基本型別 |
|
使用者定義型別 |
|
|
一個非空的 Pulsar 訊息,其 |
|
一個非空的 Spring 訊息,其 |
|
一個非空的 Flux,其條目是非空的 Pulsar 訊息,這些訊息的 |
|
一個非空的 Flux,其條目是非空的 Spring 訊息,這些訊息的 |
當傳入的值為 null 時(即使用基本型別或使用者定義型別的單記錄監聽器),您必須使用 `@Payload` 引數註解並設定 required = false 。 |
當使用 Spring 的 org.springframework.messaging.Message 作為監聽器的載荷型別時,其泛型型別資訊必須足夠寬泛,以接受 Message<PulsarNull> (例如 Message 、Message<?> 或 Message<Object> )。這是因為 Spring Message 不允許其載荷為 null 值,而是使用 PulsarNull 佔位符。 |
如果它是用於壓縮日誌的墓碑訊息,您通常還需要獲取 key,以便您的應用程式可以確定哪個 key 被“`刪除`”了。以下示例展示了這種配置
@ReactivePulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
Mono<Void> myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
使用流式訊息監聽器(Flux )時,頭部支援有限,因此在日誌壓縮場景中不太有用。 |