子流程支援
if…else 和 publish-subscribe 元件中的一些提供了使用子流程指定其邏輯或對映的能力。最簡單的示例是 .publishSubscribeChannel(),如下例所示
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
您可以使用單獨的 IntegrationFlow @Bean 定義達到相同的效果,但我們希望您覺得子流程樣式的邏輯組合很有用。我們發現它能使程式碼更短(因此更易讀)。
從 5.3 版本開始,提供了基於 BroadcastCapableChannel 的 publishSubscribeChannel() 實現,用於在由 Broker 支援的訊息通道上配置子流程訂閱者。例如,我們現在可以在 Jms.publishSubscribeChannel() 上將幾個訂閱者配置為子流程
@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub");
}
@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel,
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
類似的 publish-subscribe 子流程組合提供了 .routeToRecipients() 方法。
另一個例子是在 .filter() 方法上使用 .discardFlow() 而不是 .discardChannel()。
.route() 值得特別關注。考慮以下示例
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping() 繼續像在常規 Router 對映中那樣工作,但是 .subFlowMapping() 將該子流程繫結到主流程。換句話說,任何路由器的子流程在 .route() 之後都會返回到主流程。
有時,您需要從 .subFlowMapping() 中引用一個現有的 IntegrationFlow @Bean。以下示例展示瞭如何做到這一點
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. 當您將子流程配置為 lambda 表示式時,框架會處理與子流程的請求-回覆互動,並且不需要閘道器。 |
子流程可以巢狀到任何深度,但我們不建議這樣做。實際上,即使在路由器的情況下,在一個流程中新增複雜的子流程很快就會看起來像一盤義大利麵條,並且人類難以解析。
在 DSL 支援子流程配置的情況下,當配置的元件通常需要一個通道,並且該子流程以 channel() 元素開頭時,框架會在元件輸出通道和流程的輸入通道之間隱式地放置一個 bridge()。例如,在此 filter 定義中
框架會在內部建立一個 DirectChannel bean,用於注入到 MessageFilter.discardChannel 中。然後將子流程包裝到一個以該隱式通道作為訂閱起點的 IntegrationFlow 中,並在流程中指定的 channel() 之前放置一個 bridge。當使用現有的 IntegrationFlow bean 作為子流程引用時(而不是內聯子流程,例如 lambda),則不需要這樣的 bridge,因為框架可以從 flow bean 中解析出第一個通道。對於內聯子流程,輸入通道尚不可用。 |