訊息通道

除了帶有 EIP 方法的 IntegrationFlowBuilder 外,Java DSL 還提供了一個流暢的 API 來配置 MessageChannel 例項。為此,提供了 MessageChannels 構建器工廠。以下示例展示瞭如何使用它

@Bean
public PriorityChannelSpec priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap());
}

相同的 MessageChannels 構建器工廠可以在 IntegrationFlowBuilderchannel() EIP 方法中使用,以連線端點,類似於 XML 配置中的 input-channel/output-channel 對。預設情況下,端點使用 DirectChannel 例項連線,其 bean 名稱基於以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]。此規則也適用於內聯使用 MessageChannels 構建器工廠生成的未命名通道。然而,所有 MessageChannels 方法都有一個知道 channelId 的變體,您可以使用它來設定 MessageChannel 例項的 bean 名稱。MessageChannel 引用和 beanName 可以用作 bean 方法呼叫。以下示例展示了使用 channel() EIP 方法的可能方式

@Bean
public QueueChannelSpec queueChannel() {
    return MessageChannels.queue();
}

@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
    return MessageChannels.publishSubscribe();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input") 意味著“查詢並使用 ID 為 "input" 的 MessageChannel,或者建立一個”

  • fixedSubscriberChannel() 生成一個 FixedSubscriberChannel 例項並將其註冊為名為 channelFlow.channel#0

  • channel("queueChannel") 的工作方式相同,但使用現有的 queueChannel bean。

  • channel(publishSubscribe()) 是 bean 方法引用。

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor))IntegrationFlowBuilder,它將 IntegrationComponentSpec 暴露給 ExecutorChannel 並將其註冊為 executorChannel

  • channel("output") 註冊一個名為 outputDirectChannel bean,前提是不存在同名的 bean。

注意:上述 IntegrationFlow 定義是有效的,並且其所有通道都應用於帶有 BridgeHandler 例項的端點。

請注意不要在不同的 IntegrationFlow 例項中透過 MessageChannels 工廠使用相同的內聯通道定義。即使 DSL 解析器將不存在的物件註冊為 bean,它也無法確定來自不同 IntegrationFlow 容器的同一物件(MessageChannel)。以下示例是錯誤的
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

該錯誤示例的結果是以下異常

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

要使其工作,您需要為該通道宣告 @Bean,並在不同的 IntegrationFlow 例項中使用其 bean 方法。