按順序啟動 @KafkaListener

一個常見的用例是在一個監聽器消費完一個主題中的所有記錄後,再啟動另一個監聽器。例如,在處理來自其他主題的記錄之前,您可能希望將一個或多個壓縮主題的內容載入到記憶體中。從版本 2.7.3 開始,引入了一個新的元件 ContainerGroupSequencer。它使用 @KafkaListenercontainerGroup 屬性將容器分組,並在當前組中的所有容器都空閒時啟動下一個組中的容器。

透過一個例子最能說明這一點。

@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}

@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}

@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}

@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}

@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
    return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}

這裡,我們在 g1 和 g2 兩個組中有 4 個監聽器。

在應用上下文初始化期間,sequencer 將所提供組中所有容器的 autoStartup 屬性設定為 false。它還將任何容器(尚未設定該屬性的)的 idleEventInterval 設定為提供的值(在本例中為 5000ms)。然後,當 sequencer 由應用上下文啟動時,將啟動第一個組中的容器。當收到 ListenerContainerIdleEvent 時,每個容器中的每個單獨子容器都會停止。當 ConcurrentMessageListenerContainer 中的所有子容器都停止時,父容器會停止。當一個組中的所有容器都停止後,將啟動下一個組中的容器。組的數量或組中的容器數量沒有限制。

預設情況下,最後一個組(即上面的 g2)中的容器在空閒時不停止。要修改此行為,請在 sequencer 上將 stopLastGroupWhenIdle 設定為 true

順帶一提,之前每個組中的容器都被新增到一個型別為 Collection<MessageListenerContainer> 的 bean 中,其 bean 名稱就是 containerGroup。這些集合現在已被棄用,取而代之的是型別為 ContainerGroup 的 bean,其 bean 名稱是組名後加上 .group 字尾;在上面的例子中,將會有兩個 bean,g1.groupg2.groupCollection bean 將在未來的版本中移除。