彈性:從錯誤和代理故障中恢復

Spring AMQP 提供的一些關鍵(也是最受歡迎)的高階功能與在協議錯誤或代理故障發生時的恢復和自動重連有關。我們已經在指南中看到了所有相關的元件,但在本節中將它們集中在一起並單獨指出這些功能和恢復場景將會很有幫助。

主要的重連功能由 CachingConnectionFactory 本身啟用。使用 RabbitAdmin 的自動宣告功能通常也很有益。此外,如果你關心保證送達,可能還需要在 RabbitTemplateSimpleMessageListenerContainer 中使用 channelTransacted 標誌,並在 SimpleMessageListenerContainer 中使用 AcknowledgeMode.AUTO(如果你自己處理確認,則使用手動模式)。

Exchange、Queue 和 Binding 的自動宣告

RabbitAdmin 元件可以在啟動時宣告 exchange、queue 和 binding。它透過一個 ConnectionListener 延遲執行此操作。因此,如果代理在啟動時不存在,也沒有關係。第一次使用 Connection(例如,透過傳送訊息)時,監聽器會觸發並應用 admin 功能。在監聽器中進行自動宣告的另一個好處是,如果連線因任何原因(例如,代理宕機、網路故障等)斷開,當連線重新建立時,它們會再次被應用。

以這種方式宣告的佇列必須具有固定的名稱——要麼是顯式宣告的,要麼是框架為 AnonymousQueue 例項生成的。匿名佇列是不可持久化、排他的且自動刪除的。
自動宣告僅在 CachingConnectionFactory 的快取模式為 CHANNEL(預設值)時執行。此限制存在的原因是排他佇列和自動刪除佇列繫結到連線。

從 2.2.2 版本開始,RabbitAdmin 將檢測 DeclarableCustomizer 型別的 bean,並在實際處理宣告之前應用該功能。這很有用,例如,可以在框架內部對某個新引數(屬性)提供一流支援之前,先對其進行設定。

@Bean
public DeclarableCustomizer customizer() {
    return dec -> {
        if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
            dec.addArgument("some.new.queue.argument", true);
        }
        return dec;
    };
}

在不直接訪問 Declarable bean 定義的專案中,這也很有用。

同步操作中的故障與重試選項

在使用 RabbitTemplate 進行同步操作序列時,如果丟失了與代理的連線(例如),Spring AMQP 會丟擲 AmqpException(通常是 AmqpIOException,但不總是)。我們不會隱藏出現問題的事實,因此你必須能夠捕獲並響應此異常。如果你懷疑連線丟失(並且不是你的原因),最簡單的做法是再次嘗試該操作。你可以手動執行此操作,或者考慮使用 Spring Retry 來處理重試(命令式或宣告式)。

Spring Retry 提供了一些 AOP 攔截器,並在指定重試引數(嘗試次數、異常型別、退避演算法等)方面提供了極大的靈活性。Spring AMQP 還提供了一些便利的工廠 bean,用於以方便的形式建立適用於 AMQP 用例的 Spring Retry 攔截器,它們帶有強型別回撥介面,你可以使用這些介面實現自定義恢復邏輯。有關更多詳細資訊,請參閱 StatefulRetryOperationsInterceptorStatelessRetryOperationsInterceptor 的 Javadoc 和屬性。如果不存在事務或事務在重試回撥內部啟動,則無狀態重試是合適的。請注意,無狀態重試比有狀態重試更容易配置和分析,但如果存在必須回滾或肯定會回滾的正在進行的事務,則通常不適用。在事務中間斷開連線應該與回滾具有相同的效果。因此,對於事務在堆疊更高層啟動的重連,有狀態重試通常是最佳選擇。有狀態重試需要一種機制來唯一標識訊息。最簡單的方法是讓傳送者在 MessageId 訊息屬性中放入一個唯一值。提供的訊息轉換器提供了執行此操作的選項:你可以將 createMessageIds 設定為 true。否則,你可以將 MessageKeyGenerator 實現注入到攔截器中。金鑰生成器必須為每條訊息返回一個唯一金鑰。在 2.0 版本之前,提供了 MissingMessageIdAdvice。它允許沒有 messageId 屬性的訊息恰好重試一次(忽略重試設定)。現在不再提供此 advice,因為從 spring-retry 1.2 版本開始,其功能已內建到攔截器和訊息監聽器容器中。

為了向後相容,預設情況下(在一次重試後),帶有 null 訊息 ID 的訊息對消費者來說被視為致命錯誤(消費者停止)。要複製 MissingMessageIdAdvice 提供的功能,可以在監聽器容器上將 statefulRetryFatalWithNullMessageId 屬性設定為 false。透過此設定,消費者會繼續執行,並且訊息將被拒絕(在一次重試後)。它將被丟棄或路由到死信佇列(如果已配置)。

從 1.3 版本開始,提供了一個構建器 API,用於透過 Java(在 @Configuration 類中)輔助組裝這些攔截器。以下示例展示瞭如何執行此操作

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();
}

透過這種方式只能配置重試能力的一個子集。更高階的功能需要將 RetryTemplate 配置為 Spring bean。有關可用策略及其配置的完整資訊,請參閱 Spring Retry Javadoc

批次監聽器的重試

不建議為批次監聽器配置重試,除非該批次是由生產者在一個記錄中建立的。有關消費者和生產者建立的批次的資訊,請參閱批次訊息。對於消費者建立的批次,框架不知道批次中的哪條訊息導致了故障,因此在重試耗盡後無法恢復。對於生產者建立的批次,由於只有一條訊息實際失敗,因此可以恢復整個訊息。應用程式可能希望通知自定義恢復器故障發生在批次的哪個位置,或許可以透過設定丟擲異常的索引屬性。

批次監聽器的重試恢復器必須實現 MessageBatchRecoverer

訊息監聽器與非同步情況

如果 MessageListener 因業務異常而失敗,該異常由訊息監聽器容器處理,然後容器會返回繼續監聽下一條訊息。如果故障是由連線斷開(非業務異常)引起的,則為監聽器收集訊息的消費者必須被取消並重新啟動。SimpleMessageListenerContainer 會無縫地處理這種情況,並記錄日誌表示正在重啟監聽器。實際上,它會無限迴圈,嘗試重啟消費者。只有當消費者行為非常糟糕時,它才會放棄。一個副作用是,如果在容器啟動時代理宕機,它會一直嘗試直到連線建立成功。

與協議錯誤和連線斷開不同,業務異常處理可能需要更多的考慮和一些自定義配置,尤其是在使用事務或容器確認的情況下。在 2.8.x 之前,RabbitMQ 沒有死信行為的定義。因此,預設情況下,因業務異常而被拒絕或回滾的訊息可能會被無限次地重新投遞。為了在客戶端限制重新投遞的次數,一種選擇是在監聽器的 advice 鏈中使用 StatefulRetryOperationsInterceptor。該攔截器可以有一個恢復回撥,用於實現自定義的死信操作——根據你的特定環境進行相應的處理。

另一種選擇是將容器的 defaultRequeueRejected 屬性設定為 false。這將導致所有失敗的訊息被丟棄。當使用 RabbitMQ 2.8.x 或更高版本時,這也方便將訊息投遞到死信 exchange。

或者,你可以丟擲 AmqpRejectAndDontRequeueException。這樣做可以阻止訊息重新入隊,無論 defaultRequeueRejected 屬性設定如何。

從 2.1 版本開始,引入了 ImmediateRequeueAmqpException,用於執行完全相反的邏輯:無論 defaultRequeueRejected 屬性設定如何,訊息都將被重新入隊。

通常,這兩種技術會結合使用。可以在 advice 鏈中使用 StatefulRetryOperationsInterceptor,並配合一個丟擲 AmqpRejectAndDontRequeueExceptionMessageRecoverer。當所有重試都已耗盡時,會呼叫 MessageRecoverRejectAndDontRequeueRecoverer 正是執行此操作的。預設的 MessageRecoverer 會處理錯誤的 message 併發出 WARN 級別的訊息。

從 1.3 版本開始,提供了一個新的 RepublishMessageRecoverer,允許在重試耗盡後釋出失敗的訊息。

當恢復器處理最終異常時,訊息會被確認,並且如果已配置,代理也不會將其傳送到死信 exchange。

當在消費者端使用 RepublishMessageRecoverer 時,接收到的訊息在 receivedDeliveryMode 訊息屬性中包含 deliveryMode。在這種情況下,deliveryMode 可能為 null。這意味著代理上的投遞模式為 NON_PERSISTENT。從 2.0 版本開始,你可以為 RepublishMessageRecoverer 配置要在重新發布的訊息中設定的 deliveryMode,如果它為 null 的話。預設情況下,它使用 MessageProperties 的預設值 - MessageDeliveryMode.PERSISTENT

以下示例展示瞭如何將 RepublishMessageRecoverer 設定為恢復器

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
            .build();
}

RepublishMessageRecoverer 在訊息頭部中釋出訊息,並附帶額外資訊,例如異常訊息、堆疊跟蹤、原始 exchange 和路由鍵。可以透過建立子類並重寫 additionalHeaders() 來新增額外的頭部。deliveryMode(或任何其他屬性)也可以在 additionalHeaders() 中更改,如下例所示

RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

    protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
        message.getMessageProperties()
            .setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
        return null;
    }

};

從 2.0.5 版本開始,如果堆疊跟蹤太大,可能會被截斷;這是因為所有頭部都必須適合單個幀。預設情況下,如果堆疊跟蹤導致為其他頭部留出的可用空間(“餘量”)少於 20,000 位元組,它將被截斷。如果你需要更多或更少空間用於其他頭部,可以透過設定恢復器的 frameMaxHeadroom 屬性來調整此值。從 2.1.13 和 2.2.3 版本開始,異常訊息也包含在此計算中,並且堆疊跟蹤的量將使用以下演算法最大化

  • 如果僅堆疊跟蹤就超出限制,則異常訊息頭部將被截斷為 97 位元組加上 …​,並且堆疊跟蹤也會被截斷。

  • 如果堆疊跟蹤很小,訊息(加上 …​)將被截斷以適應可用位元組(但堆疊跟蹤本身中的訊息被截斷為 97 位元組加上 …​)。

每當發生任何型別的截斷時,原始異常都會被記錄下來,以保留完整資訊。評估在頭部被增強後執行,因此像異常型別這樣的資訊可以在表示式中使用。

從 2.4.8 版本開始,錯誤 exchange 和路由鍵可以作為 SpEL 表示式提供,其中 Message 是評估的根物件。

從 2.3.3 版本開始,提供了一個新的子類 RepublishMessageRecovererWithConfirms;它支援兩種風格的釋出者確認,並會在返回之前等待確認(如果未確認或訊息被返回,則丟擲異常)。

如果確認型別是 CORRELATED,該子類還會檢測訊息是否被返回並丟擲 AmqpMessageReturnedException;如果釋出被否定確認,它會丟擲 AmqpNackReceivedException

如果確認型別是 SIMPLE,該子類將在通道上呼叫 waitForConfirmsOrDie 方法。

有關確認和返回的更多資訊,請參閱釋出者確認和返回

從 2.1 版本開始,添加了 ImmediateRequeueMessageRecoverer,用於丟擲 ImmediateRequeueAmqpException,它通知監聽器容器重新入隊當前失敗的訊息。

Spring Retry 的異常分類

Spring Retry 在確定哪些異常可以觸發重試方面具有極大的靈活性。預設配置會重試所有異常。考慮到使用者異常被包裝在 ListenerExecutionFailedException 中,我們需要確保分類器檢查異常的原因。預設分類器僅檢視頂層異常。

從 Spring Retry 1.0.3 開始,BinaryExceptionClassifier 有一個名為 traverseCauses 的屬性(預設為 false)。當設定為 true 時,它會遍歷異常原因,直到找到匹配項或沒有更多原因。

要將此分類器用於重試,可以使用一個透過建構函式建立的 SimpleRetryPolicy,該建構函式接受最大嘗試次數、Exception 例項的 Map 和布林值 (traverseCauses),然後將此策略注入到 RetryTemplate 中。

透過代理進行重試

從佇列成為死信的訊息在從 DLX 重新路由後,可以重新發布回該佇列。這種重試行為透過 x-death 頭部在代理端控制。有關此方法的更多資訊,請參閱官方 RabbitMQ 文件

另一種方法是從應用程式手動將失敗的訊息重新發布回原始 exchange。從 4.0 版本開始,RabbitMQ 代理不考慮客戶端傳送的 x-death 頭部。實際上,客戶端傳送的任何 x-* 頭部都會被忽略。

為了緩解 RabbitMQ 代理的這種新行為,Spring AMQP 從 3.2 版本開始引入了一個 retry_count 頭部。當此頭部不存在且伺服器端 DLX 生效時,x-death.count 屬性會被對映到此頭部。當手動重新發布失敗訊息以進行重試時,retry_count 頭部的值必須手動遞增。有關更多資訊,請參閱 MessageProperties.incrementRetryCount() 的 JavaDoc。

以下示例總結了透過代理進行手動重試的演算法

@RabbitListener(queueNames = "some_queue")
public void rePublish(Message message) {
    try {
    // Process message
    }
    catch (Exception ex) {
        Long retryCount = message.getMessageProperties().getRetryCount();
        if (retryCount < 3) {
            message.getMessageProperties().incrementRetryCount();
            this.rabbitTemplate.send("", "some_queue", message);
        }
        else {
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
		}
    }
}