Scatter-Gather

從 4.1 版本開始,Spring Integration 提供了 scatter-gather 企業整合模式的實現。它是一個複合端點,其目標是將訊息傳送給接收者並聚合結果。正如 Enterprise Integration Patterns 中所述,它是一個用於“最優報價”等場景的元件,在這種場景下,我們需要向多個供應商請求資訊並決定哪一個提供了最優惠的條款。

以前,可以使用離散元件配置該模式。此增強功能帶來了更方便的配置。

ScatterGatherHandler 是一個請求-回覆端點,它結合了 PublishSubscribeChannel(或 RecipientListRouter)和 AggregatingMessageHandler。請求訊息被髮送到 scatter 通道,ScatterGatherHandler 等待聚合器傳送到 outputChannel 的回覆。

功能

Scatter-Gather 模式提出了兩種場景:“拍賣(auction)”和“分發(distribution)”。在這兩種情況下,aggregation 功能是相同的,並提供了 AggregatingMessageHandler 的所有可用選項。(實際上,ScatterGatherHandler 只需要一個 AggregatingMessageHandler 作為建構函式引數)。有關更多資訊,請參見 Aggregator

拍賣

拍賣 Scatter-Gather 變體對請求訊息使用“釋出-訂閱”邏輯,其中“scatter”通道是一個 PublishSubscribeChannel,帶有 apply-sequence="true"。但是,此通道可以是任何 MessageChannel 實現(與 ContentEnricher 中的 request-channel 一樣 — 請參見 Content Enricher)。但是,在這種情況下,您應該為 aggregation 功能建立自己的自定義 correlationStrategy

分發

分發 Scatter-Gather 變體基於 RecipientListRouter(參見 RecipientListRouter),並擁有 RecipientListRouter 的所有可用選項。這是 ScatterGatherHandler 的第二個建構函式引數。如果您只想依賴於 recipient-list-routeraggregator 的預設 correlationStrategy,則應指定 apply-sequence="true"。否則,您應該為 aggregator 提供自定義的 correlationStrategy。與 PublishSubscribeChannel 變體(拍賣變體)不同,recipient-list-routerselector 選項允許根據訊息過濾目標供應商。使用 apply-sequence="true" 時,會提供預設的 sequenceSize,並且 aggregator 可以正確釋放組。分發選項與拍賣選項互斥。

applySequence=true 僅在基於 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 建構函式配置的普通 Java 配置中是必需的,因為框架無法修改外部提供的元件。為了方便起見,從 6.0 版本開始,用於 Scatter-Gather 的 XML 和 Java DSL 將 applySequence 設定為 true。

對於拍賣和分發兩種變體,請求(分散)訊息都會透過 gatherResultChannel 頭進行豐富,以等待來自 aggregator 的回覆訊息。

預設情況下,所有供應商應將其結果傳送到 replyChannel 頭(通常透過省略最終端點的 output-channel)。但是,還提供了 gatherChannel 選項,允許供應商將其回覆傳送到該通道進行聚合。

配置 Scatter-Gather 端點

以下示例展示了使用 Java 配置定義 Scatter-Gather bean 的方法

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我們使用 applySequence="true" 和接收者通道列表配置了 RecipientListRouter distributor bean。下一個 bean 是用於 AggregatingMessageHandler 的。最後,我們將這兩個 bean 注入到 ScatterGatherHandler bean 定義中,並將其標記為 @ServiceActivator,以便將 Scatter-Gather 元件連線到整合流中。

以下示例展示瞭如何使用 XML 名稱空間配置 <scatter-gather> 端點

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 端點的 ID。ScatterGatherHandler bean 會註冊一個別名 id + '.handler'RecipientListRouter bean 會註冊一個別名 id + '.scatterer'AggregatingMessageHandler bean 會註冊一個別名 id + '.gatherer'。可選。(BeanFactory 會生成預設的 id 值。)
2 生命週期屬性,指示端點是否應在應用上下文初始化期間啟動。此外,ScatterGatherHandler 還實現了 Lifecycle 介面,並啟動和停止內部建立的 gatherEndpoint(如果提供了 gather-channel)。可選。(預設為 true。)
3 在哪個通道上接收請求訊息,以便在 ScatterGatherHandler 中處理它們。必需。
4 ScatterGatherHandler 將聚合結果傳送到的通道。可選。(入站訊息可以在 replyChannel 訊息頭中指定回覆通道)。
5 對於拍賣場景,用於傳送分散訊息的通道。可選。與 <scatterer> 子元素互斥。
6 用於接收每個供應商回覆訊息進行聚合的通道。它在分散訊息中作為 replyChannel 頭使用。可選。預設情況下,會建立一個 FixedSubscriberChannel
7 當多個處理器訂閱同一個 DirectChannel 時,此元件的順序(用於負載均衡目的)。可選。
8 指定端點應在哪個階段啟動和停止。啟動順序從最低到最高,停止順序從最高到最低。預設情況下,此值為 Integer.MAX_VALUE,這意味著此容器儘可能晚地啟動,並儘可能早地停止。可選。
9 output-channel 傳送回覆 Message 時等待的超時時間間隔。預設情況下,send() 會阻塞一秒鐘。它僅適用於輸出通道存在某些“傳送”限制的情況,例如容量固定的 QueueChannel 已滿。在這種情況下,會丟擲 MessageDeliveryException。對於 AbstractSubscribableChannel 實現,send-timeout 會被忽略。對於 group-timeout(-expression),來自計劃過期任務的 MessageDeliveryException 會導致該任務被重新排程。可選。
10 允許您指定 Scatter-Gather 在返回之前等待回覆訊息的時間。預設情況下,它會等待 30 秒。如果回覆超時,則返回 'null'。可選。
11 指定 Scatter-Gather 是否必須返回非 null 值。此值預設為 true。因此,當底層聚合器在 gather-timeout 後返回 null 值時,會丟擲 ReplyRequiredException。請注意,如果可能返回 null,則應指定 gather-timeout 以避免無限期等待。
12 <recipient-list-router> 選項。可選。與 scatter-channel 屬性互斥。
13 <aggregator> 選項。必需。

錯誤處理

由於 Scatter-Gather 是一個多請求-回覆元件,錯誤處理會增加一些複雜性。在某些情況下,如果 ReleaseStrategy 允許在回覆少於請求的情況下完成處理,則最好捕獲並忽略下游異常。在其他情況下,當發生錯誤時,應考慮從子流返回類似“補償訊息”的東西。

每個非同步子流都應配置 errorChannel 頭,以便 MessagePublishingErrorHandler 正確傳送錯誤訊息。否則,錯誤將傳送到全域性 errorChannel,並使用通用的錯誤處理邏輯。有關非同步錯誤處理的更多資訊,請參見 錯誤處理

同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice 來忽略異常或返回補償訊息。當異常從一個子流拋到 ScatterGatherHandler 時,它會被重新拋到上游。這樣,所有其他子流的工作都將徒勞無功,它們的回覆將在 ScatterGatherHandler 中被忽略。有時這可能是預期的行為,但在大多數情況下,最好在特定的子流中處理錯誤,而不會影響所有其他子流以及 gatherer 中的預期。

從版本 5.1.3 開始,ScatterGatherHandler 提供了 errorChannelName 選項。它會被填充到分散訊息的 errorChannel 頭中,並在發生非同步錯誤時使用,或者可以在常規同步子流中用於直接傳送錯誤訊息。

以下示例配置展示了透過返回補償訊息進行非同步錯誤處理

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

為了生成正確的回覆,我們必須從由 MessagePublishingErrorHandler 傳送到 scatterGatherErrorChannelMessagingExceptionfailedMessage 中複製頭(包括 replyChannelerrorChannel)。這樣,目標異常將返回到 ScatterGatherHandler 的 gatherer,以便完成回覆訊息組。此類異常 payload 可以在 gatherer 的 MessageGroupProcessor 中被過濾掉,或者在 Scatter-Gather 端點之後的下游以其他方式處理。

在將分散結果傳送到 gatherer 之前,ScatterGatherHandler 會恢復請求訊息頭,包括回覆和錯誤通道(如果存在)。這樣,來自 AggregatingMessageHandler 的錯誤將傳播到呼叫者,即使在分散接收者子流中應用了非同步移交。為了成功操作,必須將 gatherResultChanneloriginalReplyChanneloriginalErrorChannel 頭傳回分散接收者子流的回覆中。在這種情況下,必須為 ScatterGatherHandler 配置一個合理的、有限的 gatherTimeout。否則,預設情況下它將無限期地阻塞,等待 gatherer 的回覆。