監聽器併發

SimpleMessageListenerContainer

預設情況下,偵聽器容器啟動一個消費者,從佇列中接收訊息。

檢視上一節中的表格,您可以看到許多控制併發的屬性和特性。最簡單的是 concurrentConsumers,它建立(固定)數量的消費者來併發處理訊息。

在 1.3.0 版本之前,這是唯一可用的設定,並且必須停止並重新啟動容器才能更改此設定。

從 1.3.0 版本開始,您現在可以動態調整 concurrentConsumers 屬性。如果在容器執行時更改了此屬性,將根據需要新增或移除消費者以適應新設定。

此外,還添加了一個名為 maxConcurrentConsumers 的新屬性,容器會根據工作負載動態調整併發性。這與另外四個屬性結合使用:consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval。在預設設定下,增加消費者的演算法如下:

如果 maxConcurrentConsumers 尚未達到,並且現有消費者連續十個週期都處於活動狀態,並且距離上次啟動消費者至少已過去 10 秒,則會啟動一個新的消費者。如果消費者在 batchSize * receiveTimeout 毫秒內至少收到一條訊息,則認為該消費者處於活動狀態。

在預設設定下,減少消費者的演算法如下:

如果執行中的消費者數量多於 concurrentConsumers,並且某個消費者檢測到連續十次超時(空閒),並且距離上次停止消費者至少已過去 60 秒,則會停止一個消費者。超時時間取決於 receiveTimeoutbatchSize 屬性。如果消費者在 batchSize * receiveTimeout 毫秒內沒有收到任何訊息,則認為該消費者處於空閒狀態。因此,在預設超時(一秒)和 batchSize 為四的情況下,在空閒 40 秒後(四次超時對應一次空閒檢測)才會考慮停止消費者。

實際上,只有當整個容器空閒一段時間後,消費者才能被停止。這是因為代理會將工作分配給所有活動的消費者。

每個消費者使用一個通道,無論配置的佇列數量是多少。

從 2.0 版本開始,concurrentConsumersmaxConcurrentConsumers 屬性可以透過 concurrency 屬性進行設定——例如,2-4

使用 DirectMessageListenerContainer

使用此容器時,併發性基於配置的佇列和 consumersPerQueue。每個佇列的每個消費者都使用一個單獨的通道,併發性由 Rabbit 客戶端庫控制。預設情況下,在撰寫本文時,它使用一個執行緒池,其大小為 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2

您可以配置一個 taskExecutor 來提供所需的併發最大值。

© . This site is unofficial and not affiliated with VMware.