事務支援
理解訊息流中的事務
Spring Integration 提供了幾個鉤子來滿足你的訊息流的事務需求。為了更好地理解這些鉤子以及如何從中受益,我們首先需要回顧一下可以用來啟動訊息流的六種機制,並看看如何在每種機制中處理這些流的事務需求。
以下六種機制啟動一個訊息流(每種機制的詳細資訊在本手冊中都有介紹):
-
閘道器代理:一種基本的訊息閘道器。
-
訊息通道:直接與
MessageChannel
方法互動(例如,channel.send(message)
)。 -
訊息釋出者:透過呼叫 Spring bean 方法的副產品來啟動訊息流的方式。
-
入站通道介面卡和閘道器:將第三方系統與 Spring Integration 訊息系統連線起來,並基於此啟動訊息流的方式(例如,
[JmsMessage] → Jms 入站介面卡 [SI Message] → SI 通道
)。 -
排程器:基於預配置的排程器分發的排程事件來啟動訊息流的方式。
-
輪詢器 (Poller):類似於排程器,這是一種基於預配置的輪詢器分發的排程或基於間隔的事件來啟動訊息流的方式。
我們可以將這六種機制分為兩大類:
-
使用者程序啟動的訊息流:此類示例場景包括呼叫閘道器方法或顯式向
MessageChannel
傳送Message
。換句話說,這些訊息流的啟動依賴於第三方程序(例如您編寫的一些程式碼)。 -
守護程序啟動的訊息流:此類示例場景包括輪詢器輪詢訊息佇列以使用輪詢的訊息啟動新的訊息流,或者排程器透過建立新訊息並在預定義的時間啟動訊息流來排程程序。
顯然,閘道器代理、MessageChannel.send(…)
和 MessagePublisher
都屬於第一類,而入站介面卡和閘道器、排程器和輪詢器屬於第二類。
那麼,如何在每種類別中的各種場景中解決事務需求?Spring Integration 是否需要針對特定場景提供明確的事務支援?或者您可以使用 Spring 的事務支援嗎?
Spring 本身提供了對事務管理的一流支援。所以我們這裡的目標不是提供新的東西,而是利用 Spring 現有的事務支援從中受益。換句話說,作為一個框架,我們必須暴露鉤子以連線 Spring 的事務管理功能。然而,由於 Spring Integration 配置基於 Spring 配置,我們不需要總是暴露這些鉤子,因為 Spring 已經暴露了它們。畢竟,每個 Spring Integration 元件都是一個 Spring Bean。
考慮到這一目標,我們可以再次考慮這兩種場景:使用者程序啟動的訊息流和守護程序啟動的訊息流。
由使用者程序啟動並在 Spring 應用上下文中配置的訊息流受到此類程序通常的事務配置的約束。因此,它們無需由 Spring Integration 顯式配置來支援事務。事務可以而且應該透過 Spring 的標準事務支援來啟動。Spring Integration 訊息流自然會遵循元件的事務語義,因為它本身就是由 Spring 配置的。例如,閘道器或服務啟用器方法可以用 @Transactional
註解,或者可以在 XML 配置中定義一個 TransactionInterceptor
,其切入點表示式指向需要事務化的特定方法。總而言之,在這些場景中,您對事務配置和邊界擁有完全控制權。
然而,對於守護程序啟動的訊息流來說,情況有所不同。儘管由開發人員配置,但這些流不直接涉及人類或某些其他程序來啟動。這些是基於觸發器的流,由觸發器程序(守護程序)根據程序配置啟動。例如,我們可以讓一個排程器在每個週五晚上啟動一個訊息流。我們還可以配置一個觸發器,每秒鐘啟動一個訊息流等等。因此,我們需要一種方法來讓這些基於觸發器的程序知道我們想要使結果訊息流具有事務性,以便在每次啟動新的訊息流時都可以建立一個事務上下文。換句話說,我們需要公開一些事務配置,但僅夠將工作委託給 Spring 已提供的事務支援(就像我們在其他場景中所做的那樣)。
輪詢器事務支援
Spring Integration 為輪詢器提供了事務支援。輪詢器是一種特殊型別的元件,因為在輪詢器任務中,我們可以對自身具有事務性的資源呼叫 receive()
,從而將 receive()
呼叫包含在事務邊界內,這樣在任務失敗時可以回滾。如果我們對通道新增相同的支援,新增的事務將影響所有從 send()
呼叫開始的下游元件。這為事務劃分提供了相當廣泛的範圍,而沒有任何強有力的理由,特別是當 Spring 已經提供了幾種方法來解決任何下游元件的事務需求時。然而,將 receive()
方法包含在事務邊界內是支援輪詢器的“強有力理由”。
無論何時配置輪詢器,您都可以透過使用 transactional
子元素及其屬性來提供事務配置,如下例所示:
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
上述配置類似於原生的 Spring 事務配置。您仍然必須提供對事務管理器的引用,並指定事務屬性或依賴預設值(例如,如果未指定 'transaction-manager' 屬性,則預設使用名為 'transactionManager' 的 bean)。在內部,該過程包裝在 Spring 的原生事務中,其中 TransactionInterceptor
負責處理事務。有關如何配置事務管理器、事務管理器的型別(例如 JTA、Datasource 等)以及與事務配置相關的其他詳細資訊,請參閱 Spring Framework 參考指南。
透過上述配置,此輪詢器啟動的所有訊息流都將是事務性的。有關輪詢器事務配置的更多資訊和詳細資訊,請參閱 輪詢和事務。
除了事務之外,在執行輪詢器時,您可能還需要解決幾個橫切關注點。為了提供幫助,輪詢器元素接受一個 <advice-chain>
子元素,允許您定義要應用於輪詢器的一系列自定義通知例項。(更多詳細資訊請參閱 可輪詢訊息源)。在 Spring Integration 2.0 中,輪詢器經過了重構,現在使用代理機制來解決事務問題以及其他橫切關注點。這項工作帶來的一個顯著變化是,我們讓 <transactional>
和 <advice-chain>
元素互斥。這樣做的理由是,如果您需要多個通知且其中一個是事務通知,您可以將其包含在 <advice-chain>
中,就像以前一樣方便,但控制能力更強,因為您現在可以選擇將通知放置在所需的位置。以下示例展示瞭如何實現:
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
上述示例展示了基於 XML 的 Spring 事務通知 (txAdvice
) 的基本配置,並將其包含在輪詢器定義的 <advice-chain>
中。如果您只需要處理輪詢器的事務問題,仍然可以使用 <transactional>
元素作為便利方式。
事務邊界
另一個重要因素是訊息流中事務的邊界。事務啟動時,事務上下文會繫結到當前執行緒。因此,無論訊息流中有多少端點和通道,只要您確保流在同一執行緒上繼續執行,您的事務上下文就會得到保留。一旦您透過引入 可輪詢通道 (Pollable Channel) 或 執行器通道 (Executor Channel) 或在某些服務中手動啟動新執行緒來中斷它,事務邊界也會被打破。本質上,事務將立即結束,並且如果線上程之間發生了成功的移交,即使流將繼續並且下游可能仍然導致異常,該流也將被視為成功併發送 COMMIT 訊號。如果這樣的流是同步的,該異常可能會被拋回給訊息流的啟動者(同時也是事務上下文的啟動者),並且事務將導致 ROLLBACK。中間方案是在任何執行緒邊界被打破的點使用事務通道。例如,您可以使用委託給事務性 MessageStore 策略的基於佇列的通道,或者使用基於 JMS 的通道。
事務同步
在某些環境中,將操作與包含整個流的事務同步會有所幫助。例如,考慮流開頭的一個 <file:inbound-channel-adapter/>
,它執行一些資料庫更新。如果事務提交,我們可能想將檔案移動到 success
目錄;而如果事務回滾,我們可能想將其移動到 failure
目錄。
Spring Integration 2.2 引入了將這些操作與事務同步的能力。此外,如果您沒有“真實的”事務但仍希望在成功或失敗時執行不同的操作,則可以配置一個 PseudoTransactionManager
。更多資訊請參閱 偽事務。
以下列表顯示了此功能的主要策略介面:
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
該工廠負責建立一個 TransactionSynchronization
物件。您可以實現自己的,或使用框架提供的:DefaultTransactionSynchronizationFactory
。此實現返回一個 TransactionSynchronization
,它委託給 TransactionSynchronizationProcessor
的預設實現:ExpressionEvaluatingTransactionSynchronizationProcessor
。此處理器支援三個 SpEL 表示式:beforeCommitExpression
、afterCommitExpression
和 afterRollbackExpression
。
這些操作對於熟悉事務的人來說應該是不言自明的。在每種情況下,#root
變數都是原始的 Message
。在某些情況下,根據輪詢器輪詢的 MessageSource
,會提供其他 SpEL 變數。例如,MongoDbMessageSource
提供了 #mongoTemplate
變數,該變數引用訊息源的 MongoTemplate
。類似地,RedisStoreMessageSource
提供了 #store
變數,該變數引用輪詢建立的 RedisStore
。
要為特定的輪詢器啟用此功能,您可以在輪詢器的 <transactional/>
元素上使用 synchronization-factory
屬性提供對 TransactionSynchronizationFactory
的引用。
從 5.0 版本開始,Spring Integration 提供了 PassThroughTransactionSynchronizationFactory
,當未配置 TransactionSynchronizationFactory
但通知鏈中存在型別為 TransactionInterceptor
的通知時,它會預設應用於輪詢端點。使用任何開箱即用的 TransactionSynchronizationFactory
實現時,如果在事務通知之後丟擲異常,輪詢端點會將輪詢的訊息繫結到當前事務上下文,並在 MessagingException
中將其作為 failedMessage
提供。使用不實現 TransactionInterceptor
的自定義事務通知時,您可以顯式配置 PassThroughTransactionSynchronizationFactory
以實現此行為。在這兩種情況下,MessagingException
都將成為傳送到 errorChannel
的 ErrorMessage
的 payload,而 cause 是通知丟擲的原始異常。在此之前,ErrorMessage
的 payload 是通知丟擲的原始異常,並且不提供 failedMessage
資訊的引用,這使得難以確定事務提交問題的原因。
為了簡化這些元件的配置,Spring Integration 為預設工廠提供了名稱空間支援。以下示例展示瞭如何使用名稱空間配置檔案入站通道介面卡:
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 評估的結果將作為 payload 傳送到 committedChannel
或 rolledBackChannel
(在本例中,這將是 Boolean.TRUE
或 Boolean.FALSE
——即 java.io.File.renameTo()
方法呼叫的結果)。
如果您希望傳送整個 payload 以進行進一步的 Spring Integration 處理,請使用 'payload' 表示式。
重要的是要理解,這會將操作與事務同步。它並不會使一個並非固有事務性的資源真正具有事務性。相反,事務(無論是 JDBC 還是其他型別)在輪詢之前啟動,並在流完成時提交或回滾,然後執行同步操作。 如果您提供了自定義的 |
除了 after-commit
和 after-rollback
表示式之外,還支援 before-commit
。在這種情況下,如果評估(或下游處理)丟擲異常,事務將回滾而不是提交。
偽事務
在閱讀了事務同步一節後,你可能會認為在流程完成時執行這些“成功”或“失敗”操作會很有用,即使在輪詢器的下游沒有“真正的”事務性資源(如 JDBC)。例如,考慮一個“<file:inbound-channel-adapter/>”後跟一個“<ftp:outbout-channel-adapter/>”。這兩個元件都不是事務性的,但我們可能希望根據 FTP 傳輸的成功或失敗,將輸入檔案移動到不同的目錄。
為了提供此功能,框架提供了一個 PseudoTransactionManager
,即使沒有涉及真正的事務性資源,也可以啟用上述配置。如果流程正常完成,將呼叫 beforeCommit
和 afterCommit
同步。如果失敗,將呼叫 afterRollback
同步。由於它不是真正的事務,因此不會發生實際的提交或回滾。偽事務是用於啟用同步特性的載體。
要使用 PseudoTransactionManager
,你可以將其定義為一個 <bean/>,就像配置真正的事務管理器一樣。下面的例子展示瞭如何這樣做
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />