事務

Spring Rabbit 框架支援在同步和非同步用例中進行自動事務管理,並提供多種不同的語義,可以宣告式地選擇,這對於 Spring 事務的現有使用者來說很熟悉。這使得許多(如果不是大多數)常見的訊息模式易於實現。

有兩種方法可以向框架發出所需的事務語義訊號。在 RabbitTemplateSimpleMessageListenerContainer 中,都有一個標誌 channelTransacted,如果設定為 true,則告訴框架使用事務性通道,並透過提交或回滾(取決於結果)來結束所有操作(傳送或接收),異常表示回滾。另一個訊號是提供一個外部事務,其中一個 Spring 的 PlatformTransactionManager 實現作為正在進行的操作的上下文。如果框架傳送或接收訊息時已經有一個事務正在進行,並且 channelTransacted 標誌為 true,則訊息事務的提交或回滾將推遲到當前事務結束。如果 channelTransacted 標誌為 false,則沒有事務語義應用於訊息操作(它會自動確認)。

channelTransacted 標誌是一個配置時設定。它在 AMQP 元件建立時(通常在應用程式啟動時)宣告和處理一次。外部事務原則上更具動態性,因為系統在執行時響應當前執行緒狀態。然而,實際上,當事務以宣告方式分層到應用程式時,它通常也是一個配置設定。

對於 RabbitTemplate 的同步用例,外部事務由呼叫者根據喜好(通常的 Spring 事務模型)以宣告式或命令式方式提供。以下示例顯示了一個宣告式方法(通常首選,因為它是非侵入性的),其中模板已配置為 channelTransacted=true

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,一個 String 有效負載在標記為 @Transactional 的方法中被接收、轉換並作為訊息體傳送。如果資料庫處理失敗並丟擲異常,則傳入訊息將返回到代理,並且不傳送傳出訊息。這適用於在事務性方法鏈中與 RabbitTemplate 進行的任何操作(除非,例如,直接操作 Channel 以提前提交事務)。

對於 SimpleMessageListenerContainer 的非同步用例,如果需要外部事務,則必須由容器在設定偵聽器時請求。為了表示需要外部事務,使用者在配置容器時向其提供 PlatformTransactionManager 的實現。以下示例顯示瞭如何執行此操作

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在前面的示例中,事務管理器作為從另一個 bean 定義(未顯示)注入的依賴項新增,並且 channelTransacted 標誌也設定為 true。其效果是,如果偵聽器失敗並丟擲異常,則事務將回滾,並且訊息也將返回到代理。重要的是,如果事務提交失敗(例如,由於資料庫約束錯誤或連線問題),AMQP 事務也將回滾,並且訊息將返回到代理。這有時被稱為“盡力而為的單階段提交”,對於可靠的訊息傳遞來說,這是一個非常強大的模式。如果在前面的示例中 channelTransacted 標誌設定為 false(預設值),則外部事務仍將提供給偵聽器,但所有訊息操作都將自動確認,因此即使在業務操作回滾時,訊息操作也會提交。

條件回滾

在版本 1.6.6 之前,在使用外部事務管理器(如 JDBC)時,向容器的 transactionAttribute 添加回滾規則無效。異常總是回滾事務。

此外,當在容器的通知鏈中使用事務通知時,條件回滾不是很實用,因為所有偵聽器異常都被包裝在 ListenerExecutionFailedException 中。

第一個問題已得到糾正,現在規則已正確應用。此外,現在提供了 ListenerFailedRuleBasedTransactionAttribute。它是 RuleBasedTransactionAttribute 的子類,唯一的區別是它知道 ListenerExecutionFailedException 並使用此類異常的原因作為規則。此事務屬性可以直接在容器中使用,也可以透過事務通知使用。

以下示例使用此規則

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

關於接收訊息回滾的注意事項

AMQP 事務僅適用於傳送到代理的訊息和確認。因此,當 Spring 事務回滾並且已接收到訊息時,Spring AMQP 不僅需要回滾事務,還需要手動拒絕訊息(某種形式的 nack,但規範不這樣稱呼它)。訊息拒絕時採取的操作獨立於事務,並取決於 defaultRequeueRejected 屬性(預設值:true)。有關拒絕失敗訊息的更多資訊,請參閱訊息偵聽器和非同步情況

有關 RabbitMQ 事務及其限制的更多資訊,請參閱RabbitMQ 代理語義

在 RabbitMQ 2.7.0 之前,此類訊息(以及在通道關閉或中止時未確認的任何訊息)會返回到 Rabbit 代理佇列的末尾。自 2.7.0 起,被拒絕的訊息會返回到佇列的前面,類似於 JMS 回滾的訊息。
以前,在本地事務和提供 TransactionManager 時,事務回滾時訊息重新入隊的行為不一致。在前一種情況下,應用正常的重新入隊邏輯(AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false)(請參閱訊息偵聽器和非同步情況)。使用事務管理器時,訊息在回滾時無條件重新入隊。從 2.0 版本開始,行為一致,兩種情況下都應用正常的重新入隊邏輯。要恢復到以前的行為,可以將容器的 alwaysRequeueWithTxManagerRollback 屬性設定為 true。請參閱訊息偵聽器容器配置

使用 RabbitTransactionManager

RabbitTransactionManager 是在外部事務中執行 Rabbit 操作並與之同步的替代方案。此事務管理器是 PlatformTransactionManager 介面的實現,應與單個 Rabbit ConnectionFactory 一起使用。

此策略無法提供 XA 事務 — 例如,為了在訊息傳遞和資料庫訪問之間共享事務。

應用程式程式碼需要透過 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 而不是標準的 Connection.createChannel() 呼叫和後續通道建立來檢索事務性 Rabbit 資源。當使用 Spring AMQP 的 RabbitTemplate 時,它將自動檢測執行緒繫結的 Channel 並自動參與其事務。

使用 Java 配置,您可以透過以下 bean 設定新的 RabbitTransactionManager

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果您喜歡 XML 配置,可以在 XML 應用程式上下文檔案中宣告以下 bean

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

事務同步

將 RabbitMQ 事務與某些其他(例如 DBMS)事務同步提供“盡力而為的單階段提交”語義。RabbitMQ 事務可能在事務同步的完成後期提交失敗。這會被 spring-tx 基礎設施記錄為錯誤,但不會向呼叫程式碼丟擲異常。從 2.3.10 版本開始,您可以在事務在處理該事務的同一執行緒上提交後呼叫 ConnectionUtils.checkAfterCompletion()。如果未發生異常,它將簡單返回;否則,它將丟擲 AfterCompletionFailedException,該異常將具有表示完成的同步狀態的屬性。

透過呼叫 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 啟用此功能;這是一個全域性標誌,適用於所有執行緒。

© . This site is unofficial and not affiliated with VMware.