空載荷和“墓碑”記錄的日誌壓縮

使用日誌壓縮時,您可以傳送和接收帶有 null 載荷的訊息來標識鍵的刪除。您也可能由於其他原因接收到 null 值,例如反序列化器在無法反序列化值時可能返回 null

生產空載荷

您可以透過將 null 訊息引數值傳遞給 ReactivePulsarTemplate 的其中一個 send 方法來發送 null 值,例如

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();
傳送 null 值時,必須指定 Schema 型別,因為系統無法從 null 載荷確定訊息的型別。

消費空載荷

對於 `@ReactivePulsarListener`,`null` 載荷根據其訊息引數的型別傳遞到監聽器方法中,具體如下

引數型別 傳入的值

基本型別

null

使用者定義型別

null

org.apache.pulsar.client.api.Message<T>

一個非空的 Pulsar 訊息,其 getValue() 返回 null

org.springframework.messaging.Message<T>

一個非空的 Spring 訊息,其 getPayload() 返回 PulsarNull

Flux<org.apache.pulsar.client.api.Message<T>>

一個非空的 Flux,其條目是非空的 Pulsar 訊息,這些訊息的 getValue() 返回 null

Flux<org.springframework.messaging.Message<T>>

一個非空的 Flux,其條目是非空的 Spring 訊息,這些訊息的 getPayload() 返回 PulsarNull

當傳入的值為 null 時(即使用基本型別或使用者定義型別的單記錄監聽器),您必須使用 `@Payload` 引數註解並設定 required = false
當使用 Spring 的 org.springframework.messaging.Message 作為監聽器的載荷型別時,其泛型型別資訊必須足夠寬泛,以接受 Message<PulsarNull>(例如 MessageMessage<?>Message<Object>)。這是因為 Spring Message 不允許其載荷為 null 值,而是使用 PulsarNull 佔位符。

如果它是用於壓縮日誌的墓碑訊息,您通常還需要獲取 key,以便您的應用程式可以確定哪個 key 被“`刪除`”了。以下示例展示了這種配置

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}
使用流式訊息監聽器(Flux)時,頭部支援有限,因此在日誌壓縮場景中不太有用。