@KafkaListener 生命週期管理

@KafkaListener 註解建立的監聽器容器不是應用程式上下文中的 bean。相反,它們註冊到一個 KafkaListenerEndpointRegistry 型別的基礎設施 bean 中。此 bean 由框架自動宣告並管理容器的生命週期;它將自動啟動任何將 autoStartup 設定為 true 的容器。所有容器工廠建立的所有容器必須處於相同的 phase。有關更多資訊,請參閱 監聽器容器自動啟動。您可以透過使用登錄檔以程式設計方式管理生命週期。啟動或停止登錄檔將啟動或停止所有註冊的容器。或者,您可以透過使用其 id 屬性獲取對單個容器的引用。您可以在註解上設定 autoStartup,它將覆蓋容器工廠中配置的預設設定。您可以從應用程式上下文(例如自動裝配)獲取對 bean 的引用,以管理其註冊的容器。以下示例展示瞭如何做到這一點

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

登錄檔只維護它管理的容器的生命週期;宣告為 bean 的容器不受登錄檔管理,可以從應用程式上下文獲取。可以透過呼叫登錄檔的 getListenerContainers() 方法獲取託管容器的集合。版本 2.2.5 添加了一個便捷方法 getAllListenerContainers(),它返回所有容器的集合,包括由登錄檔管理的容器和宣告為 bean 的容器。返回的集合將包含任何已初始化的原型 bean,但不會初始化任何延遲 bean 宣告。

應用程式上下文重新整理後註冊的端點將立即啟動,無論其 autoStartup 屬性如何,以遵守 SmartLifecycle 契約,其中 autoStartup 僅在應用程式上下文初始化期間考慮。後期註冊的一個例子是具有 @KafkaListener 的原型範圍的 bean,其中在上下文初始化後建立例項。從版本 2.8.7 開始,您可以將登錄檔的 alwaysStartAfterRefresh 屬性設定為 false,然後容器的 autoStartup 屬性將定義容器是否啟動。

從 KafkaListenerEndpointRegistry 中檢索 MessageListenerContainers

KafkaListenerEndpointRegistry 提供了用於檢索 MessageListenerContainer 例項的方法,以適應各種管理場景

所有容器:對於涵蓋所有監聽器容器的操作,使用 getListenerContainers() 檢索全面的集合。

Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();

按 ID 指定容器:要管理單個容器,getListenerContainer(String id) 允許透過其 ID 進行檢索。

MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");

動態容器篩選:在版本 3.2 中引入的兩個過載 getListenerContainersMatching 方法,允許對容器進行精細選擇。一個方法將 Predicate<String> 作為引數用於基於 ID 的篩選,而另一個方法將 BiPredicate<String, MessageListenerContainer> 作為引數用於更高級別的篩選,其中可能包括容器屬性或狀態。

// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
    registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));

// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
    registry.getListenerContainersMatching(myPattern::matches);

// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
    registry.getListenerContainersMatching(myIdSet::contains);

// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
    registry.getListenerContainersMatching(
        (id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
    );

利用這些方法可以高效地管理和查詢應用程式中的 MessageListenerContainer 例項。

© . This site is unofficial and not affiliated with VMware.