訊息儲存
《企業整合模式》(EIP)一書識別了多種具有訊息緩衝能力的模式。例如,聚合器會緩衝訊息直到它們可以被釋放,而 QueueChannel 會緩衝訊息直到消費者顯式地從該通道接收這些訊息。由於訊息流中任何點都可能發生故障,因此緩衝訊息的 EIP 元件也引入了訊息可能丟失的風險點。
為了降低訊息丟失的風險,EIP 定義了訊息儲存模式,該模式允許 EIP 元件儲存訊息,通常是在某種型別的持久化儲存(例如 RDBMS)中。
Spring Integration 透過以下方式支援訊息儲存模式:
-
定義
org.springframework.integration.store.MessageStore策略介面 -
提供該介面的多個實現
-
在所有具有訊息緩衝能力的元件上暴露
message-store屬性,以便您可以注入任何實現MessageStore介面的例項。
有關如何配置特定訊息儲存實現以及如何將 MessageStore 實現注入特定緩衝元件的詳細資訊在整個手冊中都有描述(請參閱特定元件,例如 QueueChannel、Aggregator、Delayer 等)。以下兩對示例展示瞭如何為 QueueChannel 和聚合器新增訊息儲存的引用。
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
預設情況下,訊息使用 o.s.i.store.SimpleMessageStore(MessageStore 的一個實現)儲存在記憶體中。這對於開發或簡單低容量環境來說可能很好,在這些環境中,非持久化訊息的潛在丟失不是問題。然而,典型的生產應用程式需要一個更健壯的選項,不僅要降低訊息丟失的風險,還要避免潛在的記憶體不足錯誤。因此,我們還為各種資料儲存提供了 MessageStore 實現。以下是受支援實現的完整列表:
-
Hazelcast 訊息儲存:使用 Hazelcast 分散式快取來儲存訊息
-
JDBC 訊息儲存:使用 RDBMS 來儲存訊息
-
Redis 訊息儲存:使用 Redis 鍵/值資料儲存來儲存訊息
-
MongoDB 訊息儲存:使用 MongoDB 文件儲存來儲存訊息
|
但是,在使用 訊息資料(負載和頭部)透過不同的序列化策略進行序列化和反序列化,具體取決於 特別注意表示某些型別資料的訊息頭。例如,如果其中一個訊息頭包含某個 Spring bean 的例項,則在反序列化時,您可能會得到該 bean 的不同例項,這會直接影響框架建立的一些隱式訊息頭(例如 從 Spring Integration 3.0 版本開始,您可以透過配置一個訊息頭增強器來解決此問題,該增強器在將通道註冊到 此外,請考慮當您按如下方式配置訊息流時會發生什麼: 有關更多資訊,請參閱訊息頭增強器。 |
Spring Integration 4.0 引入了兩個新介面
-
ChannelMessageStore:用於實現QueueChannel例項特有的操作 -
PriorityCapableChannelMessageStore:用於標記MessageStore實現以用於PriorityChannel例項,併為持久化訊息提供優先順序排序。
實際行為取決於實現。框架提供以下實現,它們可以作為 QueueChannel 和 PriorityChannel 的永續性 MessageStore 使用
|
關於
SimpleMessageStore 的注意事項從 4.1 版本開始, 現在,在聚合器等元件之外訪問組儲存的使用者將獲得對聚合器正在使用的組的直接引用,而不是副本。在聚合器外部操作該組可能會導致不可預測的結果。 因此,您應該避免此類操作,或者將 |
使用 MessageGroupFactory
從 4.3 版本開始,一些 MessageGroupStore 實現可以注入自定義的 MessageGroupFactory 策略,以建立和定製 MessageGroupStore 使用的 MessageGroup 例項。這預設為 SimpleMessageGroupFactory,它根據 GroupType.HASH_SET (LinkedHashSet) 內部集合生成 SimpleMessageGroup 例項。其他可能的選項是 SYNCHRONISED_SET 和 BLOCKING_QUEUE,其中最後一個可用於恢復先前的 SimpleMessageGroup 行為。此外,還提供了 PERSISTENT 選項。更多資訊請參閱下一節。從 5.0.1 版本開始,當組中訊息的順序和唯一性無關緊要時,還提供了 LIST 選項。
持久化 MessageGroupStore 和延遲載入
從 4.3 版本開始,所有持久化的 MessageGroupStore 例項都以延遲載入的方式從儲存中檢索 MessageGroup 例項及其 messages。在大多數情況下,這對於關聯 MessageHandler 例項(參見聚合器和重排序器)非常有用,因為在每次關聯操作時從儲存中載入整個 MessageGroup 會增加開銷。
您可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false) 選項來關閉配置中的延遲載入行為。
我們在 MongoDB MessageStore (MongoDB 訊息儲存) 和 <aggregator> (聚合器) 上的延遲載入效能測試使用了一個類似於以下的自定義 release-strategy:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
它對 1000 條簡單訊息產生類似以下的結果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
然而,從 5.5 版本開始,所有持久化 MessageGroupStore 實現都提供了一個基於目標資料庫流式 API 的 streamMessagesForGroup(Object groupId) 契約。這提高了當組在儲存中非常大時的資源利用率。在框架內部,這個新的 API 被用於例如 Delayer 在啟動時重新排程持久化訊息。返回的 Stream<Message<?>> 必須在處理結束時關閉,例如透過 try-with-resources 自動關閉。只要使用 PersistentMessageGroup,其 streamMessages() 就會委託給 MessageGroupStore.streamMessagesForGroup()。
訊息組條件
從 5.5 版本開始,MessageGroup 抽象提供了一個 condition 字串選項。此選項的值可以是任何可以在以後解析以做出組決定的內容。例如,來自關聯訊息處理器的 ReleaseStrategy 可能會查閱此組屬性,而不是遍歷組中的所有訊息。MessageGroupStore 暴露了一個 setGroupCondition(Object groupId, String condition) API。為此,AbstractCorrelatingMessageHandler 中添加了一個 setGroupConditionSupplier(BiFunction<Message<?>, String, String>) 選項。此函式在訊息新增到組後以及組的現有條件上進行評估。實現可能會決定返回新值、現有值或將目標條件重置為 null。condition 的值可以是 JSON、SpEL 表示式、數字或任何可以序列化為字串並在之後解析的內容。例如,來自檔案聚合器元件的 FileMarkerReleaseStrategy 從 FileSplitter.FileMarker.Mark.END 訊息的 FileHeaders.LINE_COUNT 訊息頭中填充條件到組中,並與組大小與此條件中的值進行比較來諮詢其 canRelease()。這樣它就不必遍歷組中的所有訊息來查詢帶有 FileHeaders.LINE_COUNT 訊息頭的 FileSplitter.FileMarker.Mark.END 訊息。它還允許結束標記在所有其他記錄之前到達聚合器;例如,在多執行緒環境中處理檔案時。
此外,為了配置方便,引入了 GroupConditionProvider 契約。AbstractCorrelatingMessageHandler 檢查所提供的 ReleaseStrategy 是否實現了此介面,並提取 conditionSupplier 用於組條件評估邏輯。
使用 LockRegistry
從 6.5 版本開始,AbstractMessageGroupStore 抽象透過鎖操作訊息組的元資料。此鎖獲取 groupId 並由 LockRegistry 生成。其目的是確保訊息和訊息組操作的原子性。在多執行緒中,同時新增或刪除訊息或更新元資料時,如果缺少鎖,某些實現可能會出現訊息組錯誤。預設情況下使用 DefaultLockRegistry,任何 LockRegistry 都可以透過 AbstractMessageGroupStore.setLockRegistry() 注入,通常是針對同一持久儲存的實現。有關更多資訊,請參閱分散式鎖。