按順序啟動 @KafkaListeners
一種常見的用例是在一個監聽器消費完主題中的所有記錄後啟動另一個監聽器。例如,您可能希望在處理來自其他主題的記錄之前,將一個或多個壓縮主題的內容載入到記憶體中。從版本2.7.3開始,引入了一個新的元件ContainerGroupSequencer。它使用@KafkaListener的containerGroup屬性將容器分組在一起,並在當前組中的所有容器都空閒後啟動下一個組中的容器。
透過一個例子可以更好地說明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
這裡,我們在兩個組g1和g2中有4個監聽器。
在應用程式上下文初始化期間,序列器將所提供組中所有容器的autoStartup屬性設定為false。它還為任何容器(如果尚未設定)設定idleEventInterval為提供的值(在本例中為5000ms)。然後,當序列器由應用程式上下文啟動時,將啟動第一個組中的容器。當接收到ListenerContainerIdleEvent時,每個容器中的每個單獨的子容器都會停止。當ConcurrentMessageListenerContainer中的所有子容器都停止時,父容器停止。當一個組中的所有容器都停止後,將啟動下一個組中的容器。組的數量或組中容器的數量沒有限制。
預設情況下,最終組(上面是g2)中的容器在空閒時不會停止。要修改該行為,請在序列器上將stopLastGroupWhenIdle設定為true。
另外,之前每個組中的容器都被新增到型別為Collection<MessageListenerContainer>的bean中,bean名稱為containerGroup。這些集合現在已棄用,取而代之的是型別為ContainerGroup的bean,其bean名稱是組名,字尾為.group;在上面的示例中,將有兩個bean g1.group和g2.group。Collection bean將在未來的版本中移除。