重新平衡監聽器
ContainerProperties 有一個名為 consumerRebalanceListener 的屬性,它接受 Kafka 客戶端 ConsumerRebalanceListener 介面的實現。如果未提供此屬性,容器將配置一個日誌監聽器,以 INFO 級別記錄重新平衡事件。該框架還添加了一個子介面 ConsumerAwareRebalanceListener。以下列表顯示了 ConsumerAwareRebalanceListener 介面定義
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
請注意,當分割槽被撤銷時有兩個回撥。第一個會立即被呼叫。第二個在任何待處理的偏移量提交後被呼叫。如果您希望在某些外部儲存庫中維護偏移量,這會很有用,如以下示例所示
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
從 2.4 版本開始,添加了一個新方法 onPartitionsLost()(類似於 ConsumerRebalanceLister 中同名的方法)。ConsumerRebalanceLister 的預設實現只是呼叫 onPartitionsRevoked。ConsumerAwareRebalanceListener 的預設實現不執行任何操作。當為監聽器容器提供自定義監聽器(任一型別)時,重要的是您的實現不要從 onPartitionsLost 呼叫 onPartitionsRevoked。如果您實現 ConsumerRebalanceListener,則應覆蓋預設方法。這是因為監聽器容器將在呼叫您的實現上的方法後,從其 onPartitionsLost 實現中呼叫其自身的 onPartitionsRevoked。如果您的實現委託給預設行為,那麼每次 Consumer 在容器的監聽器上呼叫該方法時,onPartitionsRevoked 將被呼叫兩次。 |
Kafka 4.0 消費者重新平衡協議
Spring for Apache Kafka 4.0 支援 Apache Kafka 4.0 的新消費者重新平衡協議 (KIP-848),該協議透過伺服器驅動的增量分割槽分配提高了效能。這減少了消費者組的重新平衡停機時間。
要啟用新協議,請配置 group.protocol 屬性
spring.kafka.consumer.properties.group.protocol=consumer
請記住,上述屬性是 Spring Boot 屬性。如果您不使用 Spring Boot,您可能需要手動設定它,如下所示。
或者,透過程式設計方式設定它
Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);
新協議與 ConsumerAwareRebalanceListener 無縫協作。由於增量重新平衡,onPartitionsAssigned 可能會被多次呼叫,並帶有較小的分割槽集,這與傳統協議中典型的單次回撥不同。
新協議使用伺服器端分割槽分配,忽略透過 spring.kafka.consumer.partition-assignment-strategy 設定的客戶端自定義分配器。如果檢測到自定義分配器,將記錄警告。要使用自定義分配器,請設定 group.protocol=classic(如果您未指定 group.protocol 的值,則這是預設值)。