路由器實現

由於基於內容的路由通常需要一些領域特定的邏輯,因此大多數用例需要 Spring Integration 提供使用 XML 名稱空間支援或註解將委託給 POJO 的選項。稍後將討論這兩者。但是,我們首先介紹幾種滿足常見需求的實現。

PayloadTypeRouter

PayloadTypeRouter 根據負載型別對映將訊息傳送到通道,示例如下

<bean id="payloadTypeRouter"
      class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="channelMapping">
        <map>
            <entry key="java.lang.String" value-ref="stringChannel"/>
            <entry key="java.lang.Integer" value-ref="integerChannel"/>
        </map>
    </property>
</bean>

Spring Integration 提供的名稱空間(參見 名稱空間支援)也支援 PayloadTypeRouter 的配置,它透過將 <router/> 配置及其相應的實現(使用 <bean/> 元素定義)組合成一個更簡潔的配置元素來簡化配置。以下示例顯示了一個等同於上述配置但使用名稱空間支援的 PayloadTypeRouter 配置

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String" channel="stringChannel" />
    <int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>

以下示例顯示了使用 Java 配置的等效路由器

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

使用 Java DSL 時,有兩種選項。

首先,您可以定義路由器物件,如上例所示

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

請注意,路由器可以是一個 @Bean,但不是必須的。如果它不是 @Bean,流程將註冊它。

其次,您可以在 DSL 流本身內部定義路由函式,示例如下

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(String.class, "stringChannel")
                    .channelMapping(Integer.class, "integerChannel"))
            .get();
}

HeaderValueRouter

HeaderValueRouter 根據單個訊息頭值對映將訊息傳送到通道。建立 HeaderValueRouter 時,會使用要評估的訊息頭名稱進行初始化。訊息頭的值可以是以下兩種情況之一

  • 任意值

  • 通道名稱

如果它是任意值,則需要將這些訊息頭值附加對映到通道名稱。否則,無需額外的配置。

Spring Integration 提供了一個簡單的基於名稱空間的 XML 配置來配置 HeaderValueRouter。以下示例演示了當需要將訊息頭值對映到通道時 HeaderValueRouter 的配置

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
    <int:mapping value="someHeaderValue" channel="channelA" />
    <int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>

在解析過程中,上面示例中定義的路由器可能會遇到通道解析失敗,從而引發異常。如果您想抑制此類異常並將未解析的訊息傳送到預設輸出通道(由 default-output-channel 屬性標識),請將 resolution-required 設定為 false

通常,訊息頭值未明確對映到通道的訊息會發送到 default-output-channel。但是,當訊息頭值已對映到通道名稱但無法解析該通道時,將 resolution-required 屬性設定為 false 會將此類訊息路由到 default-output-channel

以下示例顯示了使用 Java 配置的等效路由器

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

使用 Java DSL 時,有兩種選項。首先,您可以定義路由器物件,如上例所示

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

請注意,路由器可以是一個 @Bean,但不是必須的。如果它不是 @Bean,流程將註冊它。

其次,您可以在 DSL 流本身內部定義路由函式,示例如下

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
                    m -> m
                        .channelMapping("someHeaderValue", "channelA")
                        .channelMapping("someOtherHeaderValue", "channelB"),
                e -> e.id("headerValueRouter"))
            .get();
}

配置不需要將訊息頭值對映到通道名稱的情況,因為訊息頭值本身就代表通道名稱。以下示例顯示了一個不需要將訊息頭值對映到通道名稱的路由器

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>

自 Spring Integration 2.1 起,解析通道的行為更加明確。例如,如果您省略了 default-output-channel 屬性,並且路由器無法解析至少一個有效通道,並且透過將 resolution-required 設定為 false 忽略了任何通道名稱解析失敗,那麼將丟擲 MessageDeliveryException

基本上,預設情況下,路由器必須能夠成功地將訊息路由到至少一個通道。如果您確實想丟棄訊息,則必須將 default-output-channel 設定為 nullChannel

RecipientListRouter

RecipientListRouter 將每條收到的訊息傳送到一個靜態定義的訊息通道列表。以下示例建立了一個 RecipientListRouter

<bean id="recipientListRouter"
      class="org.springframework.integration.router.RecipientListRouter">
    <property name="channels">
        <list>
            <ref bean="channel1"/>
            <ref bean="channel2"/>
            <ref bean="channel3"/>
        </list>
    </property>
</bean>

Spring Integration 也為 RecipientListRouter 配置提供了名稱空間支援(參見 名稱空間支援),示例如下

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
        timeout="1234"
        ignore-send-failures="true"
        apply-sequence="true">
  <int:recipient channel="channel1"/>
  <int:recipient channel="channel2"/>
</int:recipient-list-router>

以下示例顯示了使用 Java 配置的等效路由器

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
    RecipientListRouter router = new RecipientListRouter();
    router.setSendTimeout(1_234L);
    router.setIgnoreSendFailures(true);
    router.setApplySequence(true);
    router.addRecipient("channel1");
    router.addRecipient("channel2");
    router.addRecipient("channel3");
    return router;
}

以下示例顯示了使用 Java DSL 配置的等效路由器

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("routingChannel")
            .routeToRecipients(r -> r
                    .applySequence(true)
                    .ignoreSendFailures(true)
                    .recipient("channel1")
                    .recipient("channel2")
                    .recipient("channel3")
                    .sendTimeout(1_234L))
            .get();
}
此處的 'apply-sequence' 標誌與 publish-subscribe-channel 的效果相同,並且與 publish-subscribe-channel 一樣,在 recipient-list-router 上預設停用。有關詳細資訊,請參見 PublishSubscribeChannel 配置

配置 RecipientListRouter 的另一個便捷選項是使用 Spring Expression Language (SpEL) 支援作為各個接收通道的選擇器。這樣做類似於在“鏈”的開頭使用過濾器充當“選擇性消費者”。然而,在這種情況下,它都相當簡潔地組合在路由器的配置中,示例如下

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
    <int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
    <int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

在上述配置中,由 selector-expression 屬性標識的 SpEL 表示式會被評估,以確定該接收者是否應包含在給定輸入訊息的接收者列表中。表示式的評估結果必須是 boolean 型別。如果未定義此屬性,則該通道始終位於接收者列表中。

RecipientListRouterManagement

從 4.1 版本開始,RecipientListRouter 提供了幾個操作,可以在執行時動態操縱接收者。這些管理操作透過 @ManagedResource 註解由 RecipientListRouterManagement 提供。它們可以透過 控制匯流排 以及 JMX 使用,示例如下

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
   <recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>
Message<?> addRecipientCommandMessage =
                     MessageBuilder.withPayload("'simpleRouter.handler'.addRecipient")
                            .setHeader(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of("channel2"))
                            .build();

從應用啟動開始,simpleRouter 只有一個 channel1 接收者。但在執行 addRecipient 命令後,添加了 channel2 接收者。這是一個“註冊對訊息中某部分感興趣”的用例,當我們可能在某個時間段對來自路由器的訊息感興趣時,因此我們訂閱了 recipient-list-router,並在某個時候決定取消訂閱。

由於 <recipient-list-router> 的執行時管理操作,它可以從一開始就不配置任何 <recipient>。在這種情況下,當訊息沒有匹配的接收者時,RecipientListRouter 的行為與有匹配接收者時相同。如果配置了 defaultOutputChannel,訊息將被髮送到那裡。否則,將丟擲 MessageDeliveryException

XPath 路由器

XPath 路由器是 XML 模組的一部分。參見 使用 XPath 路由 XML 訊息

路由和錯誤處理

Spring Integration 還提供了一種特殊的基於型別的路由器,稱為 ErrorMessageExceptionTypeRouter,用於路由錯誤訊息(定義為負載為 Throwable 例項的訊息)。ErrorMessageExceptionTypeRouterPayloadTypeRouter 類似。事實上,它們幾乎相同。唯一的區別是,雖然 PayloadTypeRouter 導航負載例項的例項層級結構(例如,payload.getClass().getSuperclass())以找到最具體的型別和通道對映,但 ErrorMessageExceptionTypeRouter 導航“異常原因”的層級結構(例如,payload.getCause())以找到最具體的 Throwable 型別或通道對映,並使用 mappingClass.isInstance(cause)cause 與類或任何超類匹配。

在這種情況下,通道對映的順序很重要。因此,如果需要獲取 IllegalArgumentException 的對映而不是 RuntimeException 的對映,則必須先在路由器上配置後者。
自版本 4.3 起,ErrorMessageExceptionTypeRouter 在初始化階段載入所有對映類,以便在發生 ClassNotFoundException 時快速失敗。

以下示例顯示了 ErrorMessageExceptionTypeRouter 的示例配置

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • XML DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f
            .routeByException(r -> r
                 .channelMapping(IllegalArgumentException.class, "illegalChannel")
                 .channelMapping(NullPointerException.class, "npeChannel")
                 .defaultOutputChannel("defaultChannel"));
}
@Bean
fun someFlow() =
    integrationFlow {
        routeByException {
                    channelMapping(IllegalArgumentException::class.java, "illegalChannel")
                    channelMapping(NullPointerException::class.java, "npeChannel")
                    defaultOutputChannel("defaultChannel")
                }
    }
@Bean
someFlow() {
    integrationFlow {
        routeByException {
            channelMapping IllegalArgumentException, 'illegalChannel'
            channelMapping NullPointerException, 'npeChannel'
            defaultOutputChannel 'defaultChannel'
        }
    }
}
<int:exception-type-router input-channel="inputChannel"
                           default-output-channel="defaultChannel">
    <int:mapping exception-type="java.lang.IllegalArgumentException"
                 channel="illegalChannel"/>
    <int:mapping exception-type="java.lang.NullPointerException"
                 channel="npeChannel"/>
</int:exception-type-router>

<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />