訊息監聽器容器配置

配置 SimpleMessageListenerContainer (SMLC) 和 DirectMessageListenerContainer (DMLC) 有相當多的選項,涉及到事務和質量保證,其中一些選項之間會相互作用。適用於 SMLC、DMLC 或 StreamListenerContainer (StLC) 的屬性(參見 使用 RabbitMQ Stream 外掛)在相應的列中用勾號表示。有關選擇哪種容器適用於您的應用程式的資訊,請參見 選擇容器

下表顯示了訊息監聽器容器的屬性名稱及其在使用名稱空間配置 <rabbit:listener-container/> 時的等效屬性名稱(括號中)。該元素的 type 屬性可以是 simple(預設)或 direct,分別指定 SMLCDMLC。名稱空間未公開某些屬性。這些屬性的屬性名顯示為 N/A

表 1. 訊息監聽器容器的配置選項
屬性 (Attribute) 描述 SMLC DMLC StLC

ackTimeout
(N/A)

當設定 messagesPerAck 時,此超時用於替代傳送確認。當有新訊息到達時,將未確認訊息的數量與 messagesPerAck 進行比較,並將自上次確認以來的時間與此值進行比較。如果任一條件為 true,則確認訊息。當沒有新訊息到達且存在未確認訊息時,此超時是近似的,因為條件僅在每個 monitorInterval 期間檢查一次。另請參閱此表中的 messagesPerAckmonitorInterval

tickmark

acknowledgeMode
(acknowledge)

  • NONE: 不傳送確認(與 channelTransacted=true 不相容)。RabbitMQ 將其稱為“autoack”,因為代理假設所有訊息在消費者未執行任何操作的情況下都被確認。

  • MANUAL: 監聽器必須透過呼叫 Channel.basicAck() 手動確認所有訊息。

  • AUTO: 容器自動確認訊息,除非 MessageListener 丟擲異常。請注意,acknowledgeModechannelTransacted 的補充——如果通道是事務性的,代理除了確認之外還需要提交通知。這是預設模式。另請參閱 batchSize

tickmark
tickmark

adviceChain
(advice-chain)

應用於監聽器執行的 AOP Advice 陣列。可用於應用額外的橫切關注點,例如在代理死亡時自動重試。請注意,只要代理仍然存活,CachingConnectionFactory 即可處理 AMQP 錯誤後的簡單重連。

tickmark
tickmark

afterReceivePostProcessors
(N/A)

在呼叫監聽器之前呼叫的 MessagePostProcessor 例項陣列。後處理器可以實現 PriorityOrderedOrdered。陣列按順序排序,未排序的成員最後呼叫。如果後處理器返回 null,則訊息將被丟棄(並在適當情況下被確認)。

tickmark
tickmark

alwaysRequeueWithTxManagerRollback
(N/A)

當配置了事務管理器時,設定為 true 表示在回滾時總是重新入隊訊息。

tickmark
tickmark

autoDeclare
(auto-declare)

當設定為 true(預設)時,如果容器在啟動期間檢測到其至少一個佇列丟失,則使用 RabbitAdmin 重新宣告所有 AMQP 物件(佇列、交換器、繫結)。這可能是因為佇列是 auto-delete 佇列或已過期佇列,但如果佇列因任何原因丟失,重新宣告仍會進行。要停用此行為,請將此屬性設定為 false。請注意,如果容器的所有佇列都丟失,則容器將無法啟動。

在版本 1.6 之前,如果上下文中存在多個 admin,容器將隨機選擇一個。如果沒有 admin,它將在內部建立一個。無論哪種情況,這都可能導致意外結果。從版本 1.6 開始,為了使 autoDeclare 工作,上下文中必須只有一個 RabbitAdmin,或者必須使用 rabbitAdmin 屬性在容器上配置特定例項的引用。
tickmark
tickmark

autoStartup
(auto-startup)

指示容器是否應隨 ApplicationContext 一起啟動的標誌(作為 SmartLifecycle 回撥的一部分,這發生在所有 bean 初始化之後)。預設為 true,但如果您的代理在啟動時可能不可用,您可以將其設定為 false,然後在您知道代理準備好時手動呼叫 start()

tickmark
tickmark
tickmark

batchSize
(transaction-size) (batch-size)

當與設定為 AUTOacknowledgeMode 一起使用時,容器會嘗試處理達到此數量的訊息,然後傳送確認(等待每條訊息的時間不超過接收超時設定)。這也是事務性通道提交的時候。如果 prefetchCount 小於 batchSize,它將增加到匹配 batchSize

tickmark

batchingStrategy
(N/A)

用於解批次訊息的策略。預設是 SimpleDebatchingStrategy。參見 批次傳送帶有批次處理的 @RabbitListener

tickmark
tickmark

channelTransacted
(channel-transacted)

布林標誌,指示所有訊息都應在事務中確認(手動或自動)。

tickmark
tickmark

concurrency
(N/A)

m-n 每個監聽器併發消費者的範圍(最小值、最大值)。如果只提供 n,則 n 是固定數量的消費者。參見 監聽器併發

tickmark

concurrentConsumers
(concurrency)

每個監聽器最初啟動的併發消費者數量。參見 監聽器併發。對於 StLC,併發性透過過載的 superStream 方法控制;參見 使用單個活躍消費者消費超級流

tickmark
tickmark

connectionFactory
(connection-factory)

ConnectionFactory 的引用。使用 XML 名稱空間配置時,預設引用的 bean 名稱是 rabbitConnectionFactory

tickmark
tickmark

consecutiveActiveTrigger
(min-consecutive-active)

消費者在考慮啟動新消費者時,連續接收到訊息且未發生接收超時的最小數量。也受 'batchSize' 影響。參見 監聽器併發。預設值:10。

tickmark

consecutiveIdleTrigger
(min-consecutive-idle)

消費者在考慮停止消費者之前必須經歷的最小接收超時次數。也受 'batchSize' 影響。參見 監聽器併發。預設值:10。

tickmark

consumerBatchEnabled
(batch-enabled)

如果 MessageListener 支援,將其設定為 true 可啟用對離散訊息的批處理,最多可達 batchSize;如果在 receiveTimeout 或收集批次訊息的時間超過 batchReceiveTimeout 之前沒有新訊息到達,則會發送部分批次。當此項為 false 時,僅支援由生產者建立的批次;參見 批次傳送

tickmark

consumerCustomizer
(N/A)

一個 ConsumerCustomizer bean,用於修改容器建立的流消費者。

tickmark

consumerStartTimeout
(N/A)

等待消費者執行緒啟動的時間(毫秒)。如果此時間過去,將寫入錯誤日誌。一個可能發生的情況是,配置的 taskExecutor 沒有足夠的執行緒來支援容器的 concurrentConsumers

參見 執行緒與非同步消費者。預設值:60000(一分鐘)。

tickmark

consumerTagStrategy
(consumer-tag-strategy)

設定 ConsumerTagStrategy 的實現,從而能夠為每個消費者建立(唯一)標籤。

tickmark
tickmark

consumersPerQueue
(consumers-per-queue)

為每個配置的佇列建立的消費者數量。參見 監聽器併發

tickmark

consumeDelay
(N/A)

當使用 RabbitMQ 分片外掛 並設定 concurrentConsumers > 1 時,可能存在一個競態條件,導致消費者無法均勻分佈到分片上。使用此屬性可在消費者啟動之間新增一個小的延遲,以避免此競態條件。您應透過實驗確定適合您環境的延遲值。

tickmark
tickmark

debatchingEnabled
(N/A)

當為 true 時,監聽器容器將解批次處理後的訊息,並呼叫監聽器處理批次中的每條訊息。從版本 2.2.7 開始,如果監聽器是 BatchMessageListenerChannelAwareBatchMessageListener生產者建立的批次 將作為 List<Message> 解批次。否則,批次中的訊息將逐個呈現。預設值為 true。參見 批次傳送帶有批次處理的 @RabbitListener

tickmark
tickmark

declarationRetries
(declaration-retries)

被動佇列宣告失敗時的重試次數。被動佇列聲明發生在消費者啟動時,或在消費多個佇列時,當並非所有佇列在初始化期間都可用時。如果在重試耗盡後,所有配置的佇列都無法被被動宣告(無論出於何種原因),容器的行為由 missingQueuesFatal 屬性控制(前面已描述)。預設值:三次重試(總共四次嘗試)。

tickmark

defaultRequeueRejected
(requeue-rejected)

確定因監聽器丟擲異常而被拒絕的訊息是否應重新入隊。預設值:true

tickmark
tickmark

errorHandler
(error-handler)

ErrorHandler 策略的引用,用於處理 MessageListener 執行期間可能發生的任何未捕獲異常。預設值:ConditionalRejectingErrorHandler

tickmark
tickmark

exclusive
(exclusive)

確定此容器中的單個消費者是否對佇列具有獨佔訪問權。當此項為 true 時,容器的併發數必須為 1。如果另一個消費者具有獨佔訪問權,容器會根據 recovery-intervalrecovery-back-off 嘗試恢復消費者。使用名稱空間時,此屬性出現在 <rabbit:listener/> 元素及其佇列名稱旁邊。預設值:false

tickmark
tickmark

exclusiveConsumerExceptionLogger
(N/A)

當獨佔消費者無法獲得佇列訪問權時使用的異常記錄器。預設情況下,此日誌級別為 WARN

tickmark
tickmark

failedDeclarationRetryInterval
(failed-declaration -retry-interval)

被動佇列宣告重試嘗試之間的間隔。被動佇列聲明發生在消費者啟動時,或在消費多個佇列時,當並非所有佇列在初始化期間都可用時。預設值:5000(五秒)。

tickmark
tickmark

forceCloseChannel
(N/A)

如果消費者在 shutdownTimeout 內沒有響應關閉請求,並且此屬性為 true,則通道將被關閉,導致任何未確認的訊息被重新入隊。自 2.0 版本以來,預設值為 true。您可以將其設定為 false 以恢復到之前的行為。

tickmark
tickmark

forceStop
(N/A)

設定為 true 以在當前記錄處理完成後停止(當容器停止時);這將導致所有預取的訊息被重新入隊。預設情況下,容器會取消消費者並在停止之前處理所有預取的訊息。自版本 2.4.14、3.0.6 以來,預設值為 false

tickmark
tickmark

globalQos
(global-qos)

當為 true 時,prefetchCount 將全域性應用於通道,而不是應用於通道上的每個消費者。更多資訊請參見 basicQos.global

tickmark
tickmark

(group)

此屬性僅在使用名稱空間時可用。指定後,將註冊一個名為該值的 Collection<MessageListenerContainer> 型別的 bean,並且每個 <listener/> 元素的容器將被新增到集合中。例如,這允許透過遍歷集合來啟動和停止容器組。如果多個 <listener-container/> 元素具有相同的 group 值,則集合中的容器將構成所有指定容器的聚合。

tickmark
tickmark

idleEventInterval
(idle-event-interval)

參見 檢測空閒非同步消費者

tickmark
tickmark

javaLangErrorHandler
(N/A)

一個 AbstractMessageListenerContainer.JavaLangErrorHandler 實現,當容器執行緒捕獲到 Error 時被呼叫。預設實現呼叫 System.exit(99);要恢復到之前的行為(不做任何事情),請新增一個無操作處理程式。

tickmark
tickmark

maxConcurrentConsumers
(max-concurrency)

按需啟動的最大併發消費者數量。必須大於或等於 'concurrentConsumers'。參見 監聽器併發

tickmark

messagesPerAck
(N/A)

每次確認之間接收的訊息數量。使用此項可減少傳送到代理的確認數量(代價是增加訊息被重新投遞的可能性)。通常,僅應在高流量的監聽器容器上設定此屬性。如果設定了此項並且訊息被拒絕(丟擲異常),則待處理的確認將被確認,並且失敗的訊息將被拒絕。不允許與事務性通道一起使用。如果 prefetchCount 小於 messagesPerAck,它將被增加到匹配 messagesPerAck。預設值:每條訊息都確認。另請參閱此表中的 ackTimeout

tickmark

mismatchedQueuesFatal
(mismatched-queues-fatal)

當容器啟動時,如果此屬性為 true(預設值:false),容器會檢查上下文中宣告的所有佇列是否與代理上已存在的佇列相容。如果存在不匹配的屬性(例如 auto-delete)或引數(例如 x-message-ttl),則容器(和應用程式上下文)將因致命異常而無法啟動。

如果在恢復期間檢測到問題(例如,連線丟失後),容器將停止。

應用程式上下文中必須有一個唯一的 RabbitAdmin(或一個使用 rabbitAdmin 屬性在容器上專門配置的例項)。否則,此屬性必須為 false

如果在初始啟動期間代理不可用,容器會啟動並在建立連線時檢查條件。
檢查針對上下文中所有佇列進行,而不僅僅是特定監聽器配置使用的佇列。如果您希望將檢查僅限於容器使用的佇列,則應為該容器配置一個單獨的 RabbitAdmin,並使用 rabbitAdmin 屬性提供其引用。更多資訊請參見 條件宣告
在啟動帶有 @Lazy 標記的 bean 中包含 @RabbitListener 的容器時,會停用不匹配佇列引數的檢測。這是為了避免可能發生的死鎖,這會延遲此類容器的啟動長達 60 秒。使用延遲監聽器 bean 的應用程式應在獲取延遲 bean 的引用之前檢查佇列引數。
tickmark
tickmark

missingQueuesFatal
(missing-queues-fatal)

當設定為 true(預設值)時,如果代理上配置的佇列均不可用,則視為致命錯誤。這將導致應用程式上下文在啟動期間初始化失敗。此外,當佇列在容器執行時被刪除時,預設情況下,消費者會嘗試連線佇列三次(間隔五秒),如果這些嘗試失敗,則停止容器。

這在之前的版本中無法配置。

當設定為 false 時,在嘗試三次後,容器將進入恢復模式,就像其他問題(如代理宕機)一樣。容器會根據 recoveryInterval 屬性嘗試恢復。在每次恢復嘗試期間,每個消費者將再次嘗試被動宣告佇列四次,間隔五秒。這個過程會無限期持續。

您還可以使用屬性 bean 為所有容器全域性設定該屬性,如下所示:

<util:properties
        id="spring.amqp.global.properties">
    <prop key="mlc.missing.queues.fatal">
        false
    </prop>
</util:properties>

此全域性屬性不會應用於任何已設定顯式 missingQueuesFatal 屬性的容器。

預設的重試屬性(三次重試,間隔五秒)可以透過設定以下屬性進行覆蓋。

在啟動帶有 @Lazy 標記的 bean 中包含 @RabbitListener 的容器時,會停用丟失佇列的檢測。這是為了避免可能發生的死鎖,這會延遲此類容器的啟動長達 60 秒。使用延遲監聽器 bean 的應用程式應在獲取延遲 bean 的引用之前檢查佇列。
tickmark
tickmark

monitorInterval
(monitor-interval)

對於 DMLC,會在此間隔內排程一個任務來監控消費者的狀態並恢復任何失敗的消費者。

tickmark

noLocal
(N/A)

設定為 true 可停用伺服器向在同一通道連線上釋出的消費者傳送訊息。

tickmark
tickmark

phase
(phase)

autoStartuptrue 時,此容器應在其中啟動和停止的生命週期階段。值越低,容器啟動得越早,停止得越晚。預設值為 Integer.MAX_VALUE,表示容器儘可能晚地啟動,儘可能早地停止。

tickmark
tickmark

possibleAuthenticationFailureFatal
(possible-authentication-failure-fatal)

當設定為 true(SMLC 的預設值)時,如果在連線期間丟擲 PossibleAuthenticationFailureException,則視為致命錯誤。這將導致應用程式上下文在啟動期間初始化失敗(如果容器配置為自動啟動)。

自 *版本 2.0* 開始。

DirectMessageListenerContainer

當設定為 false(預設值)時,每個消費者將根據 monitorInterval 嘗試重新連線。

SimpleMessageListenerContainer

當設定為 false 時,在嘗試 3 次後,容器將進入恢復模式,就像其他問題(如代理宕機)一樣。容器將根據 recoveryInterval 屬性嘗試恢復。在每次恢復嘗試期間,每個消費者將再次嘗試啟動 4 次。這個過程將無限期持續。

您還可以使用屬性 bean 為所有容器全域性設定該屬性,如下所示:

<util:properties
    id="spring.amqp.global.properties">
  <prop
    key="mlc.possible.authentication.failure.fatal">
     false
  </prop>
</util:properties>

此全域性屬性不會應用於任何已設定顯式 missingQueuesFatal 屬性的容器。

預設的重試屬性(3 次重試,間隔 5 秒)可以使用此後的屬性進行覆蓋。

tickmark
tickmark

prefetchCount
(prefetch)

每個消費者可未確認的最大訊息數量。此值越高,訊息投遞速度越快,但非順序處理的風險也越高。如果 acknowledgeModeNONE 則忽略。如果需要,它會被增加到匹配 batchSizemessagePerAck。自 2.0 版本以來預設為 250。您可以將其設定為 1 以恢復到之前的行為。

在某些情況下,預取值應較低——例如,處理大型訊息時,特別是如果處理速度較慢(訊息可能在客戶端程序中佔用大量記憶體),以及如果需要嚴格的訊息順序(在這種情況下應將預取值設定回 1)。此外,在低流量訊息傳遞和多個消費者(包括單個監聽器容器例項內的併發)的情況下,您可能希望減少預取值以實現訊息在消費者之間更均勻的分佈。

另請參閱 globalQos

tickmark
tickmark

rabbitAdmin
(admin)

當監聽器容器監聽至少一個自動刪除佇列,並且在啟動期間發現佇列丟失時,容器使用 RabbitAdmin 來宣告佇列以及任何相關的繫結和交換器。如果此類元素配置為使用條件宣告(參見 條件宣告),則容器必須使用配置了宣告這些元素的 admin。在此處指定該 admin。僅在使用自動刪除佇列和條件宣告時才需要。如果您不希望自動刪除佇列在容器啟動之前被宣告,請將 admin 上的 auto-startup 設定為 false。預設值為一個 RabbitAdmin,它宣告所有非條件元素。

tickmark
tickmark

receiveTimeout
(receive-timeout)

等待每條訊息的最大時間。如果 acknowledgeMode=NONE,這幾乎沒有影響——容器會迴圈請求下一條訊息。對於事務性 ChannelbatchSize > 1 的情況,影響最大,因為這可能導致已消費的訊息直到超時過期才被確認。當 consumerBatchEnabled 為 true 時,如果在批次完成之前發生此超時,將傳送部分批次。

tickmark

batchReceiveTimeout
(batch-receive-timeout)

收集批次訊息的超時時間(毫秒)。它限制了等待填充 batchSize 的時間。當 batchSize > 1 且收集批次訊息的時間大於 batchReceiveTime 時,將傳送批次。預設值為 0(無超時)。

tickmark

recoveryBackOff
(recovery-back-off)

指定當消費者因非致命原因啟動失敗時,嘗試啟動消費者之間的間隔使用的 BackOff 策略。預設值為 FixedBackOff,每五秒進行無限次重試。與 recoveryInterval 互斥。

tickmark
tickmark

recoveryInterval
(recovery-interval)

確定消費者因非致命原因啟動失敗時,嘗試啟動的間隔時間(毫秒)。預設值:5000。與 recoveryBackOff 互斥。

tickmark
tickmark

retryDeclarationInterval
(缺少佇列重試間隔)

如果在消費者初始化期間配置的佇列子集可用,則消費者將開始從這些佇列消費。消費者會嘗試使用此間隔被動地宣告缺少的佇列。當此間隔過去後,將再次使用 'declarationRetries' 和 'failedDeclarationRetryInterval'。如果仍有缺少的佇列,消費者將再次等待此間隔,然後再次嘗試。此過程會無限期地持續下去,直到所有佇列都可用。預設值:60000(一分鐘)。

tickmark

shutdownTimeout
(N/A)

當容器關閉時(例如,如果其封閉的 ApplicationContext 被關閉),它會等待正在處理的訊息完成,直到達到此限制。預設值為五秒。

tickmark
tickmark

startConsumerMinInterval
(最小啟動間隔)

按需啟動每個新消費者之前必須經過的時間(毫秒)。請參閱 監聽器併發。預設值:10000(10 秒)。

tickmark

statefulRetryFatal
WithNullMessageId (不適用)

當使用有狀態重試建議時,如果接收到缺少 messageId 屬性的訊息,預設情況下這被認為是致命的,會導致消費者停止。將其設定為 false 以丟棄(或路由到死信佇列)此類訊息。

tickmark
tickmark

stopConsumerMinInterval
(最小停止間隔)

檢測到空閒消費者時,自上次停止消費者以來,再次停止消費者之前必須經過的時間(毫秒)。請參閱 監聽器併發。預設值:60000(一分鐘)。

tickmark

streamConverter
(N/A)

用於將原生 Stream 訊息轉換為 Spring AMQP 訊息的 StreamMessageConverter

tickmark

taskExecutor
(任務執行器)

用於執行監聽器呼叫器的 Spring TaskExecutor(或標準 JDK 1.5+ Executor)引用。預設是一個 SimpleAsyncTaskExecutor,使用內部管理的執行緒。

tickmark
tickmark

taskScheduler
(任務排程器)

使用 DMLC 時,用於在 'monitorInterval' 執行監控任務的排程器。

tickmark

transactionManager
(事務管理器)

用於監聽器操作的外部事務管理器。也與 channelTransacted 互補 — 如果 Channel 帶有事務,其事務將與外部事務同步。

tickmark
tickmark