訊息通道
除了帶有 EIP 方法的 IntegrationFlowBuilder 之外,Java DSL 還提供了一個流式 API 來配置 MessageChannel 例項。為此,提供了 MessageChannels 構建器工廠。以下示例展示瞭如何使用它
@Bean
public PriorityChannelSpec priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap());
}
在 IntegrationFlowBuilder 的 channel() EIP 方法中可以使用相同的 MessageChannels 構建器工廠來連線端點,類似於在 XML 配置中連線 input-channel/output-channel 對。預設情況下,端點與 DirectChannel 例項連線,其中 bean 名稱基於以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]。此規則也適用於透過內聯 MessageChannels 構建器工廠使用而產生的未命名通道。但是,所有 MessageChannels 方法都有一個變體,它知道 channelId,您可以使用它來設定 MessageChannel 例項的 bean 名稱。MessageChannel 引用和 beanName 可以用作 bean 方法呼叫。以下示例顯示了使用 channel() EIP 方法的可能方式
@Bean
public QueueChannelSpec queueChannel() {
return MessageChannels.queue();
}
@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
return MessageChannels.publishSubscribe();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
from("input")意味著“找到並使用 ID 為 'input' 的MessageChannel,或者建立一個”。 -
fixedSubscriberChannel()產生一個FixedSubscriberChannel例項並將其註冊為channelFlow.channel#0。 -
channel("queueChannel")的工作方式相同,但使用現有的queueChannelbean。 -
channel(publishSubscribe())是 bean 方法引用。 -
channel(MessageChannels.executor("executorChannel", this.taskExecutor))是一個IntegrationFlowBuilder,它將IntegrationComponentSpec暴露給ExecutorChannel並將其註冊為executorChannel。 -
channel("output")註冊名為output的DirectChannelbean,只要不存在具有此名稱的 bean。
注意:前面的 IntegrationFlow 定義是有效的,它的所有通道都應用於帶有 BridgeHandler 例項的端點。
請注意在不同的 IntegrationFlow 例項中透過 MessageChannels 工廠使用相同的內聯通道定義。即使 DSL 解析器將不存在的物件註冊為 bean,它也無法從不同的 IntegrationFlow 容器中確定相同的物件 (MessageChannel)。以下示例是錯誤的 |
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
該錯誤示例的結果是以下異常
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
為了使其工作,您需要為該通道宣告 @Bean 並從不同的 IntegrationFlow 例項中使用其 bean 方法。