分流器
分揀器是一個元件,其作用是將訊息分成幾個部分,並將生成的訊息傳送出去進行獨立處理。通常,它們是包含聚合器的管道中的上游生產者。
程式設計模型
執行分揀的 API 由一個基類 AbstractMessageSplitter 組成。它是一個 MessageHandler 實現,封裝了分揀器共有的特性,例如在生成的訊息中填充適當的訊息頭(CORRELATION_ID、SEQUENCE_SIZE 和 SEQUENCE_NUMBER)。此填充功能可以跟蹤訊息及其處理結果(在典型場景中,這些頭會被複制到由各種轉換端點生成的訊息中)。這些值可以被例如 組合訊息處理器 使用。
以下示例顯示了 AbstractMessageSplitter 的一個摘錄
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在應用程式中實現特定的分揀器,您可以擴充套件 AbstractMessageSplitter 並實現 splitMessage 方法,該方法包含分揀訊息的邏輯。返回值可以是以下之一:
-
Collection或訊息陣列,或迭代訊息的Iterable(或Iterator)。在這種情況下,訊息以訊息的形式傳送(在填充CORRELATION_ID、SEQUENCE_SIZE和SEQUENCE_NUMBER之後)。使用此方法可以為您提供更多控制權——例如,在分揀過程中填充自定義訊息頭。 -
非訊息物件的
Collection或陣列,或迭代非訊息物件的Iterable(或Iterator)。它與前一種情況類似,只是每個集合元素都用作訊息負載。使用此方法可以讓您專注於領域物件,而無需考慮訊息系統,並生成更易於測試的程式碼。 -
一個
Message或非訊息物件(但不是集合或陣列)。它與前幾種情況類似,只是傳送一條訊息。
在 Spring Integration 中,任何 POJO 都可以實現分揀演算法,前提是它定義了一個接受單個引數並具有返回值的 方法。在這種情況下,方法的返回值將如前所述進行解釋。輸入引數可以是 Message 或簡單的 POJO。在後一種情況下,分揀器接收傳入訊息的有效負載。我們推薦這種方法,因為它將程式碼與 Spring Integration API 解耦,並且通常更容易測試。
迭代器
從 4.1 版本開始,AbstractMessageSplitter 支援 Iterator 型別來分揀 value。請注意,在 Iterator(或 Iterable)的情況下,我們無法訪問底層項的數量,並且 SEQUENCE_SIZE 訊息頭被設定為 0。這意味著 <aggregator> 的預設 SequenceSizeReleaseStrategy 將不起作用,並且來自 splitter 的 CORRELATION_ID 對應的組將不會被釋放;它將保持為 incomplete。在這種情況下,您應該使用適當的自定義 ReleaseStrategy 或依賴 send-partial-result-on-expiry 以及 group-timeout 或 MessageGroupStoreReaper。
從 5.0 版本開始,AbstractMessageSplitter 提供了 protected obtainSizeIfPossible() 方法,如果可能的話,可以確定 Iterable 和 Iterator 物件的大小。例如,XPathMessageSplitter 可以確定底層 NodeList 物件的大小。從 5.0.9 版本開始,此方法也正確返回 com.fasterxml.jackson.core.TreeNode 的大小。
Iterator 物件有助於避免在分揀之前需要在記憶體中構建整個集合。例如,當底層項透過迭代或流從某些外部系統(例如資料庫或 FTP MGET)填充時。
Stream 和 Flux
從 5.0 版本開始,AbstractMessageSplitter 支援 Java Stream 和 Reactive Streams Publisher 型別來分揀 value。在這種情況下,目標 Iterator 是基於它們的迭代功能構建的。
此外,如果分揀器的輸出通道是 ReactiveStreamsSubscribableChannel 的例項,則 AbstractMessageSplitter 會生成 Flux 結果而不是 Iterator,並且輸出通道將訂閱此 Flux,以根據下游流需求進行基於背壓的分揀。
從 5.2 版本開始,分揀器支援 discardChannel 選項,用於傳送那些拆分函式返回空容器(集合、陣列、流、Flux 等)的請求訊息。在這種情況下,沒有要迭代傳送到 outputChannel 的項。null 拆分結果仍然是流結束的指示。
使用 Java、Groovy 和 Kotlin DSL 配置分揀器
基於 Message 及其可迭代有效負載的簡單分揀器示例,使用 DSL 配置
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
有關 DSL 的更多資訊,請參閱相關章節
使用 XML 配置分揀器
分揀器可以透過 XML 如下配置
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
| 1 | 分揀器的 ID 是可選的。 |
| 2 | 對應用程式上下文中定義的 bean 的引用。該 bean 必須實現分揀邏輯,如前一節所述。可選。如果未提供對 bean 的引用,則假定到達 input-channel 的訊息的有效負載是 java.util.Collection 的實現,並將預設分揀邏輯應用於該集合,將每個單獨的元素合併到一條訊息中並將其傳送到 output-channel。 |
| 3 | 實現分揀邏輯的方法(在 bean 上定義)。可選。 |
| 4 | 分揀器的輸入通道。必需。 |
| 5 | 分揀器將傳入訊息的分揀結果傳送到的通道。可選(因為傳入訊息本身可以指定回覆通道)。 |
| 6 | 如果拆分結果為空,請求訊息將被髮送到的通道。可選(它們將像 null 結果一樣停止)。 |
如果自定義分揀器實現可以在其他 <splitter> 定義中引用,我們建議使用 ref 屬性。但是,如果自定義分揀器處理程式實現應僅限於 <splitter> 的單個定義,您可以配置內部 bean 定義,如下例所示
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
在同一個 <int:splitter> 配置中同時使用 ref 屬性和內部處理程式定義是不允許的,因為它會建立模糊條件並導致丟擲異常。 |
如果 ref 屬性引用了一個擴充套件 AbstractMessageProducingHandler 的 bean(例如框架本身提供的分揀器),則透過將輸出通道直接注入處理程式來最佳化配置。在這種情況下,每個 ref 必須是獨立的 bean 例項(或 prototype 作用域的 bean),或者使用內部 <bean/> 配置型別。但是,此最佳化僅在您未在分揀器 XML 定義中提供任何分揀器特定屬性時才適用。如果您不小心從多個 bean 引用了同一個訊息處理程式,您將得到一個配置異常。 |