分散-聚合

從 4.1 版本開始,Spring Integration 提供了 分散-聚合 (scatter-gather) 企業整合模式的實現。它是一個複合端點,目標是將訊息傳送給多個接收者並聚合結果。正如 企業整合模式 中所述,它適用於“最優報價”等場景,在此類場景中,我們需要向多個供應商請求資訊,並決定哪一個能為所請求的商品提供最佳條款。

以前,該模式可以透過使用離散元件來配置。此項增強帶來了更方便的配置。

ScatterGatherHandler 是一個請求-回覆端點,它結合了 PublishSubscribeChannel(或 RecipientListRouter)和 AggregatingMessageHandler。請求訊息被髮送到 scatter 通道,ScatterGatherHandler 等待聚合器傳送到 outputChannel 的回覆。

功能

分散-聚合 模式提出了兩種場景:“拍賣”和“分發”。在這兩種情況下,聚合 功能是相同的,並提供了 AggregatingMessageHandler 可用的所有選項。(實際上,ScatterGatherHandler 僅需要 AggregatingMessageHandler 作為建構函式引數。)有關更多資訊,請參閱 聚合器

拍賣

拍賣 分散-聚合 變體使用“釋出-訂閱”邏輯處理請求訊息,其中“分散”通道是帶有 apply-sequence="true"PublishSubscribeChannel。然而,此通道可以是任何 MessageChannel 實現(就像 ContentEnricher 中的 request-channel 一樣 — 請參閱 內容增強器)。但是,在這種情況下,您應該為 聚合 功能建立自己的自定義 correlationStrategy

分發

分發 分散-聚合 變體基於 RecipientListRouter(請參閱 RecipientListRouter),並具有 RecipientListRouter 的所有可用選項。這是第二個 ScatterGatherHandler 建構函式引數。如果您只想依賴 recipient-list-router聚合器 的預設 correlationStrategy,則應指定 apply-sequence="true"。否則,您應該為 聚合器 提供自定義 correlationStrategy。與 PublishSubscribeChannel 變體(拍賣變體)不同,擁有 recipient-list-routerselector 選項允許根據訊息過濾目標供應商。使用 apply-sequence="true" 時,會提供預設的 sequenceSize,並且 聚合器 可以正確地釋放組。分發選項與拍賣選項互斥。

applySequence=true 僅在基於 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 建構函式配置的純 Java 配置中才需要,因為框架無法修改外部提供的元件。為了方便起見,從 6.0 版本開始,分散-聚合 的 XML 和 Java DSL 將 applySequence 設定為 true。

對於拍賣和分發變體,請求(分散)訊息會透過 gatherResultChannel 標頭進行豐富,以等待來自 聚合器 的回覆訊息。

預設情況下,所有供應商都應將其結果傳送到 replyChannel 標頭(通常透過省略最終端點的 output-channel)。但是,也提供了 gatherChannel 選項,允許供應商將其回覆傳送到該通道進行聚合。

配置分散-聚合端點

以下示例顯示了 分散-聚合 Bean 定義的 Java 配置

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我們配置了 RecipientListRouterdistributor bean,其中 applySequence="true" 和接收者通道列表。下一個 bean 是 AggregatingMessageHandler。最後,我們將這兩個 bean 注入到 ScatterGatherHandler bean 定義中,並將其標記為 @ServiceActivator 以將分散-聚合元件連線到整合流中。

以下示例展示瞭如何使用 XML 名稱空間配置 <scatter-gather> 端點

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 端點的 ID。ScatterGatherHandler bean 使用 id + '.handler' 的別名註冊。RecipientListRouter bean 使用 id + '.scatterer' 的別名註冊。AggregatingMessageHandler bean 使用 id + '.gatherer' 的別名註冊。可選。(BeanFactory 生成預設的 id 值。)
2 生命週期屬性,表示端點是否應在應用程式上下文初始化期間啟動。此外,ScatterGatherHandler 也實現了 Lifecycle,如果提供了 gather-channel,它將啟動和停止內部建立的 gatherEndpoint。可選。(預設值為 true。)
3 用於在 ScatterGatherHandler 中接收請求訊息以進行處理的通道。必填。
4 ScatterGatherHandler 傳送聚合結果的通道。可選。(傳入訊息可以在 replyChannel 訊息頭中自行指定回覆通道)。
5 用於拍賣場景中傳送分散訊息的通道。可選。與 <scatterer> 子元素互斥。
6 用於接收每個供應商的回覆以進行聚合的通道。它用作分散訊息中的 replyChannel 標頭。可選。預設情況下,會建立 FixedSubscriberChannel
7 當多個處理程式訂閱到同一個 DirectChannel 時,此元件的順序(用於負載平衡目的)。可選。
8 指定端點應啟動和停止的階段。啟動順序從最低到最高,關閉順序從最高到最低。預設情況下,此值為 Integer.MAX_VALUE,這意味著此容器儘可能晚地啟動,儘可能早地停止。可選。
9 傳送回覆 Messageoutput-channel 時的等待超時時間。預設情況下,send() 會阻塞一秒鐘。它僅在輸出通道存在某些“傳送”限制時適用——例如,一個容量固定的 QueueChannel 已滿。在這種情況下,會丟擲 MessageDeliveryException。對於 AbstractSubscribableChannel 實現,send-timeout 將被忽略。對於 group-timeout(-expression),來自排程過期任務的 MessageDeliveryException 會導致此任務被重新排程。可選。
10 允許您指定分散-聚合在返回之前等待回覆訊息的時間。預設情況下,它會等待 30 秒。如果回覆超時,則返回 'null'。可選。
11 指定分散-聚合是否必須返回非空值。此值預設為 true。因此,當底層聚合器在 gather-timeout 後返回 null 值時,會丟擲 ReplyRequiredException。請注意,如果可能為 null,則應指定 gather-timeout 以避免無限期等待。
12 <recipient-list-router> 選項。可選。與 scatter-channel 屬性互斥。
13 <aggregator> 選項。必填。
從版本 6.5.3 開始,當 ScatterGatherHandler 配置為 async = true 選項時,請求訊息處理執行緒不再阻塞等待內部 ((PollableChannel) gatherResultChannel).receive(this.gatherTimeout) 操作上的聚合結果。相反,基於最終從 gatherResultChannel 生成的聚合結果,會返回一個 reactor.core.publisher.Mono 作為回覆物件。然後,該 Mono 將根據框架中的 Reactive Streams 支援 進行處理。

錯誤處理

由於分散-聚合是一個多請求-回覆元件,錯誤處理會變得更加複雜。在某些情況下,如果 ReleaseStrategy 允許程序以少於請求的回覆完成,最好只捕獲並忽略下游異常。在其他情況下,當發生錯誤時,應考慮使用“補償訊息”從子流返回。

每個非同步子流都應該配置 errorChannel 標頭,以便 MessagePublishingErrorHandler 正確傳送錯誤訊息。否則,錯誤將被髮送到具有通用錯誤處理邏輯的全域性 errorChannel。有關非同步錯誤處理的更多資訊,請參閱 錯誤處理

同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice 來忽略異常或返回補償訊息。當其中一個子流丟擲異常到 ScatterGatherHandler 時,它會被重新丟擲到上游。這樣,所有其他子流將白費功夫,並且它們的回覆將在 ScatterGatherHandler 中被忽略。這有時可能是預期的行為,但在大多數情況下,最好在特定的子流中處理錯誤,而不會影響所有其他子流和聚合器中的預期。

從 5.1.3 版本開始,ScatterGatherHandler 提供了 errorChannelName 選項。它會被填充到分散訊息的 errorChannel 標頭中,並在發生非同步錯誤時使用,或者可以在常規同步子流中用於直接傳送錯誤訊息。

以下示例配置演示了透過返回補償訊息進行非同步錯誤處理

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

為了生成正確的回覆,我們必須從 MessagingExceptionfailedMessage 複製頭部(包括 replyChannelerrorChannel),該異常已由 MessagePublishingErrorHandler 傳送到 scatterGatherErrorChannel。這樣,目標異常將返回到 ScatterGatherHandler 的聚合器,以完成回覆訊息組。這樣的異常 payload 可以在聚合器的 MessageGroupProcessor 中過濾掉,或者在分散-聚合端點之後以其他方式向下遊處理。

在將分散結果傳送給聚合器之前,ScatterGatherHandler 會恢復請求訊息頭,包括回覆通道和錯誤通道(如果有)。這樣,來自 AggregatingMessageHandler 的錯誤將傳播給呼叫者,即使在分散接收者子流中應用了非同步傳遞。為了成功操作,gatherResultChanneloriginalReplyChanneloriginalErrorChannel 頭必須傳回分散接收者子流的回覆中。在這種情況下,必須為 ScatterGatherHandler 配置一個合理的、有限的 gatherTimeout。否則,預設情況下,它將無限期地阻塞等待來自聚合器的回覆。
© . This site is unofficial and not affiliated with VMware.