重平衡監聽器
ContainerProperties
有一個名為 consumerRebalanceListener
的屬性,它接受 Kafka 客戶端 ConsumerRebalanceListener
介面的實現。如果未提供此屬性,容器會配置一個日誌監聽器,用於在 INFO
級別記錄重平衡事件。該框架還添加了一個子介面 ConsumerAwareRebalanceListener
。以下列表顯示了 ConsumerAwareRebalanceListener
介面的定義
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
請注意,當分割槽被撤銷時有兩個回撥。第一個會立即呼叫。第二個會在任何待處理的偏移量提交後呼叫。如果您希望在某些外部倉庫中維護偏移量,這會非常有用,如下例所示
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
從版本 2.4 開始,新增了一個方法 onPartitionsLost() (類似於 ConsumerRebalanceLister 中同名的方法)。ConsumerRebalanceLister 的預設實現僅呼叫 onPartitionsRevoked 。ConsumerAwareRebalanceListener 的預設實現不做任何事情。當為監聽器容器提供自定義監聽器(任一型別)時,重要的是您的實現不要在 onPartitionsLost 中呼叫 onPartitionsRevoked 。如果您實現了 ConsumerRebalanceListener ,您應該覆蓋預設方法。這是因為監聽器容器在呼叫您的實現上的方法後,將從其 onPartitionsLost 的實現中呼叫其自身的 onPartitionsRevoked 。如果您的實現委託給預設行為,那麼每次 Consumer 呼叫容器監聽器上的該方法時,onPartitionsRevoked 將被呼叫兩次。 |