事務
Spring Rabbit 框架支援同步和非同步用例中的自動事務管理,具有多種不同的語義,可以像 Spring 事務的現有使用者所熟悉的那樣以宣告方式選擇。這使得實現許多(如果不是大多數)常見訊息模式變得容易。
有兩種方式向框架指示所需的事務語義。在 RabbitTemplate
和 SimpleMessageListenerContainer
中,都有一個標誌 channelTransacted
,如果設定為 true
,則指示框架使用事務性通道,並以提交或回滾(取決於結果)結束所有操作(傳送或接收),異常表示回滾。另一種方式是提供一個外部事務,使用 Spring 的一個 PlatformTransactionManager
實現作為正在進行操作的上下文。如果在框架傳送或接收訊息時已經存在事務,並且 channelTransacted
標誌為 true
,則訊息傳遞事務的提交或回滾將被推遲到當前事務結束。如果 channelTransacted
標誌為 false
,則訊息傳遞操作不應用事務語義(它會自動確認)。
channelTransacted
標誌是一個配置時設定。它在 AMQP 元件建立時宣告和處理一次,通常在應用程式啟動時。原則上,外部事務更具動態性,因為系統在執行時響應當前執行緒狀態。然而,在實踐中,當事務以宣告方式疊加到應用程式上時,它通常也是一個配置設定。
對於帶有 RabbitTemplate
的同步用例,外部事務由呼叫者提供,可以根據喜好採用宣告式或程式設計式(通常的 Spring 事務模型)。以下示例展示了一種宣告式方法(通常首選,因為它是非侵入性的),其中模板已配置 channelTransacted=true
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在前面的示例中,一個 String
負載在標記為 @Transactional
的方法內部被接收、轉換並作為訊息體傳送。如果資料庫處理因異常失敗,則收到的訊息將被返回給 broker,而傳送的訊息不會被髮送。這適用於在事務方法鏈內部使用 RabbitTemplate
進行的任何操作(除非例如直接操作 Channel
以提前提交事務)。
對於帶有 SimpleMessageListenerContainer
的非同步用例,如果需要外部事務,容器在設定監聽器時必須請求它。為了指示需要外部事務,使用者在配置容器時會向其提供一個 PlatformTransactionManager
的實現。以下示例展示瞭如何操作
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在前面的示例中,事務管理器作為依賴項從另一個 bean 定義(未顯示)注入,並且 channelTransacted
標誌也設定為 true
。其效果是,如果監聽器因異常失敗,事務將被回滾,並且訊息也會被返回給 broker。重要的是,如果事務提交失敗(例如,由於資料庫約束錯誤或連線問題),AMQP 事務也會回滾,並且訊息被返回給 broker。這有時被稱為“盡力單階段提交”(Best Efforts 1 Phase Commit),是實現可靠訊息傳遞的一種非常強大的模式。如果在前面的示例中,channelTransacted
標誌設定為 false
(預設值),則外部事務仍將為監聽器提供,但所有訊息操作都會自動確認,因此即使業務操作回滾,訊息操作也會提交。
條件回滾
在 1.6.6 版本之前,在使用外部事務管理器(如 JDBC)時,向容器的 transactionAttribute
添加回滾規則沒有效果。異常總是會回滾事務。
此外,在容器的增強鏈中使用事務增強時,條件回滾也不是很有效,因為所有監聽器異常都被包裝在 ListenerExecutionFailedException
中。
第一個問題已得到糾正,規則現在已正確應用。此外,現在提供了 ListenerFailedRuleBasedTransactionAttribute
。它是 RuleBasedTransactionAttribute
的子類,唯一的區別是它瞭解 ListenerExecutionFailedException
,並使用此類異常的原因來應用規則。此事務屬性可以直接在容器中使用,或透過事務增強使用。
以下示例使用了此規則
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
關於已接收訊息回滾的說明
AMQP 事務僅適用於傳送到 broker 的訊息和確認。因此,當 Spring 事務回滾且訊息已被接收時,Spring AMQP 不僅要回滾事務,還需要手動拒絕訊息(有點像 nack,但規範不這麼稱呼)。訊息拒絕時採取的操作與事務無關,並取決於 defaultRequeueRejected
屬性(預設值:true
)。有關拒絕失敗訊息的更多資訊,請參見訊息監聽器和非同步情況。
有關 RabbitMQ 事務及其限制的更多資訊,請參見RabbitMQ Broker 語義。
在 RabbitMQ 2.7.0 之前,此類訊息(以及通道關閉或中止時未確認的任何訊息)會在 Rabbit broker 上進入佇列尾部。從 2.7.0 起,拒絕的訊息會進入佇列頭部,這與 JMS 回滾訊息的方式類似。 |
之前,事務回滾時訊息的重新入隊行為在本地事務和提供 TransactionManager 的情況下不一致。在前者中,應用正常的重新入隊邏輯(AmqpRejectAndDontRequeueException 或 defaultRequeueRejected=false )(參見訊息監聽器和非同步情況)。使用事務管理器時,訊息在回滾時會無條件重新入隊。從 2.0 版本開始,行為一致,兩種情況下都應用正常的重新入隊邏輯。要恢復到之前的行為,可以將容器的 alwaysRequeueWithTxManagerRollback 屬性設定為 true 。參見訊息監聽器容器配置。 |
使用 RabbitTransactionManager
RabbitTransactionManager 是一種替代方案,用於在外部事務中執行 Rabbit 操作並與之同步。此事務管理器是 PlatformTransactionManager
介面的一個實現,應與單個 Rabbit ConnectionFactory
一起使用。
此策略無法提供 XA 事務,例如,無法在訊息傳遞和資料庫訪問之間共享事務。 |
應用程式程式碼需要透過 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)
而不是標準的 Connection.createChannel()
呼叫及其後續的通道建立來檢索事務性 Rabbit 資源。使用 Spring AMQP 的 RabbitTemplate 時,它會自動檢測執行緒繫結的 Channel 並自動參與其事務。
透過 Java 配置,您可以使用以下 bean 設定新的 RabbitTransactionManager
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果您更喜歡 XML 配置,可以在 XML 應用程式上下文檔案中宣告以下 bean
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
事務同步
將 RabbitMQ 事務與某些其他(例如 DBMS)事務同步可提供“盡力單階段提交”(Best Effort One Phase Commit)語義。RabbitMQ 事務可能在事務同步的完成後續階段(after completion phase)提交失敗。這會被 spring-tx
基礎設施記錄為錯誤,但不會向呼叫程式碼丟擲異常。從 2.3.10 版本開始,您可以在事務已提交後,在處理事務的同一執行緒上呼叫 ConnectionUtils.checkAfterCompletion()
。如果沒有發生異常,它將簡單返回;否則,它將丟擲 AfterCompletionFailedException
,該異常將包含一個表示完成同步狀態的屬性。
透過呼叫 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)
啟用此功能;這是一個全域性標誌,適用於所有執行緒。