監聽器併發

SimpleMessageListenerContainer

預設情況下,監聽器容器啟動一個消費者來接收佇列中的訊息。

在檢視前一節的表格時,您可以看到許多控制併發的屬性。最簡單的是 concurrentConsumers,它會建立(固定)數量的消費者來併發處理訊息。

在 1.3.0 版本之前,這是唯一可用的設定,必須先停止容器再重新啟動才能更改設定。

從 1.3.0 版本開始,您現在可以動態調整 concurrentConsumers 屬性。如果在容器執行時更改此屬性,將根據需要新增或移除消費者以適應新設定。

此外,新增了一個名為 maxConcurrentConsumers 的屬性,容器會根據工作負載動態調整併發。這與另外四個屬性配合使用:consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval。在預設設定下,增加消費者的演算法如下:

如果尚未達到 maxConcurrentConsumers,並且一個現有消費者連續十個週期處於活動狀態,並且自上次啟動消費者以來至少過去了 10 秒,則會啟動一個新的消費者。如果在 batchSize * receiveTimeout 毫秒內收到至少一條訊息,則認為消費者處於活動狀態。

在預設設定下,減少消費者的演算法如下:

如果執行的消費者數量多於 concurrentConsumers,並且一個消費者檢測到連續十次超時(空閒),並且自上次停止消費者以來至少過去了 60 秒,則會停止一個消費者。超時取決於 receiveTimeoutbatchSize 屬性。如果在 batchSize * receiveTimeout 毫秒內未收到任何訊息,則認為消費者處於空閒狀態。因此,在預設超時時間(一秒)和 batchSize 為四的情況下,40 秒空閒時間後(四次超時對應一次空閒檢測)會考慮停止一個消費者。

實際上,只有當整個容器空閒一段時間後,消費者才能被停止。這是因為 Broker 會將其工作分配給所有活動的消費者。

每個消費者使用一個獨立的通道,無論配置了多少個佇列。

從 2.0 版本開始,concurrentConsumersmaxConcurrentConsumers 屬性可以透過 concurrency 屬性設定,例如 2-4

使用 DirectMessageListenerContainer

對於此容器,併發是基於配置的佇列和 consumersPerQueue 的。每個佇列的每個消費者使用一個單獨的通道,並且併發由 Rabbit 客戶端庫控制。在撰寫本文時,預設情況下,它使用一個執行緒池,其大小為 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2

您可以配置一個 taskExecutor 來提供所需的最大併發。