暫停和恢復監聽器容器

版本 2.1.3 為監聽器容器添加了 pause()resume() 方法。以前,您可以在 ConsumerAwareMessageListener 中暫停消費者,並透過監聽 ListenerContainerIdleEvent 來恢復它,ListenerContainerIdleEvent 提供了對 Consumer 物件的訪問。雖然您可以使用事件監聽器在空閒容器中暫停消費者,但在某些情況下,這並非執行緒安全,因為無法保證事件監聽器在消費者執行緒上呼叫。要安全地暫停和恢復消費者,您應該使用監聽器容器上的 pauseresume 方法。pause() 在下一次 poll() 之前生效;resume() 在當前 poll() 返回之後立即生效。當容器暫停時,它會繼續 poll() 消費者,如果在進行組管理,則避免了重平衡,但它不會檢索任何記錄。有關更多資訊,請參閱 Kafka 文件。

從版本 2.1.5 開始,您可以呼叫 isPauseRequested() 來檢視是否已呼叫 pause()。但是,消費者可能實際上尚未暫停。如果所有 Consumer 例項都已實際暫停,isConsumerPaused() 返回 true。

此外(同樣從 2.1.5 開始),會發布 ConsumerPausedEventConsumerResumedEvent 例項,其中容器作為 source 屬性,涉及的 TopicPartition 例項在 partitions 屬性中。

從版本 2.9 開始,一個新的容器屬性 pauseImmediate,當設定為 true 時,會導致暫停在處理當前記錄後立即生效。預設情況下,暫停在前一次 poll 的所有記錄都處理完畢後生效。請參閱 pauseImmediate

以下簡單的 Spring Boot 應用程式演示瞭如何使用容器登錄檔獲取 `@KafkaListener` 方法的容器引用,並暫停或恢復其消費者以及接收相應的事件。

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下列表顯示了前面示例的結果。

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2