事務

Spring Rabbit 框架支援同步和非同步用例中的自動事務管理,具有多種不同的語義,可以像 Spring 事務的現有使用者所熟悉的那樣以宣告方式選擇。這使得實現許多(如果不是大多數)常見訊息模式變得容易。

有兩種方式向框架指示所需的事務語義。在 RabbitTemplateSimpleMessageListenerContainer 中,都有一個標誌 channelTransacted,如果設定為 true,則指示框架使用事務性通道,並以提交或回滾(取決於結果)結束所有操作(傳送或接收),異常表示回滾。另一種方式是提供一個外部事務,使用 Spring 的一個 PlatformTransactionManager 實現作為正在進行操作的上下文。如果在框架傳送或接收訊息時已經存在事務,並且 channelTransacted 標誌為 true,則訊息傳遞事務的提交或回滾將被推遲到當前事務結束。如果 channelTransacted 標誌為 false,則訊息傳遞操作不應用事務語義(它會自動確認)。

channelTransacted 標誌是一個配置時設定。它在 AMQP 元件建立時宣告和處理一次,通常在應用程式啟動時。原則上,外部事務更具動態性,因為系統在執行時響應當前執行緒狀態。然而,在實踐中,當事務以宣告方式疊加到應用程式上時,它通常也是一個配置設定。

對於帶有 RabbitTemplate 的同步用例,外部事務由呼叫者提供,可以根據喜好採用宣告式或程式設計式(通常的 Spring 事務模型)。以下示例展示了一種宣告式方法(通常首選,因為它是非侵入性的),其中模板已配置 channelTransacted=true

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,一個 String 負載在標記為 @Transactional 的方法內部被接收、轉換並作為訊息體傳送。如果資料庫處理因異常失敗,則收到的訊息將被返回給 broker,而傳送的訊息不會被髮送。這適用於在事務方法鏈內部使用 RabbitTemplate 進行的任何操作(除非例如直接操作 Channel 以提前提交事務)。

對於帶有 SimpleMessageListenerContainer 的非同步用例,如果需要外部事務,容器在設定監聽器時必須請求它。為了指示需要外部事務,使用者在配置容器時會向其提供一個 PlatformTransactionManager 的實現。以下示例展示瞭如何操作

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在前面的示例中,事務管理器作為依賴項從另一個 bean 定義(未顯示)注入,並且 channelTransacted 標誌也設定為 true。其效果是,如果監聽器因異常失敗,事務將被回滾,並且訊息也會被返回給 broker。重要的是,如果事務提交失敗(例如,由於資料庫約束錯誤或連線問題),AMQP 事務也會回滾,並且訊息被返回給 broker。這有時被稱為“盡力單階段提交”(Best Efforts 1 Phase Commit),是實現可靠訊息傳遞的一種非常強大的模式。如果在前面的示例中,channelTransacted 標誌設定為 false(預設值),則外部事務仍將為監聽器提供,但所有訊息操作都會自動確認,因此即使業務操作回滾,訊息操作也會提交。

條件回滾

在 1.6.6 版本之前,在使用外部事務管理器(如 JDBC)時,向容器的 transactionAttribute 添加回滾規則沒有效果。異常總是會回滾事務。

此外,在容器的增強鏈中使用事務增強時,條件回滾也不是很有效,因為所有監聽器異常都被包裝在 ListenerExecutionFailedException 中。

第一個問題已得到糾正,規則現在已正確應用。此外,現在提供了 ListenerFailedRuleBasedTransactionAttribute。它是 RuleBasedTransactionAttribute 的子類,唯一的區別是它瞭解 ListenerExecutionFailedException,並使用此類異常的原因來應用規則。此事務屬性可以直接在容器中使用,或透過事務增強使用。

以下示例使用了此規則

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

關於已接收訊息回滾的說明

AMQP 事務僅適用於傳送到 broker 的訊息和確認。因此,當 Spring 事務回滾且訊息已被接收時,Spring AMQP 不僅要回滾事務,還需要手動拒絕訊息(有點像 nack,但規範不這麼稱呼)。訊息拒絕時採取的操作與事務無關,並取決於 defaultRequeueRejected 屬性(預設值:true)。有關拒絕失敗訊息的更多資訊,請參見訊息監聽器和非同步情況

有關 RabbitMQ 事務及其限制的更多資訊,請參見RabbitMQ Broker 語義

在 RabbitMQ 2.7.0 之前,此類訊息(以及通道關閉或中止時未確認的任何訊息)會在 Rabbit broker 上進入佇列尾部。從 2.7.0 起,拒絕的訊息會進入佇列頭部,這與 JMS 回滾訊息的方式類似。
之前,事務回滾時訊息的重新入隊行為在本地事務和提供 TransactionManager 的情況下不一致。在前者中,應用正常的重新入隊邏輯(AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false)(參見訊息監聽器和非同步情況)。使用事務管理器時,訊息在回滾時會無條件重新入隊。從 2.0 版本開始,行為一致,兩種情況下都應用正常的重新入隊邏輯。要恢復到之前的行為,可以將容器的 alwaysRequeueWithTxManagerRollback 屬性設定為 true。參見訊息監聽器容器配置

使用 RabbitTransactionManager

RabbitTransactionManager 是一種替代方案,用於在外部事務中執行 Rabbit 操作並與之同步。此事務管理器是 PlatformTransactionManager 介面的一個實現,應與單個 Rabbit ConnectionFactory 一起使用。

此策略無法提供 XA 事務,例如,無法在訊息傳遞和資料庫訪問之間共享事務。

應用程式程式碼需要透過 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 而不是標準的 Connection.createChannel() 呼叫及其後續的通道建立來檢索事務性 Rabbit 資源。使用 Spring AMQP 的 RabbitTemplate 時,它會自動檢測執行緒繫結的 Channel 並自動參與其事務。

透過 Java 配置,您可以使用以下 bean 設定新的 RabbitTransactionManager

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果您更喜歡 XML 配置,可以在 XML 應用程式上下文檔案中宣告以下 bean

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

事務同步

將 RabbitMQ 事務與某些其他(例如 DBMS)事務同步可提供“盡力單階段提交”(Best Effort One Phase Commit)語義。RabbitMQ 事務可能在事務同步的完成後續階段(after completion phase)提交失敗。這會被 spring-tx 基礎設施記錄為錯誤,但不會向呼叫程式碼丟擲異常。從 2.3.10 版本開始,您可以在事務已提交後,在處理事務的同一執行緒上呼叫 ConnectionUtils.checkAfterCompletion()。如果沒有發生異常,它將簡單返回;否則,它將丟擲 AfterCompletionFailedException,該異常將包含一個表示完成同步狀態的屬性。

透過呼叫 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 啟用此功能;這是一個全域性標誌,適用於所有執行緒。