事務支援
理解訊息流中的事務
Spring Integration 提供了幾個鉤子來滿足你的訊息流的事務需求。為了更好地理解這些鉤子以及如何從中受益,我們必須首先回顧你可以用來啟動訊息流的六種機制,並瞭解你如何在每種機制中處理這些流的事務需求。
以下六種機制啟動訊息流(每種機制的詳細資訊在本手冊中提供):
-
閘道器代理:基本訊息閘道器。
-
訊息通道:與
MessageChannel方法的直接互動(例如,channel.send(message))。 -
訊息釋出者:作為 Spring bean 上的方法呼叫的副產品啟動訊息流的方式。
-
入站通道介面卡和閘道器:基於將第三方系統與 Spring Integration 訊息系統連線來啟動訊息流的方式(例如,
[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel)。 -
排程器:基於預配置排程器分發的排程事件啟動訊息流的方式。
-
輪詢器:類似於排程器,這是一種基於預配置輪詢器分發的排程或基於間隔的事件啟動訊息流的方式。
我們可以將這六種機制分為兩大類:
-
使用者程序啟動的訊息流:此類別中的示例場景是呼叫閘道器方法或顯式將
Message傳送到MessageChannel。換句話說,這些訊息流依賴於第三方程序(例如你編寫的一些程式碼)來啟動。 -
守護程序啟動的訊息流:此類別中的示例場景包括輪詢器輪詢訊息佇列以使用輪詢的訊息啟動新的訊息流,或者排程器透過建立新訊息並在預定義時間啟動訊息流來排程程序。
顯然,閘道器代理、MessageChannel.send(…) 和 MessagePublisher 都屬於第一類,而入站介面卡和閘道器、排程器和輪詢器則屬於第二類。
那麼,你如何在每種類別中的各種場景下處理事務需求,以及 Spring Integration 是否需要為特定場景提供一些明確的事務支援?或者你可以使用 Spring 的事務支援嗎?
Spring 本身提供了對事務管理的一流支援。所以我們這裡的目標不是提供新的東西,而是利用 Spring 的現有事務支援。換句話說,作為框架,我們必須暴露與 Spring 事務管理功能的鉤子。然而,由於 Spring Integration 配置基於 Spring 配置,我們不需要總是暴露這些鉤子,因為 Spring 已經暴露了它們。畢竟,每個 Spring Integration 元件都是一個 Spring Bean。
考慮到這個目標,我們可以再次考慮兩種場景:使用者程序啟動的訊息流和守護程序啟動的訊息流。
在 Spring 應用程式上下文中配置並由使用者程序啟動的訊息流受制於這些程序的常規事務配置。因此,它們不需要透過 Spring Integration 顯式配置來支援事務。事務可以也應該透過 Spring 的標準事務支援來啟動。Spring Integration 訊息流自然會尊重元件的事務語義,因為它本身就是由 Spring 配置的。例如,閘道器或服務啟用器方法可以標註 @Transactional,或者在 XML 配置中定義 TransactionInterceptor,並帶有指向應進行事務處理的特定方法的切入點表示式。總之,在這些場景中,你對事務配置和邊界擁有完全控制權。
然而,當涉及到由守護程序啟動的訊息流時,情況有點不同。儘管由開發人員配置,但這些流的啟動並不直接涉及人或某些其他程序。這些是基於觸發器的流,由觸發程序(守護程序)根據程序的配置啟動。例如,我們可以有一個排程器在每個星期五晚上啟動一個訊息流。我們還可以配置一個觸發器,每秒啟動一個訊息流,等等。因此,我們需要一種方法來讓這些基於觸發器的程序知道我們希望使生成的訊息流具有事務性,以便在每次啟動新訊息流時都可以建立事務上下文。換句話說,我們需要公開一些事務配置,但只夠委託給 Spring 已經提供的事務支援(就像我們在其他場景中所做的那樣)。
輪詢器事務支援
Spring Integration 為輪詢器提供事務支援。輪詢器是一種特殊型別的元件,因為在輪詢器任務中,我們可以對本身具有事務性的資源呼叫 receive(),從而將 receive() 呼叫包含在事務邊界內,這使得在任務失敗時可以回滾。如果我們要為通道新增相同的支援,新增的事務將影響所有從 send() 呼叫開始的下游元件。這為事務劃分提供了相當廣泛的範圍,而沒有任何強有力的理由,特別是當 Spring 已經提供了多種方法來解決任何下游元件的事務需求時。然而,將 receive() 方法包含在事務邊界內是輪詢器的“強有力理由”。
任何時候你配置輪詢器,你都可以透過使用 transactional 子元素及其屬性來提供事務配置,如下例所示:
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
前面的配置類似於原生的 Spring 事務配置。你仍然必須提供對事務管理器的引用,並指定事務屬性或依賴於預設值(例如,如果未指定 'transaction-manager' 屬性,則預設為名為 'transactionManager' 的 bean)。在內部,該過程被包裝在 Spring 的原生事務中,其中 TransactionInterceptor 負責處理事務。有關如何配置事務管理器、事務管理器的型別(如 JTA、Datasource 等)以及與事務配置相關的其他詳細資訊,請參閱 Spring Framework 參考指南。
透過上述配置,此輪詢器啟動的所有訊息流都是事務性的。有關輪詢器事務配置的更多資訊和詳細資訊,請參閱 輪詢和事務。
除了事務之外,在執行輪詢器時,你可能還需要處理更多橫切關注點。為此,輪詢器元素接受一個 <advice-chain> 子元素,它允許你定義一個自定義的 advice 例項鏈以應用於輪詢器。(有關更多詳細資訊,請參閱 可輪詢訊息源。)在 Spring Integration 2.0 中,輪詢器經過了重構,現在使用代理機制來處理事務性關注點以及其他橫切關注點。從這項工作中演變而來的一個重要變化是,我們使 <transactional> 和 <advice-chain> 元素互斥。其基本原理是,如果你需要多個 advice 並且其中一個是事務 advice,你可以像以前一樣方便地將其包含在 <advice-chain> 中,但擁有更多的控制權,因為你現在可以選擇以所需的順序定位 advice。以下示例演示瞭如何操作:
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
前面的示例展示了 Spring 事務 advice (txAdvice) 的基本 XML 配置,並將其包含在輪詢器定義的 <advice-chain> 中。如果你只需要處理輪詢器的事務性關注點,仍然可以使用 <transactional> 元素以方便起見。
事務邊界
另一個重要因素是訊息流中事務的邊界。當事務啟動時,事務上下文繫結到當前執行緒。因此,無論你的訊息流中有多少個端點和通道,只要你確保流在同一執行緒上繼續,你的事務上下文就會被保留。一旦你透過引入一個可輪詢通道或執行器通道或在某個服務中手動啟動一個新執行緒來打破它,事務邊界也會被打破。本質上,事務將立即結束,如果執行緒之間發生了成功的交接,則流將被視為成功,即使流將繼續並可能在下游某個地方導致異常,也會發送 COMMIT 訊號。如果這樣的流是同步的,該異常可能會被拋回到訊息流的啟動者(也是事務上下文的啟動者),並且事務將導致 ROLLBACK。中間方法是線上程邊界被打破的任何地方使用事務通道。例如,你可以使用委託給事務性 MessageStore 策略的基於佇列的通道,或者你可以使用基於 JMS 的通道。
事務同步
在某些環境中,將操作與包含整個流程的事務同步會很有幫助。例如,考慮一個位於流程開頭的 <file:inbound-channel-adapter/>,它執行一些資料庫更新。如果事務提交,我們可能希望將檔案移動到 success 目錄;如果事務回滾,我們可能希望將其移動到 failure 目錄。
Spring Integration 2.2 引入了將這些操作與事務同步的功能。此外,如果你沒有“真實”事務但仍希望在成功或失敗時執行不同的操作,則可以配置 PseudoTransactionManager。有關更多資訊,請參閱 偽事務。
以下列表顯示了此功能的主要策略介面:
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
該工廠負責建立 TransactionSynchronization 物件。你可以實現自己的,也可以使用框架提供的:DefaultTransactionSynchronizationFactory。此實現返回一個 TransactionSynchronization,它委託給 TransactionSynchronizationProcessor 的預設實現:ExpressionEvaluatingTransactionSynchronizationProcessor。此處理器支援三個 SpEL 表示式:beforeCommitExpression、afterCommitExpression 和 afterRollbackExpression。
這些操作對於熟悉事務的人來說應該不言自明。在每種情況下,#root 變數都是原始的 Message。在某些情況下,還會提供其他 SpEL 變數,具體取決於輪詢器輪詢的 MessageSource。例如,MongoDbMessageSource 提供 #mongoTemplate 變數,該變數引用訊息源的 MongoTemplate。類似地,RedisStoreMessageSource 提供 #store 變數,該變數引用由輪詢建立的 RedisStore。
要為特定輪詢器啟用該功能,你可以透過使用 synchronization-factory 屬性,在輪詢器的 <transactional/> 元素上提供對 TransactionSynchronizationFactory 的引用。
從版本 5.0 開始,Spring Integration 提供了 PassThroughTransactionSynchronizationFactory,當未配置 TransactionSynchronizationFactory 但 advice 鏈中存在 TransactionInterceptor 型別的 advice 時,它預設應用於輪詢端點。使用任何開箱即用的 TransactionSynchronizationFactory 實現時,如果事務 advice 之後丟擲異常,輪詢端點會將輪詢的訊息繫結到當前事務上下文,並將其作為 failedMessage 在 MessagingException 中提供。當使用不實現 TransactionInterceptor 的自定義事務 advice 時,可以顯式配置 PassThroughTransactionSynchronizationFactory 以實現此行為。在任何一種情況下,MessagingException 都會成為傳送到 errorChannel 的 ErrorMessage 的 payload,並且原因是 advice 丟擲的原始異常。以前,ErrorMessage 的 payload 是 advice 丟擲的原始異常,並且沒有提供對 failedMessage 資訊的引用,這使得難以確定事務提交問題的原因。
為了簡化這些元件的配置,Spring Integration 為預設工廠提供了名稱空間支援。以下示例展示瞭如何使用名稱空間配置檔案入站通道介面卡:
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 評估的結果作為 payload 傳送到 committedChannel 或 rolledBackChannel(在這種情況下,這將是 Boolean.TRUE 或 Boolean.FALSE——java.io.File.renameTo() 方法呼叫的結果)。
如果你希望傳送整個 payload 以進行進一步的 Spring Integration 處理,請使用 'payload' 表示式。
|
重要的是要理解,這會將操作與事務同步。它不會使本質上不具備事務性的資源實際具有事務性。相反,事務(無論是 JDBC 還是其他)在輪詢之前啟動,並在流完成時提交或回滾,然後執行同步操作。 如果你提供自定義的 |
除了 after-commit 和 after-rollback 表示式之外,還支援 before-commit。在這種情況下,如果評估(或下游處理)丟擲異常,則事務將回滾而不是提交。
偽事務
閱讀 事務同步 部分後,你可能會認為,即使輪詢器下游沒有“真實”的事務性資源(如 JDBC),在流程完成時執行這些“成功”或“失敗”操作也會很有用。例如,考慮一個“<file:inbound-channel-adapter/>”後跟一個“<ftp:outbout-channel-adapter/>”。這兩個元件都不是事務性的,但我們可能希望根據 FTP 傳輸的成功或失敗將輸入檔案移動到不同的目錄。
為了提供此功能,框架提供了 PseudoTransactionManager,即使沒有涉及真實的事務性資源,也可以實現上述配置。如果流程正常完成,則呼叫 beforeCommit 和 afterCommit 同步。如果失敗,則呼叫 afterRollback 同步。因為它不是真正的事務,所以不會發生實際的提交或回滾。偽事務是用於啟用同步功能的工具。
要使用 PseudoTransactionManager,你可以將其定義為 <bean/>,就像配置真正的事務管理器一樣。以下示例展示瞭如何操作:
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />