強制消費者重新平衡
Kafka 客戶端現在支援一個選項,用於觸發強制重新平衡。從 3.1.2 版本開始,Spring for Apache Kafka 提供了一個選項,可以透過訊息監聽器容器在 Kafka 消費者上呼叫此 API。當呼叫此 API 時,它只是通知 Kafka 消費者觸發強制重新平衡;實際的重新平衡只會在下一次 poll() 操作中發生。如果已經有一個重新平衡正在進行中,呼叫強制重新平衡是無效操作。呼叫者必須等待當前的重新平衡完成,然後才能呼叫另一次。有關詳細資訊,請參閱 enforceRebalance 的 javadoc。
以下程式碼片段展示了使用訊息監聽器容器強制重新平衡的要點。
@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
System.out.println("From KafkaListener: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
return args -> {
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
System.out.println("Enforcing a rebalance");
Thread.sleep(5_000);
listenerContainer.enforceRebalance();
Thread.sleep(5_000);
};
}
如上述程式碼所示,應用程式使用 KafkaListenerEndpointRegistry 獲取訊息監聽器容器的訪問許可權,然後在其上呼叫 enforceRebalance API。當在監聽器容器上呼叫 enforceRebalance 時,它將呼叫委託給底層的 Kafka 消費者。Kafka 消費者將在下一次 poll() 操作中觸發重新平衡。