過濾訊息
在某些場景下,例如再平衡(rebalancing),已經處理過的訊息可能會被重新投遞。框架無法知道這樣的訊息是否已經被處理。這是應用層面的功能。這被稱為冪等接收者模式,Spring Integration 提供了一個實現。
Spring for Apache Kafka 專案也透過 FilteringMessageListenerAdapter
類提供了一些幫助,它可以包裝你的 MessageListener
。這個類接受一個 RecordFilterStrategy
的實現,你在其中實現 filter
方法來標記訊息是否是重複的,是否應該被丟棄。它還有一個額外的屬性 ackDiscarded
,表示介面卡是否應該確認(acknowledge)被丟棄的記錄。預設值是 false
。
當你使用 @KafkaListener
時,在容器工廠上設定 RecordFilterStrategy
(以及可選的 ackDiscarded
),以便監聽器被包裝在適當的過濾介面卡中。
此外,還提供了 FilteringBatchMessageListenerAdapter
,用於你使用批次訊息監聽器的場景。
如果你的 @KafkaListener 接收的是 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>> ,FilteringBatchMessageListenerAdapter 將被忽略,因為 ConsumerRecords 是不可變的。 |
從 2.8.4 版本開始,你可以透過監聽器註解上的 filter
屬性覆蓋監聽器容器工廠的預設 RecordFilterStrategy
。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
從 3.3 版本開始,支援忽略因 RecordFilterStrategy
過濾而產生的空批次。在實現 RecordFilterStrategy
時,可以透過 ignoreEmptyBatch()
進行配置。預設設定是 false
,表示即使所有 ConsumerRecord
都被過濾掉,KafkaListener
仍將被呼叫。
如果返回 true
,當所有 ConsumerRecord
都被過濾掉時,KafkaListener
將不會被呼叫。然而,提交到 broker 的操作仍然會執行。
如果返回 false
,當所有 ConsumerRecord
都被過濾掉時,KafkaListener
將會被呼叫。
以下是一些示例。
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
};
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
在這種情況下,IgnoreEmptyBatchRecordFilterStrategy
總是返回空列表,並且 ignoreEmptyBatch()
的結果為 true
。因此 KafkaListener#listen(…)
將永遠不會被呼叫。
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
};
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
然而,在這種情況下,IgnoreEmptyBatchRecordFilterStrategy
總是返回空列表,並且 ignoreEmptyBatch()
的結果為 false
。因此 KafkaListener#listen(…)
總是會被呼叫。