暫停和恢復監聽器容器

版本 2.1.3 為偵聽器容器添加了 pause()resume() 方法。此前,您可以在 ConsumerAwareMessageListener 中暫停消費者,並透過偵聽 ListenerContainerIdleEvent 來恢復它,該事件提供了對 Consumer 物件的訪問。雖然您可以使用事件偵聽器在空閒容器中暫停消費者,但在某些情況下,這並非執行緒安全,因為無法保證事件偵聽器在消費者執行緒上呼叫。為了安全地暫停和恢復消費者,您應該使用偵聽器容器上的 pauseresume 方法。pause() 在下一次 poll() 之前生效;resume() 在當前 poll() 返回之後生效。當容器暫停時,它會繼續 poll() 消費者,避免在使用組管理時重新平衡,但它不會檢索任何記錄。有關更多資訊,請參閱 Kafka 文件。

從版本 2.1.5 開始,您可以呼叫 isPauseRequested() 來檢視是否已呼叫 pause()。但是,消費者可能尚未實際暫停。如果所有 Consumer 例項都已實際暫停,則 isConsumerPaused() 返回 true。

此外(同樣從 2.1.5 開始),ConsumerPausedEventConsumerResumedEvent 例項會發布,其中容器作為 source 屬性,涉及的 TopicPartition 例項作為 partitions 屬性。

從版本 2.9 開始,一個新的容器屬性 pauseImmediate,當設定為 true 時,會導致暫停在當前記錄處理後立即生效。預設情況下,暫停會在處理完上次 poll 的所有記錄後生效。請參閱 pauseImmediate

以下簡單的 Spring Boot 應用程式透過使用容器登錄檔獲取 @KafkaListener 方法的容器引用,並暫停或恢復其消費者以及接收相應的事件來演示

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下列表顯示了前一個示例的結果

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2
© . This site is unofficial and not affiliated with VMware.