暫停和恢復監聽器容器
版本 2.1.3 為監聽器容器添加了 pause()
和 resume()
方法。以前,您可以在 ConsumerAwareMessageListener
中暫停消費者,並透過監聽 ListenerContainerIdleEvent
來恢復它,ListenerContainerIdleEvent
提供了對 Consumer
物件的訪問。雖然您可以使用事件監聽器在空閒容器中暫停消費者,但在某些情況下,這並非執行緒安全,因為無法保證事件監聽器在消費者執行緒上呼叫。要安全地暫停和恢復消費者,您應該使用監聽器容器上的 pause
和 resume
方法。pause()
在下一次 poll()
之前生效;resume()
在當前 poll()
返回之後立即生效。當容器暫停時,它會繼續 poll()
消費者,如果在進行組管理,則避免了重平衡,但它不會檢索任何記錄。有關更多資訊,請參閱 Kafka 文件。
從版本 2.1.5 開始,您可以呼叫 isPauseRequested()
來檢視是否已呼叫 pause()
。但是,消費者可能實際上尚未暫停。如果所有 Consumer
例項都已實際暫停,isConsumerPaused()
返回 true。
此外(同樣從 2.1.5 開始),會發布 ConsumerPausedEvent
和 ConsumerResumedEvent
例項,其中容器作為 source
屬性,涉及的 TopicPartition
例項在 partitions
屬性中。
從版本 2.9 開始,一個新的容器屬性 pauseImmediate
,當設定為 true 時,會導致暫停在處理當前記錄後立即生效。預設情況下,暫停在前一次 poll
的所有記錄都處理完畢後生效。請參閱 pauseImmediate
。
以下簡單的 Spring Boot 應用程式演示瞭如何使用容器登錄檔獲取 `@KafkaListener` 方法的容器引用,並暫停或恢復其消費者以及接收相應的事件。
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
以下列表顯示了前面示例的結果。
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2