彈性:從錯誤和 Broker 故障中恢復
Spring AMQP 提供的一些關鍵(也是最受歡迎的)高階功能與在協議錯誤或 broker 故障時進行恢復和自動重新連線有關。我們已經在指南中看到了所有相關元件,但在這裡將它們整合起來並單獨列出功能和恢復場景會有所幫助。
主要的重連功能由 CachingConnectionFactory 本身啟用。使用 RabbitAdmin 自動宣告功能也通常很有益。此外,如果你關心保證交付,你可能還需要在 RabbitTemplate 和 SimpleMessageListenerContainer 中使用 channelTransacted 標誌,以及在 SimpleMessageListenerContainer 中使用 AcknowledgeMode.AUTO(如果你自己進行確認,則使用手動確認)。
Exchange、佇列和繫結的自動宣告
RabbitAdmin 元件可以在啟動時宣告 exchange、佇列和繫結。它透過 ConnectionListener 懶惰地完成此操作。因此,如果 broker 在啟動時不存在,這並不重要。第一次使用 Connection 時(例如,透過傳送訊息),監聽器會觸發,並應用管理功能。在監聽器中進行自動宣告的另一個好處是,如果連線因任何原因(例如,broker 宕機、網路故障等)斷開,它們會在連線重新建立時再次應用。
以這種方式宣告的佇列必須具有固定名稱——要麼是顯式宣告的,要麼是框架為 AnonymousQueue 例項生成的。匿名佇列是不可持久的、獨佔的且自動刪除的。 |
只有當 CachingConnectionFactory 快取模式為 CHANNEL(預設)時,才會執行自動宣告。存在此限制是因為獨佔和自動刪除佇列繫結到連線。 |
從版本 2.2.2 開始,RabbitAdmin 將檢測型別為 DeclarableCustomizer 的 bean,並在實際處理宣告之前應用該函式。例如,這對於在框架中提供一流支援之前設定新引數(屬性)非常有用。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
這在不直接提供對 Declarable bean 定義訪問的專案中也很有用。
另請參閱 RabbitMQ 自動連線/拓撲恢復。
同步操作中的故障和重試選項
如果在同步序列中使用 RabbitTemplate 時(例如)失去與 broker 的連線,Spring AMQP 會丟擲 AmqpException(通常但不總是 AmqpIOException)。我們不會試圖隱藏問題,因此你必須能夠捕獲並響應異常。如果你懷疑連線已斷開(並且不是你的錯),最簡單的做法是再次嘗試操作。你可以手動執行此操作,或者可以考慮使用 Spring Retry 來處理重試(命令式或宣告式)。
Spring Retry 提供了一些 AOP 攔截器和極大的靈活性來指定重試的引數(嘗試次數、異常型別、退避演算法等)。Spring AMQP 還提供了一些便利的工廠 bean,用於以 AMQP 用例的便捷形式建立 Spring Retry 攔截器,並帶有強型別回撥介面,你可以使用它們來實現自定義恢復邏輯。有關詳細資訊,請參閱 StatefulRetryOperationsInterceptorFactoryBean 和 StatelessRetryOperationsInterceptorFactoryBean 的 Javadoc 和屬性。如果不存在事務或事務在重試回撥中啟動,則無狀態重試是合適的。請注意,無狀態重試比有狀態重試更容易配置和分析,但如果存在必須回滾或肯定會回滾的正在進行的事務,則通常不適用。事務中間的連線斷開應與回滾具有相同的效果。因此,對於事務在堆疊中更高層啟動的重新連線,有狀態重試通常是最佳選擇。有狀態重試需要一種機制來唯一標識訊息。最簡單的方法是讓傳送方在 MessageId 訊息屬性中放置一個唯一值。提供的訊息轉換器提供了一個選項來執行此操作:你可以將 createMessageIds 設定為 true。否則,你可以將 MessageKeyGenerator 實現注入到攔截器中。金鑰生成器必須為每條訊息返回一個唯一的金鑰。在 2.0 版本之前,提供了 MissingMessageIdAdvice。它允許沒有 messageId 屬性的訊息恰好重試一次(忽略重試設定)。此通知不再提供,因為,與 spring-retry 1.2 版本一起,其功能已內建到攔截器和訊息監聽器容器中。
為了向後相容,預設情況下(一次重試後),具有空訊息 ID 的訊息被消費者視為致命(消費者停止)。為了複製 MissingMessageIdAdvice 提供的功能,你可以將監聽器容器上的 statefulRetryFatalWithNullMessageId 屬性設定為 false。在此設定下,消費者繼續執行,並且訊息被拒絕(一次重試後)。它被丟棄或路由到死信佇列(如果已配置)。 |
從 1.3 版本開始,提供了一個 builder API,以幫助使用 Java(在 @Configuration 類中)組裝這些攔截器。以下示例展示瞭如何操作:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxRetries(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只能以這種方式配置重試功能的一個子集。更高階的功能需要配置 RetryPolicy。有關可用策略及其配置的更多資訊,請參閱 RetryPolicy Javadoc。
帶批處理監聽器的重試
不建議為批處理監聽器配置重試,除非該批處理是由生產者在單個記錄中建立的。有關消費者和生產者建立的批處理的資訊,請參閱 批處理訊息。對於消費者建立的批處理,框架無法知道批處理中的哪條訊息導致了故障,因此在重試耗盡後無法恢復。對於生產者建立的批處理,由於實際上只有一條訊息失敗,因此可以恢復整個訊息。應用程式可能希望通知自定義恢復器批處理中發生故障的位置,或許可以透過設定丟擲異常的索引屬性。
批處理監聽器的重試恢復器必須實現 MessageBatchRecoverer。
訊息監聽器和非同步情況
如果 MessageListener 因業務異常而失敗,異常將由訊息監聽器容器處理,然後容器返回繼續監聽其他訊息。如果故障是由連線斷開(而不是業務異常)引起的,則為監聽器收集訊息的消費者必須被取消並重新啟動。SimpleMessageListenerContainer 無縫地處理此問題,並留下日誌說明監聽器正在重新啟動。事實上,它會無限迴圈,嘗試重新啟動消費者。只有當消費者行為非常糟糕時,它才會放棄。一個副作用是,如果 broker 在容器啟動時宕機,它會一直嘗試,直到可以建立連線。
業務異常處理,與協議錯誤和連線斷開不同,可能需要更多的思考和一些自定義配置,特別是當使用事務或容器確認時。在 2.8.x 之前,RabbitMQ 沒有死信行為的定義。因此,預設情況下,由於業務異常而被拒絕或回滾的訊息可能會無限次地重新交付。為了限制客戶端重新交付的次數,一個選擇是在監聽器的建議鏈中使用 StatefulRetryOperationsInterceptor。攔截器可以有一個恢復回撥,該回調實現自定義死信操作——根據你的特定環境選擇適當的操作。
另一種選擇是將容器的 defaultRequeueRejected 屬性設定為 false。這將導致所有失敗的訊息被丟棄。當使用 RabbitMQ 2.8.x 或更高版本時,這也有助於將訊息傳遞到死信 exchange。
或者,你可以丟擲 AmqpRejectAndDontRequeueException。這樣做可以防止訊息重新排隊,無論 defaultRequeueRejected 屬性的設定如何。
從 2.1 版本開始,引入了 ImmediateRequeueAmqpException,以執行完全相反的邏輯:訊息將重新排隊,無論 defaultRequeueRejected 屬性的設定如何。
通常,兩種技術結合使用。你可以在建議鏈中使用 StatefulRetryOperationsInterceptor,並使用一個丟擲 AmqpRejectAndDontRequeueException 的 MessageRecoverer。當所有重試都耗盡時,會呼叫 MessageRecoverer。RejectAndDontRequeueRecoverer 正是這樣做的。預設的 MessageRecoverer 消耗錯誤訊息併發出 WARN 訊息。
從 1.3 版本開始,提供了一個新的 RepublishMessageRecoverer,允許在重試耗盡後重新發布失敗的訊息。
當恢復器處理最終異常時,訊息被 ack,並且如果配置了 broker,則不會將其傳送到死信 exchange。
當在消費者端使用 RepublishMessageRecoverer 時,接收到的訊息在 receivedDeliveryMode 訊息屬性中包含 deliveryMode。在這種情況下,deliveryMode 為 null。這意味著 broker 上的 NON_PERSISTENT 交付模式。從 2.0 版本開始,你可以為 RepublishMessageRecoverer 配置在重新發布時要設定到訊息中的 deliveryMode(如果它為 null)。預設情況下,它使用 MessageProperties 預設值 - MessageDeliveryMode.PERSISTENT。 |
以下示例展示瞭如何將 RepublishMessageRecoverer 設定為恢復器:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxRetries(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
RepublishMessageRecoverer 將訊息與訊息頭中的附加資訊一起釋出,例如異常訊息、堆疊跟蹤、原始 exchange 和路由鍵。可以透過建立子類並重寫 additionalHeaders() 來新增附加頭。deliveryMode(或任何其他屬性)也可以在 additionalHeaders() 中更改,如下例所示:
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
從版本 2.0.5 開始,如果堆疊跟蹤過大,可能會被截斷;這是因為所有頭都必須適合單個幀。預設情況下,如果堆疊跟蹤會導致其他頭可用空間不足 20,000 位元組(“餘量”),它將被截斷。如果你需要更多或更少的空間用於其他頭,可以透過設定恢復器的 frameMaxHeadroom 屬性進行調整。從版本 2.1.13、2.2.3 開始,異常訊息被包含在此計算中,並且堆疊跟蹤的數量將使用以下演算法最大化:
-
如果僅堆疊跟蹤就超出限制,則異常訊息頭將被截斷為 97 位元組加上
…,並且堆疊跟蹤也會被截斷。 -
如果堆疊跟蹤很小,訊息將被截斷(加上
…)以適應可用位元組(但堆疊跟蹤本身中的訊息被截斷為 97 位元組加上…)。
每當發生任何型別的截斷時,原始異常將被記錄以保留完整資訊。評估在頭被增強後執行,因此異常型別等資訊可以在表示式中使用。
從 2.4.8 版本開始,錯誤 exchange 和路由鍵可以作為 SpEL 表示式提供,其中 Message 是評估的根物件。
從 2.3.3 版本開始,提供了一個新的子類 RepublishMessageRecovererWithConfirms;它支援兩種釋出者確認樣式,並將在返回之前等待確認(如果未確認或訊息被退回,則丟擲異常)。
如果確認型別為 CORRELATED,則子類還會檢測訊息是否被退回並丟擲 AmqpMessageReturnedException;如果釋出被否定確認,它將丟擲 AmqpNackReceivedException。
如果確認型別為 SIMPLE,則子類將在通道上呼叫 waitForConfirmsOrDie 方法。
有關確認和退回的更多資訊,請參閱 釋出者確認和退回。
從 2.1 版本開始,添加了 ImmediateRequeueMessageRecoverer 以丟擲 ImmediateRequeueAmqpException,它會通知監聽器容器重新排隊當前失敗的訊息。
Spring Retry 的異常分類
Spring Retry 在確定哪些異常可以觸發重試方面具有極大的靈活性。預設配置會重試所有異常。鑑於使用者異常被封裝在 ListenerExecutionFailedException 中,我們需要確保分類檢查異常原因。預設分類器只檢視頂層異常。
自 Spring Retry 1.0.3 起,BinaryExceptionClassifier 有一個名為 traverseCauses 的屬性(預設值:false)。當設定為 true 時,它會遍歷異常原因,直到找到匹配項或沒有原因。
要將此分類器用於重試,你可以使用透過建構函式建立的 SimpleRetryPolicy,該建構函式接受最大嘗試次數、Exception 例項的 Map 和布林值(traverseCauses),並將此策略注入到 RetryTemplate 中。
透過 Broker 重試
從佇列中死信的訊息可以在從 DLX 重新路由後重新發布回此佇列。這種重試行為在 broker 端透過 x-death 頭進行控制。有關此方法的更多資訊,請參閱官方 RabbitMQ 文件。
另一種方法是從應用程式手動將失敗的訊息重新發布回原始 exchange。從版本 4.0 開始,RabbitMQ broker 不考慮客戶端傳送的 x-death 頭。本質上,客戶端傳送的任何 x-* 頭都會被忽略。
為了緩解 RabbitMQ broker 的這種新行為,Spring AMQP 從版本 3.2 開始引入了 retry_count 頭。當此頭不存在且伺服器端 DLX 正在執行時,x-death.count 屬性將對映到此頭。當失敗訊息手動重新發布以進行重試時,retry_count 頭值必須手動遞增。有關更多資訊,請參閱 Javadoc。
以下示例總結了透過 broker 進行手動重試的演算法:
@RabbitListener(queues = "some_queue")
public void rePublish(Message message) {
try {
// Process message
}
catch (Exception ex) {
Long retryCount = message.getMessageProperties().getRetryCount();
if (retryCount < 3) {
message.getMessageProperties().incrementRetryCount();
this.rabbitTemplate.send("", "some_queue", message);
}
else {
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
}
}