過濾訊息

在某些場景下,例如再平衡(rebalancing),已經處理過的訊息可能會被重新投遞。框架無法知道這樣的訊息是否已經被處理。這是應用層面的功能。這被稱為冪等接收者模式,Spring Integration 提供了一個實現

Spring for Apache Kafka 專案也透過 FilteringMessageListenerAdapter 類提供了一些幫助,它可以包裝你的 MessageListener。這個類接受一個 RecordFilterStrategy 的實現,你在其中實現 filter 方法來標記訊息是否是重複的,是否應該被丟棄。它還有一個額外的屬性 ackDiscarded,表示介面卡是否應該確認(acknowledge)被丟棄的記錄。預設值是 false

當你使用 @KafkaListener 時,在容器工廠上設定 RecordFilterStrategy(以及可選的 ackDiscarded),以便監聽器被包裝在適當的過濾介面卡中。

此外,還提供了 FilteringBatchMessageListenerAdapter,用於你使用批次訊息監聽器的場景。

如果你的 @KafkaListener 接收的是 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>>FilteringBatchMessageListenerAdapter 將被忽略,因為 ConsumerRecords 是不可變的。

從 2.8.4 版本開始,你可以透過監聽器註解上的 filter 屬性覆蓋監聽器容器工廠的預設 RecordFilterStrategy

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}

從 3.3 版本開始,支援忽略因 RecordFilterStrategy 過濾而產生的空批次。在實現 RecordFilterStrategy 時,可以透過 ignoreEmptyBatch() 進行配置。預設設定是 false,表示即使所有 ConsumerRecord 都被過濾掉,KafkaListener 仍將被呼叫。

如果返回 true,當所有 ConsumerRecord 都被過濾掉時,KafkaListener 將不會被呼叫。然而,提交到 broker 的操作仍然會執行。

如果返回 false,當所有 ConsumerRecord 都被過濾掉時,KafkaListener 將會被呼叫

以下是一些示例。

public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return true;
    }
};

// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

在這種情況下,IgnoreEmptyBatchRecordFilterStrategy 總是返回空列表,並且 ignoreEmptyBatch() 的結果為 true。因此 KafkaListener#listen(…​) 將永遠不會被呼叫。

public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return false;
    }
};

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

然而,在這種情況下,IgnoreEmptyBatchRecordFilterStrategy 總是返回空列表,並且 ignoreEmptyBatch() 的結果為 false。因此 KafkaListener#listen(…​) 總是會被呼叫。