訊息儲存

企業整合模式》(EIP) 一書指出了幾種具有緩衝訊息能力的模式。例如,聚合器會緩衝訊息直到它們可以被釋放,而一個 QueueChannel 會緩衝訊息直到消費者明確地從該通道接收這些訊息。由於訊息流中任何點都可能發生故障,緩衝訊息的 EIP 元件也引入了訊息可能丟失的點。

為了降低訊息丟失的風險,EIP 定義了訊息儲存模式,該模式允許 EIP 元件儲存訊息,通常儲存在某種型別的持久化儲存中(例如 RDBMS)。

Spring Integration 透過以下方式提供對訊息儲存模式的支援:

  • 定義一個 org.springframework.integration.store.MessageStore 策略介面

  • 提供該介面的多種實現

  • 在所有具有緩衝訊息能力的元件上暴露一個 message-store 屬性,以便你可以注入實現 MessageStore 介面的任何例項。

如何在手冊中配置特定的訊息儲存實現以及如何將 MessageStore 實現注入到特定的緩衝元件的詳細資訊都在手冊的各個部分有描述(參見特定元件,例如QueueChannelAggregatorDelayer 等)。下面這對示例展示瞭如何為 QueueChannel 和聚合器新增訊息儲存的引用。

QueueChannel
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
聚合器
<int:aggregator message-store="refToMessageStore"/>

預設情況下,訊息透過使用 o.s.i.store.SimpleMessageStore(一個 MessageStore 實現)儲存在記憶體中。這對於開發或簡單的低流量環境來說可能沒問題,在這些環境中,非持久化訊息潛在的丟失不是問題。然而,典型的生產應用程式需要一個更健壯的選項,不僅能降低訊息丟失的風險,還能避免潛在的記憶體溢位錯誤。因此,我們也為各種資料儲存提供了 MessageStore 實現。以下是支援的實現的完整列表:

然而,在使用 MessageStore 的持久化實現時,請注意一些限制。

訊息資料(payload 和 headers)的序列化和反序列化使用不同的序列化策略,具體取決於 MessageStore 的實現。例如,當使用 JdbcMessageStore 時,預設情況下只持久化 Serializable 資料。在這種情況下,不可序列化的 headers 在序列化發生之前會被移除。此外,請注意由傳輸介面卡(例如 FTP、HTTP、JMS 等)注入的協議特定 headers。例如,<http:inbound-channel-adapter/> 將 HTTP headers 對映到訊息 headers 中,其中一個是非序列化的 org.springframework.http.MediaType 例項的 ArrayList。但是,你可以將自己實現的 SerializerDeserializer 策略介面注入到某些 MessageStore 實現中(例如 JdbcMessageStore),以改變序列化和反序列化的行為。

特別注意那些表示某些型別資料的 headers。例如,如果某個 header 包含某個 Spring bean 的例項,反序列化後,你可能會得到該 bean 的不同例項,這會直接影響框架建立的一些隱式 headers(例如 REPLY_CHANNELERROR_CHANNEL)。目前,它們是不可序列化的,但即使可序列化,反序列化後的通道也不會代表預期的例項。

從 Spring Integration 3.0 版本開始,你可以使用配置為在向 HeaderChannelRegistry 註冊通道後用名稱替換這些 headers 的 header enricher 來解決此問題。

另外,考慮當你按如下方式配置訊息流時會發生什麼:gateway → queue-channel(由持久化 Message Store 支援)→ service-activator。該 gateway 建立了一個臨時回覆通道,當 service-activator 的 poller 從佇列讀取時,這個通道就丟失了。同樣,你可以使用 header enricher 將 headers 替換為 String 表示。

更多資訊,請參閱Header Enricher

Spring Integration 4.0 引入了兩個新介面:

  • ChannelMessageStore: 實現 QueueChannel 例項特有的操作

  • PriorityCapableChannelMessageStore: 標記用於 PriorityChannel 例項的 MessageStore 實現,併為持久化訊息提供優先順序排序。

實際行為取決於具體實現。框架提供了以下實現,可用作 QueueChannelPriorityChannel 的持久化 MessageStore

關於 SimpleMessageStore 的注意事項

從 4.1 版本開始,SimpleMessageStore 在呼叫 getMessageGroup() 時不再複製訊息組。對於大型訊息組,這是一個顯著的效能問題。4.0.1 版本引入了一個布林型別的 copyOnGet 屬性,允許你控制此行為。當聚合器在內部使用時,此屬性被設定為 false 以提高效能。現在它的預設值是 false

現在,在聚合器等元件之外訪問組儲存的使用者會獲得聚合器正在使用的組的直接引用,而不是副本。在聚合器外部操作組可能會導致不可預測的結果。

因此,你應避免此類操作,或將 copyOnGet 屬性設定為 true

使用 MessageGroupFactory

從 4.3 版本開始,一些 MessageGroupStore 實現可以注入自定義的 MessageGroupFactory 策略,用於建立和自定義 MessageGroupStore 所使用的 MessageGroup 例項。預設情況下,這是 SimpleMessageGroupFactory,它基於內部集合 GroupType.HASH_SET (LinkedHashSet) 生成 SimpleMessageGroup 例項。其他可能的選項包括 SYNCHRONISED_SETBLOCKING_QUEUE,其中最後一個可用於恢復之前的 SimpleMessageGroup 行為。此外,還提供了 PERSISTENT 選項。更多資訊請參閱下一節。從 5.0.1 版本開始,當組中訊息的順序和唯一性不重要時,還可以使用 LIST 選項。

持久化 MessageGroupStore 和延遲載入

從 4.3 版本開始,所有持久化 MessageGroupStore 例項都以延遲載入方式從儲存中檢索 MessageGroup 例項及其 messages。在大多數情況下,這對於關聯 MessageHandler 例項(參見AggregatorResequencer)非常有用,因為每次關聯操作都從儲存中載入整個 MessageGroup 會增加開銷。

你可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false) 選項從配置中關閉延遲載入行為。

我們對 MongoDB MessageStoreMongoDB Message Store)和 <aggregator>Aggregator)上的延遲載入進行的效能測試使用了類似於以下的自定義 release-strategy

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

它為 1000 條簡單訊息產生了類似於以下的結果:

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

然而,從 5.5 版本開始,所有持久化 MessageGroupStore 實現都提供了一個基於目標資料庫流式 API 的 streamMessagesForGroup(Object groupId) 契約。這在儲存中的組非常大時提高了資源利用率。在框架內部,這個新 API 在啟動時重新排程持久化訊息時被 Delayer(例如)使用。返回的 Stream<Message<?>> 必須在處理結束時關閉,例如透過 try-with-resources 的自動關閉。無論何時使用 PersistentMessageGroup,其 streamMessages() 都委託給 MessageGroupStore.streamMessagesForGroup()

訊息組條件

從 5.5 版本開始,MessageGroup 抽象提供了一個 condition 字串選項。此選項的值可以是任何可在後續解析以出於任何原因對組做出決定的內容。例如,來自關聯訊息處理器ReleaseStrategy 可以直接查閱組的此屬性,而不是遍歷組中的所有訊息。MessageGroupStore 暴露了一個 setGroupCondition(Object groupId, String condition) API。為此,已向 AbstractCorrelatingMessageHandler 添加了一個 setGroupConditionSupplier(BiFunction<Message<?>, String, String>) 選項。此函式在訊息被新增到組後以及針對組的現有條件進行評估。實現可以決定返回一個新值、現有值或將目標條件重置為 nullcondition 的值可以是 JSON、SpEL 表示式、數字或任何可以序列化為字串並在之後解析的內容。例如,來自檔案聚合器元件的 FileMarkerReleaseStrategy,從 FileSplitter.FileMarker.Mark.END 訊息的 FileHeaders.LINE_COUNT header 中填充條件到組中,並在其 canRelease() 方法中查閱此條件,將組大小與此條件中的值進行比較。這樣,它就不需要遍歷組中的所有訊息來查詢帶有 FileHeaders.LINE_COUNT header 的 FileSplitter.FileMarker.Mark.END 訊息。它還允許結束標記在所有其他記錄之前到達聚合器;例如在多執行緒環境中處理檔案時。

此外,為了配置方便,引入了 GroupConditionProvider 契約。AbstractCorrelatingMessageHandler 會檢查提供的 ReleaseStrategy 是否實現了此介面,並提取 conditionSupplier 以用於組條件評估邏輯。