`@KafkaListener` 生命週期管理
`@KafkaListener` 註解建立的監聽器容器不是應用上下文中的 bean。它們被註冊到一個型別為 `KafkaListenerEndpointRegistry` 的基礎設施 bean 中。這個 bean 由框架自動宣告,並管理容器的生命週期;它會自動啟動任何 `autoStartup` 設定為 `true` 的容器。所有容器工廠建立的所有容器必須位於相同的 `phase`。更多資訊請參見 監聽器容器自動啟動。您可以透過登錄檔以程式設計方式管理生命週期。啟動或停止登錄檔將啟動或停止所有已註冊的容器。或者,您可以使用容器的 `id` 屬性獲取對單個容器的引用。您可以在註解上設定 `autoStartup`,這將覆蓋容器工廠中配置的預設設定。您可以從應用上下文中獲取對該 bean 的引用(例如透過自動注入),以管理其註冊的容器。以下示例展示瞭如何實現這一點:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
登錄檔只維護它管理的容器的生命週期;宣告為 bean 的容器不受登錄檔管理,並且可以從應用上下文中獲取。透過呼叫登錄檔的 `getListenerContainers()` 方法可以獲取託管容器的集合。版本 2.2.5 增加了一個便利方法 `getAllListenerContainers()`,它返回所有容器的集合,包括登錄檔管理的容器和宣告為 bean 的容器。返回的集合將包含任何已初始化的原型 bean,但不會初始化任何延遲載入的 bean 宣告。
應用上下文重新整理後註冊的端點會立即啟動,無論其 `autoStartup` 屬性為何,這是為了遵守 `SmartLifecycle` 契約,其中 `autoStartup` 只在應用上下文初始化期間考慮。延遲註冊的一個例子是原型作用域中包含 `@KafkaListener` 的 bean,其例項在上下文初始化後建立。從版本 2.8.7 開始,您可以將登錄檔的 `alwaysStartAfterRefresh` 屬性設定為 `false`,然後容器的 `autoStartup` 屬性將決定容器是否啟動。 |
從 KafkaListenerEndpointRegistry 中檢索 MessageListenerContainer
`KafkaListenerEndpointRegistry` 提供了多種檢索 `MessageListenerContainer` 例項的方法,以適應不同的管理場景:
所有容器:對於涵蓋所有監聽器容器的操作,使用 `getListenerContainers()` 方法來獲取一個完整的集合。
Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();
按 ID 獲取特定容器:要管理單個容器,可以使用 `getListenerContainer(String id)` 方法透過其 ID 進行檢索。
MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");
動態容器過濾:版本 3.2 中引入了兩個過載的 `getListenerContainersMatching` 方法,可以實現更精細的容器選擇。一個方法接受一個 `Predicate<String>` 作為引數,用於基於 ID 的過濾;另一個方法接受一個 `BiPredicate<String, MessageListenerContainer>` 作為引數,用於更高階的條件過濾,這些條件可能包括容器屬性或狀態。
// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));
// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
registry.getListenerContainersMatching(myPattern::matches);
// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
registry.getListenerContainersMatching(myIdSet::contains);
// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
registry.getListenerContainersMatching(
(id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
);
利用這些方法可以有效地管理和查詢應用程式中的 `MessageListenerContainer` 例項。