訊息儲存
《企業整合模式》(EIP) 一書指出了幾種具有緩衝訊息能力的模式。例如,聚合器會緩衝訊息直到它們可以被釋放,而一個 QueueChannel
會緩衝訊息直到消費者明確地從該通道接收這些訊息。由於訊息流中任何點都可能發生故障,緩衝訊息的 EIP 元件也引入了訊息可能丟失的點。
為了降低訊息丟失的風險,EIP 定義了訊息儲存模式,該模式允許 EIP 元件儲存訊息,通常儲存在某種型別的持久化儲存中(例如 RDBMS)。
Spring Integration 透過以下方式提供對訊息儲存模式的支援:
-
定義一個
org.springframework.integration.store.MessageStore
策略介面 -
提供該介面的多種實現
-
在所有具有緩衝訊息能力的元件上暴露一個
message-store
屬性,以便你可以注入實現MessageStore
介面的任何例項。
如何在手冊中配置特定的訊息儲存實現以及如何將 MessageStore
實現注入到特定的緩衝元件的詳細資訊都在手冊的各個部分有描述(參見特定元件,例如QueueChannel、Aggregator、Delayer 等)。下面這對示例展示瞭如何為 QueueChannel
和聚合器新增訊息儲存的引用。
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
預設情況下,訊息透過使用 o.s.i.store.SimpleMessageStore
(一個 MessageStore
實現)儲存在記憶體中。這對於開發或簡單的低流量環境來說可能沒問題,在這些環境中,非持久化訊息潛在的丟失不是問題。然而,典型的生產應用程式需要一個更健壯的選項,不僅能降低訊息丟失的風險,還能避免潛在的記憶體溢位錯誤。因此,我們也為各種資料儲存提供了 MessageStore
實現。以下是支援的實現的完整列表:
-
Hazelcast Message Store: 使用 Hazelcast 分散式快取儲存訊息
-
JDBC Message Store: 使用 RDBMS 儲存訊息
-
Redis Message Store: 使用 Redis 鍵/值資料儲存儲存訊息
-
MongoDB Message Store: 使用 MongoDB 文件儲存儲存訊息
然而,在使用 訊息資料(payload 和 headers)的序列化和反序列化使用不同的序列化策略,具體取決於 特別注意那些表示某些型別資料的 headers。例如,如果某個 header 包含某個 Spring bean 的例項,反序列化後,你可能會得到該 bean 的不同例項,這會直接影響框架建立的一些隱式 headers(例如 從 Spring Integration 3.0 版本開始,你可以使用配置為在向 另外,考慮當你按如下方式配置訊息流時會發生什麼:gateway → queue-channel(由持久化 Message Store 支援)→ service-activator。該 gateway 建立了一個臨時回覆通道,當 service-activator 的 poller 從佇列讀取時,這個通道就丟失了。同樣,你可以使用 header enricher 將 headers 替換為 更多資訊,請參閱Header Enricher。 |
Spring Integration 4.0 引入了兩個新介面:
-
ChannelMessageStore
: 實現QueueChannel
例項特有的操作 -
PriorityCapableChannelMessageStore
: 標記用於PriorityChannel
例項的MessageStore
實現,併為持久化訊息提供優先順序排序。
實際行為取決於具體實現。框架提供了以下實現,可用作 QueueChannel
和 PriorityChannel
的持久化 MessageStore
:
關於
SimpleMessageStore 的注意事項從 4.1 版本開始, 現在,在聚合器等元件之外訪問組儲存的使用者會獲得聚合器正在使用的組的直接引用,而不是副本。在聚合器外部操作組可能會導致不可預測的結果。 因此,你應避免此類操作,或將 |
使用 MessageGroupFactory
從 4.3 版本開始,一些 MessageGroupStore
實現可以注入自定義的 MessageGroupFactory
策略,用於建立和自定義 MessageGroupStore
所使用的 MessageGroup
例項。預設情況下,這是 SimpleMessageGroupFactory
,它基於內部集合 GroupType.HASH_SET
(LinkedHashSet
) 生成 SimpleMessageGroup
例項。其他可能的選項包括 SYNCHRONISED_SET
和 BLOCKING_QUEUE
,其中最後一個可用於恢復之前的 SimpleMessageGroup
行為。此外,還提供了 PERSISTENT
選項。更多資訊請參閱下一節。從 5.0.1 版本開始,當組中訊息的順序和唯一性不重要時,還可以使用 LIST
選項。
持久化 MessageGroupStore
和延遲載入
從 4.3 版本開始,所有持久化 MessageGroupStore
例項都以延遲載入方式從儲存中檢索 MessageGroup
例項及其 messages
。在大多數情況下,這對於關聯 MessageHandler
例項(參見Aggregator和Resequencer)非常有用,因為每次關聯操作都從儲存中載入整個 MessageGroup
會增加開銷。
你可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
選項從配置中關閉延遲載入行為。
我們對 MongoDB MessageStore
(MongoDB Message Store)和 <aggregator>
(Aggregator)上的延遲載入進行的效能測試使用了類似於以下的自定義 release-strategy
。
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
它為 1000 條簡單訊息產生了類似於以下的結果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
然而,從 5.5 版本開始,所有持久化 MessageGroupStore
實現都提供了一個基於目標資料庫流式 API 的 streamMessagesForGroup(Object groupId)
契約。這在儲存中的組非常大時提高了資源利用率。在框架內部,這個新 API 在啟動時重新排程持久化訊息時被 Delayer(例如)使用。返回的 Stream<Message<?>>
必須在處理結束時關閉,例如透過 try-with-resources
的自動關閉。無論何時使用 PersistentMessageGroup
,其 streamMessages()
都委託給 MessageGroupStore.streamMessagesForGroup()
。
訊息組條件
從 5.5 版本開始,MessageGroup
抽象提供了一個 condition
字串選項。此選項的值可以是任何可在後續解析以出於任何原因對組做出決定的內容。例如,來自關聯訊息處理器的 ReleaseStrategy
可以直接查閱組的此屬性,而不是遍歷組中的所有訊息。MessageGroupStore
暴露了一個 setGroupCondition(Object groupId, String condition)
API。為此,已向 AbstractCorrelatingMessageHandler
添加了一個 setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
選項。此函式在訊息被新增到組後以及針對組的現有條件進行評估。實現可以決定返回一個新值、現有值或將目標條件重置為 null
。condition
的值可以是 JSON、SpEL 表示式、數字或任何可以序列化為字串並在之後解析的內容。例如,來自檔案聚合器元件的 FileMarkerReleaseStrategy
,從 FileSplitter.FileMarker.Mark.END
訊息的 FileHeaders.LINE_COUNT
header 中填充條件到組中,並在其 canRelease()
方法中查閱此條件,將組大小與此條件中的值進行比較。這樣,它就不需要遍歷組中的所有訊息來查詢帶有 FileHeaders.LINE_COUNT
header 的 FileSplitter.FileMarker.Mark.END
訊息。它還允許結束標記在所有其他記錄之前到達聚合器;例如在多執行緒環境中處理檔案時。
此外,為了配置方便,引入了 GroupConditionProvider
契約。AbstractCorrelatingMessageHandler
會檢查提供的 ReleaseStrategy
是否實現了此介面,並提取 conditionSupplier
以用於組條件評估邏輯。