Scatter-Gather
從 4.1 版本開始,Spring Integration 提供了 scatter-gather 企業整合模式的實現。它是一個複合端點,其目標是將訊息傳送給接收者並聚合結果。正如 Enterprise Integration Patterns 中所述,它是一個用於“最優報價”等場景的元件,在這種場景下,我們需要向多個供應商請求資訊並決定哪一個提供了最優惠的條款。
以前,可以使用離散元件配置該模式。此增強功能帶來了更方便的配置。
ScatterGatherHandler
是一個請求-回覆端點,它結合了 PublishSubscribeChannel
(或 RecipientListRouter
)和 AggregatingMessageHandler
。請求訊息被髮送到 scatter
通道,ScatterGatherHandler
等待聚合器傳送到 outputChannel
的回覆。
功能
Scatter-Gather
模式提出了兩種場景:“拍賣(auction)”和“分發(distribution)”。在這兩種情況下,aggregation
功能是相同的,並提供了 AggregatingMessageHandler
的所有可用選項。(實際上,ScatterGatherHandler
只需要一個 AggregatingMessageHandler
作為建構函式引數)。有關更多資訊,請參見 Aggregator。
拍賣
拍賣 Scatter-Gather
變體對請求訊息使用“釋出-訂閱”邏輯,其中“scatter”通道是一個 PublishSubscribeChannel
,帶有 apply-sequence="true"
。但是,此通道可以是任何 MessageChannel
實現(與 ContentEnricher
中的 request-channel
一樣 — 請參見 Content Enricher)。但是,在這種情況下,您應該為 aggregation
功能建立自己的自定義 correlationStrategy
。
分發
分發 Scatter-Gather
變體基於 RecipientListRouter
(參見 RecipientListRouter
),並擁有 RecipientListRouter
的所有可用選項。這是 ScatterGatherHandler
的第二個建構函式引數。如果您只想依賴於 recipient-list-router
和 aggregator
的預設 correlationStrategy
,則應指定 apply-sequence="true"
。否則,您應該為 aggregator
提供自定義的 correlationStrategy
。與 PublishSubscribeChannel
變體(拍賣變體)不同,recipient-list-router
的 selector
選項允許根據訊息過濾目標供應商。使用 apply-sequence="true"
時,會提供預設的 sequenceSize
,並且 aggregator
可以正確釋放組。分發選項與拍賣選項互斥。
applySequence=true 僅在基於 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 建構函式配置的普通 Java 配置中是必需的,因為框架無法修改外部提供的元件。為了方便起見,從 6.0 版本開始,用於 Scatter-Gather 的 XML 和 Java DSL 將 applySequence 設定為 true。 |
對於拍賣和分發兩種變體,請求(分散)訊息都會透過 gatherResultChannel
頭進行豐富,以等待來自 aggregator
的回覆訊息。
預設情況下,所有供應商應將其結果傳送到 replyChannel
頭(通常透過省略最終端點的 output-channel
)。但是,還提供了 gatherChannel
選項,允許供應商將其回覆傳送到該通道進行聚合。
配置 Scatter-Gather 端點
以下示例展示了使用 Java 配置定義 Scatter-Gather
bean 的方法
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我們使用 applySequence="true"
和接收者通道列表配置了 RecipientListRouter
distributor
bean。下一個 bean 是用於 AggregatingMessageHandler
的。最後,我們將這兩個 bean 注入到 ScatterGatherHandler
bean 定義中,並將其標記為 @ServiceActivator
,以便將 Scatter-Gather 元件連線到整合流中。
以下示例展示瞭如何使用 XML 名稱空間配置 <scatter-gather>
端點
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
1 | 端點的 ID。ScatterGatherHandler bean 會註冊一個別名 id + '.handler' 。RecipientListRouter bean 會註冊一個別名 id + '.scatterer' 。AggregatingMessageHandler bean 會註冊一個別名 id + '.gatherer' 。可選。(BeanFactory 會生成預設的 id 值。) |
2 | 生命週期屬性,指示端點是否應在應用上下文初始化期間啟動。此外,ScatterGatherHandler 還實現了 Lifecycle 介面,並啟動和停止內部建立的 gatherEndpoint (如果提供了 gather-channel )。可選。(預設為 true 。) |
3 | 在哪個通道上接收請求訊息,以便在 ScatterGatherHandler 中處理它們。必需。 |
4 | ScatterGatherHandler 將聚合結果傳送到的通道。可選。(入站訊息可以在 replyChannel 訊息頭中指定回覆通道)。 |
5 | 對於拍賣場景,用於傳送分散訊息的通道。可選。與 <scatterer> 子元素互斥。 |
6 | 用於接收每個供應商回覆訊息進行聚合的通道。它在分散訊息中作為 replyChannel 頭使用。可選。預設情況下,會建立一個 FixedSubscriberChannel 。 |
7 | 當多個處理器訂閱同一個 DirectChannel 時,此元件的順序(用於負載均衡目的)。可選。 |
8 | 指定端點應在哪個階段啟動和停止。啟動順序從最低到最高,停止順序從最高到最低。預設情況下,此值為 Integer.MAX_VALUE ,這意味著此容器儘可能晚地啟動,並儘可能早地停止。可選。 |
9 | 向 output-channel 傳送回覆 Message 時等待的超時時間間隔。預設情況下,send() 會阻塞一秒鐘。它僅適用於輸出通道存在某些“傳送”限制的情況,例如容量固定的 QueueChannel 已滿。在這種情況下,會丟擲 MessageDeliveryException 。對於 AbstractSubscribableChannel 實現,send-timeout 會被忽略。對於 group-timeout(-expression) ,來自計劃過期任務的 MessageDeliveryException 會導致該任務被重新排程。可選。 |
10 | 允許您指定 Scatter-Gather 在返回之前等待回覆訊息的時間。預設情況下,它會等待 30 秒。如果回覆超時,則返回 'null'。可選。 |
11 | 指定 Scatter-Gather 是否必須返回非 null 值。此值預設為 true 。因此,當底層聚合器在 gather-timeout 後返回 null 值時,會丟擲 ReplyRequiredException 。請注意,如果可能返回 null ,則應指定 gather-timeout 以避免無限期等待。 |
12 | <recipient-list-router> 選項。可選。與 scatter-channel 屬性互斥。 |
13 | <aggregator> 選項。必需。 |
錯誤處理
由於 Scatter-Gather 是一個多請求-回覆元件,錯誤處理會增加一些複雜性。在某些情況下,如果 ReleaseStrategy
允許在回覆少於請求的情況下完成處理,則最好捕獲並忽略下游異常。在其他情況下,當發生錯誤時,應考慮從子流返回類似“補償訊息”的東西。
每個非同步子流都應配置 errorChannel
頭,以便 MessagePublishingErrorHandler
正確傳送錯誤訊息。否則,錯誤將傳送到全域性 errorChannel
,並使用通用的錯誤處理邏輯。有關非同步錯誤處理的更多資訊,請參見 錯誤處理。
同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice
來忽略異常或返回補償訊息。當異常從一個子流拋到 ScatterGatherHandler
時,它會被重新拋到上游。這樣,所有其他子流的工作都將徒勞無功,它們的回覆將在 ScatterGatherHandler
中被忽略。有時這可能是預期的行為,但在大多數情況下,最好在特定的子流中處理錯誤,而不會影響所有其他子流以及 gatherer 中的預期。
從版本 5.1.3 開始,ScatterGatherHandler
提供了 errorChannelName
選項。它會被填充到分散訊息的 errorChannel
頭中,並在發生非同步錯誤時使用,或者可以在常規同步子流中用於直接傳送錯誤訊息。
以下示例配置展示了透過返回補償訊息進行非同步錯誤處理
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
為了生成正確的回覆,我們必須從由 MessagePublishingErrorHandler
傳送到 scatterGatherErrorChannel
的 MessagingException
的 failedMessage
中複製頭(包括 replyChannel
和 errorChannel
)。這樣,目標異常將返回到 ScatterGatherHandler
的 gatherer,以便完成回覆訊息組。此類異常 payload
可以在 gatherer 的 MessageGroupProcessor
中被過濾掉,或者在 Scatter-Gather 端點之後的下游以其他方式處理。
在將分散結果傳送到 gatherer 之前,ScatterGatherHandler 會恢復請求訊息頭,包括回覆和錯誤通道(如果存在)。這樣,來自 AggregatingMessageHandler 的錯誤將傳播到呼叫者,即使在分散接收者子流中應用了非同步移交。為了成功操作,必須將 gatherResultChannel 、originalReplyChannel 和 originalErrorChannel 頭傳回分散接收者子流的回覆中。在這種情況下,必須為 ScatterGatherHandler 配置一個合理的、有限的 gatherTimeout 。否則,預設情況下它將無限期地阻塞,等待 gatherer 的回覆。 |