訊息監聽器容器配置

有相當多的選項可以配置 SimpleMessageListenerContainer (SMLC) 和 DirectMessageListenerContainer (DMLC),這些選項與事務和服務質量有關,其中一些選項相互作用。適用於 SMLC、DMLC 或 StreamListenerContainer (StLC) 的屬性(請參閱 使用 RabbitMQ 流外掛)透過在相應列中的複選標記表示。有關幫助您決定哪個容器適合您的應用程式的資訊,請參閱 選擇容器

下表顯示了容器屬性名稱及其對應的屬性名稱(括號中),當使用名稱空間配置 <rabbit:listener-container/> 時。該元素的 type 屬性可以是 simple(預設)或 direct,分別指定 SMLCDMLC。一些屬性未透過名稱空間公開。這些屬性用 N/A 表示。

表 1. 訊息監聽器容器的配置選項
屬性(Attribute) 描述 SMLC DMLC StLC

ackTimeout
(N/A)

當設定了 messagesPerAck 時,此超時用作傳送確認的替代方案。當有新訊息到達時,未確認訊息的數量會與 messagesPerAck 進行比較,自上次確認以來的時間會與此值進行比較。如果任一條件為 true,則訊息將被確認。當沒有新訊息到達且有未確認訊息時,此超時是近似的,因為條件僅在每個 monitorInterval 檢查一次。另請參閱此表中的 messagesPerAckmonitorInterval

tickmark

acknowledgeMode
(acknowledge)

  • NONE:不傳送確認(與 channelTransacted=true 不相容)。RabbitMQ 稱之為“自動確認”,因為代理在消費者沒有任何操作的情況下假定所有訊息都已確認。

  • MANUAL:監聽器必須透過呼叫 Channel.basicAck() 來確認所有訊息。

  • AUTO:容器自動確認訊息,除非 MessageListener 丟擲異常。請注意,acknowledgeModechannelTransacted 是互補的——如果通道是事務性的,代理除了確認之外還需要提交通知。這是預設模式。另請參閱 batchSize

tickmark
tickmark

adviceChain
(advice-chain)

一個 AOP 建議陣列,用於應用於監聽器執行。這可以用於應用額外的橫切關注點,例如在代理死亡時自動重試。請注意,在 AMQP 錯誤後簡單的重新連線由 CachingConnectionFactory 處理,只要代理仍然存活。

tickmark
tickmark

afterReceivePostProcessors
(N/A)

一個 MessagePostProcessor 例項陣列,在呼叫監聽器之前呼叫。後處理器可以實現 PriorityOrderedOrdered。陣列按順序排序,未排序的成員最後呼叫。如果後處理器返回 null,則訊息將被丟棄(並酌情確認)。

tickmark
tickmark

alwaysRequeueWithTxManagerRollback
(N/A)

當配置了事務管理器時,設定為 true 以在回滾時始終重新排隊訊息。

tickmark
tickmark

autoDeclare
(auto-declare)

當設定為 true(預設)時,如果容器在啟動期間檢測到至少有一個佇列丟失(可能是因為它是一個 auto-delete 佇列或一個已過期的佇列),它會使用 RabbitAdmin 重新宣告所有 AMQP 物件(佇列、交換器、繫結),但如果佇列因任何原因丟失,則重新宣告會繼續。要停用此行為,請將此屬性設定為 false。請注意,如果所有佇列都丟失,容器將無法啟動。

在 1.6 版本之前,如果上下文中存在多個管理員,容器會隨機選擇一個。如果沒有管理員,它會在內部建立一個。在這兩種情況下,都可能導致意外結果。從 1.6 版本開始,為了使 autoDeclare 工作,上下文中必須只有一個 RabbitAdmin,或者必須使用 rabbitAdmin 屬性在容器上配置對特定例項的引用。
tickmark
tickmark

autoStartup
(auto-startup)

標誌,指示容器是否應在 ApplicationContext 啟動時(作為 SmartLifecycle 回撥的一部分,發生在所有 bean 初始化之後)啟動。預設為 true,但如果您的代理在啟動時可能不可用,您可以將其設定為 false,並在您知道代理準備就緒後手動呼叫 start()

tickmark
tickmark
tickmark

batchSize
(transaction-size) (batch-size)

當與 acknowledgeMode 設定為 AUTO 結合使用時,容器會嘗試處理最多此數量的訊息,然後傳送確認(等待每條訊息直到接收超時設定)。這也是事務通道提交的時候。如果 prefetchCount 小於 batchSize,它將增加以匹配 batchSize

tickmark

batchingStrategy
(N/A)

訊息解除批處理時使用的策略。預設 SimpleDebatchingStrategy。參見 批處理帶有批處理的 @RabbitListener

tickmark
tickmark

channelTransacted
(channel-transacted)

布林標誌,指示所有訊息都應在事務中確認(手動或自動)。

tickmark
tickmark

concurrency
(N/A)

m-n 每個監聽器的併發消費者範圍(最小,最大)。如果只提供了 n,則 n 是固定數量的消費者。參見 監聽器併發

tickmark

concurrentConsumers
(concurrency)

為每個監聽器最初啟動的併發消費者數量。參見 監聽器併發。對於 StLC,併發性透過過載的 superStream 方法控制;參見 使用單活動消費者消費超級流

tickmark
tickmark

connectionFactory
(connection-factory)

ConnectionFactory 的引用。當使用 XML 名稱空間配置時,預設引用的 bean 名稱是 rabbitConnectionFactory

tickmark
tickmark

consecutiveActiveTrigger
(min-consecutive-active)

消費者連續接收訊息的最小數量,且未發生接收超時,此時考慮啟動新消費者。也受“batchSize”影響。參見 監聽器併發。預設值:10。

tickmark

consecutiveIdleTrigger
(min-consecutive-idle)

消費者在考慮停止消費者之前必須經歷的最小接收超時次數。也受“batchSize”影響。參見 監聽器併發。預設值:10。

tickmark

consumerBatchEnabled
(batch-enabled)

如果 MessageListener 支援,將其設定為 true 會啟用離散訊息的批處理,最多達到 batchSize;如果在 receiveTimeout 內沒有新訊息到達或收集批處理訊息的時間超過 batchReceiveTimeout,則會傳遞部分批處理。當此為 false 時,批處理僅支援由生產者建立的批處理;請參閱 批處理

tickmark

consumerCustomizer
(N/A)

一個 ConsumerCustomizer bean,用於修改容器建立的流消費者。

tickmark

consumerStartTimeout
(N/A)

等待消費者執行緒啟動的時間(毫秒)。如果此時間過去,則會寫入錯誤日誌。例如,當配置的 taskExecutor 沒有足夠的執行緒來支援容器的 concurrentConsumers 時,可能會發生這種情況。

請參閱 執行緒和非同步消費者。預設值:60000(一分鐘)。

tickmark

consumerTagStrategy
(consumer-tag-strategy)

設定 ConsumerTagStrategy 的實現,啟用為每個消費者建立唯一的標籤。

tickmark
tickmark

consumersPerQueue
(consumers-per-queue)

為每個配置的佇列建立的消費者數量。參見 監聽器併發

tickmark

consumeDelay
(N/A)

當使用 RabbitMQ 分片外掛concurrentConsumers > 1 時,存在一種競態條件,可能會阻止消費者在分片之間均勻分佈。使用此屬性在消費者啟動之間新增一個小的延遲以避免此競態條件。您應該嘗試不同的值以確定適合您的環境的延遲。

tickmark
tickmark

debatchingEnabled
(N/A)

當為 true 時,監聽器容器將解除批處理訊息,並用批處理中的每條訊息呼叫監聽器。從 2.2.7 版本開始,如果監聽器是 BatchMessageListenerChannelAwareBatchMessageListener,則 生產者建立的批處理 將被解除批處理為 List<Message>。否則,批處理中的訊息將逐個呈現。預設值為 true。請參閱 批處理帶有批處理的 @RabbitListener

tickmark
tickmark

declarationRetries
(declaration-retries)

被動佇列宣告失敗時的重試次數。被動佇列聲明發生在消費者啟動時,或者當從多個佇列消費時,如果並非所有佇列在初始化期間都可用。當重試次數耗盡後,所有配置的佇列都無法被被動宣告(出於任何原因),容器行為由前面描述的 missingQueuesFatal 屬性控制。預設值:三次重試(總共四次嘗試)。

tickmark

defaultRequeueRejected
(requeue-rejected)

確定因監聽器丟擲異常而被拒絕的訊息是否應重新排隊。預設值:true

tickmark
tickmark

errorHandler
(error-handler)

ErrorHandler 策略的引用,用於處理在 MessageListener 執行期間可能發生的任何未捕獲異常。預設值:ConditionalRejectingErrorHandler

tickmark
tickmark

exclusive
(exclusive)

確定此容器中的單個消費者是否對佇列具有獨佔訪問許可權。當此為 true 時,容器的併發性必須為 1。如果另一個消費者具有獨佔訪問許可權,容器會根據 recovery-intervalrecovery-back-off 嘗試恢復消費者。當使用名稱空間時,此屬性與佇列名稱一起出現在 <rabbit:listener/> 元素上。預設值:false

tickmark
tickmark

exclusiveConsumerExceptionLogger
(N/A)

一個異常記錄器,用於當獨佔消費者無法訪問佇列時。預設情況下,此資訊以 WARN 級別記錄。

tickmark
tickmark

failedDeclarationRetryInterval
(failed-declaration -retry-interval)

被動佇列宣告重試嘗試之間的間隔。被動佇列聲明發生在消費者啟動時,或者當從多個佇列消費時,如果並非所有佇列在初始化期間都可用。預設值:5000(五秒)。

tickmark
tickmark

forceCloseChannel
(N/A)

如果消費者在 shutdownTimeout 內沒有響應關機,如果此值為 true,通道將被關閉,導致任何未確認的訊息重新排隊。自 2.0 版本起預設為 true。您可以將其設定為 false 以恢復到以前的行為。

tickmark
tickmark

forceStop
(N/A)

設定為 true 表示在處理完當前記錄後停止(當容器停止時);導致所有預取的訊息重新排隊。預設情況下,容器將取消消費者並在停止前處理所有預取的訊息。從 2.4.14、3.0.6 版本開始,預設為 false

tickmark
tickmark

globalQos
(global-qos)

當為 true 時,prefetchCount 全域性應用於通道,而不是通道上的每個消費者。有關更多資訊,請參閱 basicQos.global

tickmark
tickmark

(group)

此僅在使用名稱空間時可用。指定時,將以該名稱註冊一個型別為 Collection<MessageListenerContainer> 的 bean,並且為每個 <listener/> 元素的容器將新增到該集合中。這允許,例如,透過迭代集合來啟動和停止容器組。如果多個 <listener-container/> 元素具有相同的 group 值,則集合中的容器構成所有指定容器的聚合。

tickmark
tickmark

idleEventInterval
(idle-event-interval)

請參閱 檢測空閒非同步消費者

tickmark
tickmark

javaLangErrorHandler
(N/A)

一個 AbstractMessageListenerContainer.JavaLangErrorHandler 實現,在容器執行緒捕獲到 Error 時呼叫。預設實現呼叫 System.exit(99);要恢復到以前的行為(不執行任何操作),請新增一個空操作處理器。

tickmark
tickmark

maxConcurrentConsumers
(max-concurrency)

根據需要啟動的最大併發消費者數量。必須大於或等於 'concurrentConsumers'。參見 監聽器併發

tickmark

messagesPerAck
(N/A)

兩次確認之間接收的訊息數量。使用此屬性可減少傳送給代理的確認數量(代價是增加訊息重新傳遞的可能性)。通常,您應該只在大量監聽器容器上設定此屬性。如果設定了此屬性並且訊息被拒絕(丟擲異常),則未決的確認將被確認,並且失敗的訊息將被拒絕。事務通道不允許使用此屬性。如果 prefetchCount 小於 messagesPerAck,它將增加以匹配 messagesPerAck。預設值:每條訊息都確認。另請參閱此表中的 ackTimeout

tickmark

mismatchedQueuesFatal
(mismatched-queues-fatal)

當容器啟動時,如果此屬性為 true(預設值:false),容器將檢查上下文中宣告的所有佇列是否與代理上已存在的佇列相容。如果存在不匹配的屬性(例如 auto-delete)或引數(例如 x-message-ttl),容器(和應用程式上下文)將因致命異常而無法啟動。

如果在恢復期間檢測到問題(例如,連線丟失後),容器將停止。

應用程式上下文中必須只有一個 RabbitAdmin(或者透過 rabbitAdmin 屬性專門在容器上配置一個)。否則,此屬性必須為 false

如果在初始啟動期間代理不可用,容器將啟動並在建立連線時檢查條件。
檢查是針對上下文中所有佇列進行的,而不僅僅是特定監聽器配置使用的佇列。如果您希望將檢查限制在容器使用的那些佇列,您應該為容器配置一個單獨的 RabbitAdmin,並使用 rabbitAdmin 屬性提供對其的引用。有關更多資訊,請參閱 條件宣告
當啟動標記為 @Lazy 的 bean 中的 @RabbitListener 的容器時,將停用不匹配佇列引數的檢測。這是為了避免潛在的死鎖,這可能會將此類容器的啟動延遲長達 60 秒。使用懶惰監聽器 bean 的應用程式應在獲取對懶惰 bean 的引用之前檢查佇列引數。
tickmark
tickmark

missingQueuesFatal
(missing-queues-fatal)

當設定為 true(預設值)時,如果代理上沒有任何配置的佇列可用,則被認為是致命的。這將導致應用程式上下文在啟動期間初始化失敗。此外,當佇列在容器執行時被刪除時,預設情況下,消費者會嘗試三次連線到佇列(以五秒間隔),如果這些嘗試失敗,則停止容器。

在以前的版本中,此功能不可配置。

當設定為 false 時,在進行三次重試後,容器將進入恢復模式,就像其他問題一樣,例如代理宕機。容器將根據 recoveryInterval 屬性嘗試恢復。在每次恢復嘗試期間,每個消費者將再次嘗試四次以被動宣告佇列,間隔為五秒。此過程將無限期地繼續。

您還可以使用屬性 bean 全域性設定所有容器的屬性,如下所示

<util:properties
        id="spring.amqp.global.properties">
    <prop key="mlc.missing.queues.fatal">
        false
    </prop>
</util:properties>

此全域性屬性不應用於任何已設定顯式 missingQueuesFatal 屬性的容器。

預設重試屬性(三次重試,間隔五秒)可以透過設定以下屬性來覆蓋。

當啟動標記為 @Lazy 的 bean 中的 @RabbitListener 的容器時,將停用缺少佇列的檢測。這是為了避免潛在的死鎖,這可能會將此類容器的啟動延遲長達 60 秒。使用懶惰監聽器 bean 的應用程式應在獲取對懶惰 bean 的引用之前檢查佇列。
tickmark
tickmark

monitorInterval
(monitor-interval)

對於 DMLC,在此間隔內排程一個任務以監視消費者的狀態並恢復任何已失敗的消費者。

tickmark

noLocal
(N/A)

設定為 true 可停用從伺服器向在同一通道連線上釋出訊息的消費者遞送訊息。

tickmark
tickmark

phase
(phase)

autoStartuptrue 時,此容器應在其內部啟動和停止的生命週期階段。值越低,此容器啟動越早,停止越晚。預設值為 Integer.MAX_VALUE,這意味著容器儘可能晚啟動,儘可能早停止。

tickmark
tickmark

possibleAuthenticationFailureFatal
(possible-authentication-failure-fatal)

當設定為 true (SMLC 的預設值)時,如果在連線期間丟擲 PossibleAuthenticationFailureException,則認為它是致命的。這將導致應用程式上下文在啟動期間初始化失敗 (如果容器配置了自動啟動)。

版本 2.0 起。

DirectMessageListenerContainer

當設定為 false(預設值)時,每個消費者將根據 monitorInterval 嘗試重新連線。

SimpleMessageListenerContainer

當設定為 false 時,在 3 次重試之後,容器將進入恢復模式,就像其他問題一樣,例如代理宕機。容器將嘗試根據 recoveryInterval 屬性進行恢復。在每次恢復嘗試期間,每個消費者將再次嘗試 4 次啟動。此過程將無限期地繼續。

您還可以使用屬性 bean 全域性設定所有容器的屬性,如下所示

<util:properties
    id="spring.amqp.global.properties">
  <prop
    key="mlc.possible.authentication.failure.fatal">
     false
  </prop>
</util:properties>

此全域性屬性不適用於任何已設定顯式 missingQueuesFatal 屬性的容器。

預設重試屬性(3 次重試,間隔 5 秒)可以使用此屬性之後的屬性進行覆蓋。

tickmark
tickmark

prefetchCount
(prefetch)

每個消費者可以未確認的未決訊息數量。此值越高,訊息遞送速度越快,但非順序處理的風險也越高。如果 acknowledgeModeNONE,則忽略此值。如有必要,此值會增加以匹配 batchSizemessagePerAck。自 2.0 版本起預設為 250。您可以將其設定為 1 以恢復到以前的行為。

在某些情況下,預取值應該較低——例如,對於大型訊息,尤其是處理速度較慢時(訊息可能在客戶端程序中累積大量記憶體),以及如果需要嚴格的訊息順序(在這種情況下,預取值應設定為 1)。此外,在訊息量低且有多個消費者(包括單個監聽器容器例項中的併發)的情況下,您可能希望減少預取以獲得更均勻的訊息分佈。

另請參閱 globalQos

tickmark
tickmark

rabbitAdmin
(admin)

當監聽器容器監聽至少一個自動刪除佇列,並且在啟動期間發現該佇列丟失時,容器會使用 RabbitAdmin 宣告該佇列以及任何相關的繫結和交換器。如果這些元素配置為使用條件宣告(請參閱 條件宣告),則容器必須使用配置為宣告這些元素的管理員。在此處指定該管理員。僅在使用帶有條件宣告的自動刪除佇列時才需要。如果您不希望自動刪除佇列在容器啟動之前宣告,請將管理員上的 auto-startup 設定為 false。預設為宣告所有非條件元素的 RabbitAdmin

tickmark
tickmark

receiveTimeout
(receive-timeout)

等待每條訊息的最大時間。如果 acknowledgeMode=NONE,這幾乎沒有效果——容器會迴圈並請求另一條訊息。對於 batchSize > 1 的事務性 Channel,其影響最大,因為它可能導致已消費的訊息在超時到期之前不被確認。當 consumerBatchEnabled 為 true 時,如果此超時在批處理完成之前發生,則會傳遞部分批處理。

tickmark

batchReceiveTimeout
(batch-receive-timeout)

收集批處理訊息的超時時間(毫秒)。它限制了等待填充批處理大小的時間。當 batchSize > 1 且收集批處理訊息的時間大於 batchReceiveTime 時,將遞送批處理。預設值為 0(無超時)。

tickmark

recoveryBackOff
(recovery-back-off)

指定在消費者因非致命原因啟動失敗時,嘗試啟動消費者之間的時間間隔的 BackOff。預設是 FixedBackOff,以五秒間隔無限重試。與 recoveryInterval 互斥。

tickmark
tickmark

recoveryInterval
(recovery-interval)

確定消費者因非致命原因啟動失敗時,嘗試啟動消費者之間的時間間隔(毫秒)。預設值:5000。與 recoveryBackOff 互斥。

tickmark
tickmark

retryDeclarationInterval
(missing-queue- retry-interval)

如果在消費者初始化期間一部分配置的佇列可用,則消費者將開始從這些佇列消費。消費者將使用此間隔嘗試被動宣告丟失的佇列。當此間隔過去後,將再次使用 'declarationRetries' 和 'failedDeclarationRetryInterval'。如果仍然存在丟失的佇列,消費者將再次等待此間隔,然後再次嘗試。此過程將無限期地持續,直到所有佇列都可用。預設值:60000(一分鐘)。

tickmark

shutdownTimeout
(N/A)

當容器關閉時(例如,其包含的 ApplicationContext 關閉時),它會在處理完正在處理的訊息之前等待,直到達到此限制。預設值為五秒。

tickmark
tickmark

startConsumerMinInterval
(min-start-interval)

每個新消費者按需啟動之前必須經過的時間(毫秒)。參見 監聽器併發。預設值:10000(10 秒)。

tickmark

statefulRetryFatal
WithNullMessageId (N/A)

當使用有狀態重試建議時,如果收到缺少 messageId 屬性的訊息,預設情況下這被認為是致命的(它將停止消費者)。將其設定為 false 以丟棄(或路由到死信佇列)此類訊息。

tickmark
tickmark

stopConsumerMinInterval
(min-stop-interval)

當檢測到空閒消費者時,自上次消費者停止以來,新的消費者停止前必須經過的時間(毫秒)。參見 監聽器併發。預設值:60000(一分鐘)。

tickmark

streamConverter
(N/A)

一個 StreamMessageConverter,用於將原生流訊息轉換為 Spring AMQP 訊息。

tickmark

taskExecutor
(task-executor)

對 Spring TaskExecutor(或標準 JDK 1.5+ Executor)的引用,用於執行監聽器呼叫器。預設是 SimpleAsyncTaskExecutor,使用內部管理的執行緒。

tickmark
tickmark

taskScheduler
(task-scheduler)

對於 DMLC,用於在“monitorInterval”執行監視任務的排程程式。

tickmark

transactionManager
(transaction-manager)

監聽器操作的外部事務管理器。也與 channelTransacted 互補——如果 Channel 是事務性的,其事務將與外部事務同步。

tickmark
tickmark
© . This site is unofficial and not affiliated with VMware.