重平衡監聽器

ContainerProperties 有一個名為 consumerRebalanceListener 的屬性,它接受 Kafka 客戶端 ConsumerRebalanceListener 介面的實現。如果未提供此屬性,容器會配置一個日誌監聽器,用於在 INFO 級別記錄重平衡事件。該框架還添加了一個子介面 ConsumerAwareRebalanceListener。以下列表顯示了 ConsumerAwareRebalanceListener 介面的定義

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

請注意,當分割槽被撤銷時有兩個回撥。第一個會立即呼叫。第二個會在任何待處理的偏移量提交後呼叫。如果您希望在某些外部倉庫中維護偏移量,這會非常有用,如下例所示

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
從版本 2.4 開始,新增了一個方法 onPartitionsLost()(類似於 ConsumerRebalanceLister 中同名的方法)。ConsumerRebalanceLister 的預設實現僅呼叫 onPartitionsRevokedConsumerAwareRebalanceListener 的預設實現不做任何事情。當為監聽器容器提供自定義監聽器(任一型別)時,重要的是您的實現不要在 onPartitionsLost 中呼叫 onPartitionsRevoked。如果您實現了 ConsumerRebalanceListener,您應該覆蓋預設方法。這是因為監聽器容器在呼叫您的實現上的方法後,將從其 onPartitionsLost 的實現中呼叫其自身的 onPartitionsRevoked。如果您的實現委託給預設行為,那麼每次 Consumer 呼叫容器監聽器上的該方法時,onPartitionsRevoked 將被呼叫兩次。