暫停和恢復監聽器容器
版本 2.1.3 為偵聽器容器添加了 pause() 和 resume() 方法。此前,您可以在 ConsumerAwareMessageListener 中暫停消費者,並透過偵聽 ListenerContainerIdleEvent 來恢復它,該事件提供了對 Consumer 物件的訪問。雖然您可以使用事件偵聽器在空閒容器中暫停消費者,但在某些情況下,這並非執行緒安全,因為無法保證事件偵聽器在消費者執行緒上呼叫。為了安全地暫停和恢復消費者,您應該使用偵聽器容器上的 pause 和 resume 方法。pause() 在下一次 poll() 之前生效;resume() 在當前 poll() 返回之後生效。當容器暫停時,它會繼續 poll() 消費者,避免在使用組管理時重新平衡,但它不會檢索任何記錄。有關更多資訊,請參閱 Kafka 文件。
從版本 2.1.5 開始,您可以呼叫 isPauseRequested() 來檢視是否已呼叫 pause()。但是,消費者可能尚未實際暫停。如果所有 Consumer 例項都已實際暫停,則 isConsumerPaused() 返回 true。
此外(同樣從 2.1.5 開始),ConsumerPausedEvent 和 ConsumerResumedEvent 例項會發布,其中容器作為 source 屬性,涉及的 TopicPartition 例項作為 partitions 屬性。
從版本 2.9 開始,一個新的容器屬性 pauseImmediate,當設定為 true 時,會導致暫停在當前記錄處理後立即生效。預設情況下,暫停會在處理完上次 poll 的所有記錄後生效。請參閱 pauseImmediate。
以下簡單的 Spring Boot 應用程式透過使用容器登錄檔獲取 @KafkaListener 方法的容器引用,並暫停或恢復其消費者以及接收相應的事件來演示
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
以下列表顯示了前一個示例的結果
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2