強制消費者再平衡

Kafka 客戶端現在支援觸發強制再平衡的選項。從版本 3.1.2 開始,Spring for Apache Kafka 提供了透過訊息監聽器容器在 Kafka 消費者上呼叫此 API 的選項。呼叫此 API 時,只是通知 Kafka 消費者觸發強制再平衡;實際的再平衡只會在下一次 poll() 操作中發生。如果已經有再平衡正在進行中,呼叫強制再平衡是無效操作(NO-OP)。呼叫者必須等待當前再平衡完成才能再次呼叫。更多詳情請參閱 enforceRebalance 的 javadocs。

以下程式碼片段展示了使用訊息監聽器容器強制執行再平衡的要點。

@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
    System.out.println("From KafkaListener: " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
    return args -> {
        final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
        System.out.println("Enforcing a rebalance");
        Thread.sleep(5_000);
        listenerContainer.enforceRebalance();
        Thread.sleep(5_000);
    };
}

如上面的程式碼所示,應用使用 KafkaListenerEndpointRegistry 來獲取訊息監聽器容器的訪問許可權,然後呼叫其上的 enforceRebalance API。當在監聽器容器上呼叫 enforceRebalance 時,它會將呼叫委託給底層的 Kafka 消費者。Kafka 消費者將在下一次 poll() 操作中觸發再平衡。