拆分器
拆分器是一個元件,其作用是將一個訊息分割成多個部分,並將分割後的訊息傳送出去進行獨立處理。通常,它們是包含聚合器(aggregator)的管道中的上游生產者。
程式設計模型
執行拆分的 API 由一個基類組成:AbstractMessageSplitter
。它是一個 MessageHandler
實現,封裝了拆分器的通用功能,例如在生成的訊息上填充相應的訊息頭(CORRELATION_ID
, SEQUENCE_SIZE
, 和 SEQUENCE_NUMBER
)。這些資訊頭能夠跟蹤訊息及其處理結果(在典型場景中,這些資訊頭會被複制到由各種轉換端點生成的訊息中)。然後,這些值可以被使用,例如,由一個複合訊息處理器使用。
以下示例展示了 AbstractMessageSplitter
的一個片段
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在應用程式中實現特定的拆分器,你可以擴充套件 AbstractMessageSplitter
並實現 splitMessage
方法,該方法包含拆分訊息的邏輯。返回值可以是以下之一
-
一個訊息的
Collection
或陣列,或者一個迭代訊息的Iterable
(或Iterator
)。在這種情況下,訊息會作為訊息傳送(在填充了CORRELATION_ID
,SEQUENCE_SIZE
和SEQUENCE_NUMBER
之後)。使用這種方法可以讓你有更多的控制權,例如,可以在拆分過程中填充自定義訊息頭。 -
一個非訊息物件的
Collection
或陣列,或者一個迭代非訊息物件的Iterable
(或Iterator
)。它的工作方式與前一種情況類似,不同之處在於每個集合元素都被用作訊息的 payload。使用這種方法可以讓你專注於領域物件,而無需考慮訊息系統,並且可以生成更易於測試的程式碼。 -
一個
Message
或非訊息物件(但不是集合或陣列)。它的工作方式與之前的情況類似,不同之處在於只發送一個訊息。
在 Spring Integration 中,任何 POJO 都可以實現拆分演算法,前提是它定義了一個接受單個引數並有返回值的方法。在這種情況下,方法的返回值會按照前面的描述進行解釋。輸入引數可以是 Message
或簡單的 POJO。在後一種情況下,拆分器接收入站訊息的 payload。我們推薦這種方法,因為它將程式碼與 Spring Integration API 解耦,並且通常更容易測試。
迭代器
從版本 4.1 開始,AbstractMessageSplitter
支援 Iterator
型別用於拆分的值。請注意,在使用 Iterator
(或 Iterable
)時,我們無法獲取底層元素的數量,並且 SEQUENCE_SIZE
頭會被設定為 0
。這意味著 <aggregator>
的預設 SequenceSizeReleaseStrategy
將無法工作,並且拆分器生成的 CORRELATION_ID
對應的組將不會被釋放;它會保持在 incomplete
狀態。在這種情況下,你應該使用適當的自定義 ReleaseStrategy
,或者依賴 send-partial-result-on-expiry
以及 group-timeout
或 MessageGroupStoreReaper
。
從版本 5.0 開始,AbstractMessageSplitter
提供了受保護的 obtainSizeIfPossible()
方法,如果可能的話,允許確定 Iterable
和 Iterator
物件的大小。例如 XPathMessageSplitter
可以確定底層 NodeList
物件的大小。從版本 5.0.9 開始,此方法也能正確返回 com.fasterxml.jackson.core.TreeNode
的大小。
Iterator
物件非常有用,可以避免在拆分之前必須在記憶體中構建整個集合。例如,當使用迭代或流從外部系統(例如資料庫或 FTP MGET
)填充底層元素時。
Stream 和 Flux
從版本 5.0 開始,AbstractMessageSplitter
支援 Java Stream
和 Reactive Streams Publisher
型別用於拆分的值。在這種情況下,目標 Iterator
是基於它們的迭代功能構建的。
此外,如果拆分器的輸出通道是 ReactiveStreamsSubscribableChannel
的例項,AbstractMessageSplitter
會生成一個 Flux
結果而不是 Iterator
,並且輸出通道會訂閱此 Flux
,以便根據下游流的需求進行基於背壓的拆分。
從版本 5.2 開始,拆分器支援 discardChannel
選項,用於傳送那些拆分函式返回空容器(集合、陣列、流、Flux
等)的請求訊息。在這種情況下,沒有要傳送到 outputChannel
的元素可供迭代。null
的拆分結果仍然是流結束的指示符。
使用 Java、Groovy 和 Kotlin DSL 配置拆分器
基於 Message 及其可迭代 payload 的簡單拆分器示例,使用 DSL 配置
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
請參閱相關章節中關於 DSL 的更多資訊
使用 XML 配置拆分器
拆分器可以透過 XML 進行如下配置
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 拆分器的 ID 是可選的。 |
2 | 引用應用上下文(application context)中定義的一個 bean。該 bean 必須實現拆分邏輯,如前面章節所述。可選。如果未提供對 bean 的引用,則假定到達 input-channel 的訊息的 payload 是 java.util.Collection 的實現,並將預設的拆分邏輯應用於該集合,將每個單獨的元素合併到一個訊息中併發送到 output-channel 。 |
3 | 實現拆分邏輯的方法(在 bean 上定義)。可選。 |
4 | 拆分器的入站通道。必需。 |
5 | 拆分器將入站訊息拆分結果傳送到的通道。可選(因為入站訊息本身可以指定回覆通道)。 |
6 | 在拆分結果為空時傳送請求訊息的通道。可選(在這種情況下,它們會像 null 結果一樣停止)。 |
如果自定義拆分器實現可以在其他 <splitter>
定義中引用,我們建議使用 ref
屬性。但是,如果自定義拆分器處理器實現應僅限於單個 <splitter>
定義的範圍,你可以配置一個內部 bean 定義,如下例所示
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
在同一個 <int:splitter> 配置中同時使用 ref 屬性和內部處理器定義是不允許的,因為它會建立模糊條件並導致丟擲異常。 |
如果 ref 屬性引用了一個擴充套件自 AbstractMessageProducingHandler 的 bean(例如框架本身提供的拆分器),則透過直接將輸出通道注入到處理器中來最佳化配置。在這種情況下,每個 ref 必須是單獨的 bean 例項(或 prototype 作用域的 bean),或者使用內部 <bean/> 配置型別。但是,此最佳化僅在你未在拆分器 XML 定義中提供任何拆分器特定屬性時適用。如果你不小心從多個 bean 引用了同一個訊息處理器,你會得到一個配置異常。 |
使用註解配置拆分器
@Splitter
註解適用於期望 Message
型別或訊息 payload 型別作為引數,並且方法返回值應為任何型別的 Collection
的方法。如果返回的值不是實際的 Message
物件,則每個元素將被包裝在一個 Message
中作為訊息的 payload。每個結果訊息都會發送到定義了 @Splitter
的端點指定的輸出通道。
以下示例展示瞭如何使用 @Splitter
註解配置拆分器
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}
另請參閱使用註解為端點提供 Advice 和檔案拆分器。