過濾訊息
Spring for Apache Kafka 專案還透過 FilteringMessageListenerAdapter 類提供了一些幫助,該類可以包裝您的 MessageListener。該類需要一個 RecordFilterStrategy 的實現,您在該實現中實現 filter 方法以指示訊息是重複的並且應該被丟棄。它還有一個附加屬性 ackDiscarded,指示介面卡是否應該確認被丟棄的記錄。預設情況下,它為 false。
當您使用 @KafkaListener 時,在容器工廠上設定 RecordFilterStrategy(並可選地設定 ackDiscarded),以便監聽器被包裝在適當的過濾介面卡中。
此外,還提供了 FilteringBatchMessageListenerAdapter,用於當您使用批次訊息監聽器時。
如果您的 @KafkaListener 接收 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>>,則 FilteringBatchMessageListenerAdapter 將被忽略,因為 ConsumerRecords 是不可變的。 |
從版本 2.8.4 開始,您可以透過使用監聽器註解上的 filter 屬性來覆蓋監聽器容器工廠的預設 RecordFilterStrategy。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
從版本 3.3 開始,支援忽略由 RecordFilterStrategy 過濾產生的空批次。在實現 RecordFilterStrategy 時,可以透過 ignoreEmptyBatch() 進行配置。預設設定為 false,表示即使所有 ConsumerRecord 都被過濾掉,KafkaListener 仍將被呼叫。
如果返回 true,當所有 ConsumerRecord 都被過濾掉時,KafkaListener 將不會被呼叫。但是,對 broker 的提交仍將執行。
如果返回 false,當所有 ConsumerRecord 都被過濾掉時,KafkaListener 仍將被呼叫。
以下是一些示例。
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
}
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
在這種情況下,IgnoreEmptyBatchRecordFilterStrategy 始終返回空列表,並且 ignoreEmptyBatch() 的結果返回 true。因此,KafkaListener#listen(…) 將永遠不會被呼叫。
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
}
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
然而,在這種情況下,IgnoreEmptyBatchRecordFilterStrategy 始終返回空列表,並且 ignoreEmptyBatch() 的結果返回 false。因此,KafkaListener#listen(…) 總是會被呼叫。