路由器實現

由於基於內容的路由通常需要一些特定於領域的邏輯,大多數用例都需要 Spring Integration 的選項,透過使用 XML 名稱空間支援或註解委託給 POJO。這兩者都將在後面討論。但是,我們首先介紹幾個滿足常見需求的實現。

PayloadTypeRouter

PayloadTypeRouter 根據有效載荷型別對映將訊息傳送到定義的通道,如以下示例所示:

<bean id="payloadTypeRouter"
      class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="channelMapping">
        <map>
            <entry key="java.lang.String" value-ref="stringChannel"/>
            <entry key="java.lang.Integer" value-ref="integerChannel"/>
        </map>
    </property>
</bean>

PayloadTypeRouter 的配置也由 Spring Integration 提供的名稱空間支援(參見名稱空間支援),這透過將 <router/> 配置及其相應的實現(透過使用 <bean/> 元素定義)組合成一個更簡潔的配置元素,從而簡化了配置。以下示例展示了一個等同於上面但使用名稱空間支援的 PayloadTypeRouter 配置:

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String" channel="stringChannel" />
    <int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>

以下示例顯示了在 Java 中配置的等效路由器

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

使用 Java DSL 時,有兩種選擇。

首先,您可以定義路由器物件,如前面的示例所示:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

請注意,路由器可以是 @Bean,但不是必須是。如果它不是 @Bean,流會註冊它。

其次,您可以在 DSL 流本身中定義路由函式,如以下示例所示:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(String.class, "stringChannel")
                    .channelMapping(Integer.class, "integerChannel"))
            .get();
}

HeaderValueRouter

HeaderValueRouter 根據單獨的頭部值對映將訊息傳送到通道。建立 HeaderValueRouter 時,它使用要評估的頭部名稱進行初始化。頭部的值可以是以下兩種情況之一:

  • 任意值

  • 通道名稱

如果它是一個任意值,則需要將這些頭部值對映到通道名稱的額外對映。否則,無需額外配置。

Spring Integration 提供了一個簡單的基於名稱空間的 XML 配置來配置 HeaderValueRouter。以下示例演示了當需要將頭部值對映到通道時 HeaderValueRouter 的配置:

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
    <int:mapping value="someHeaderValue" channel="channelA" />
    <int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>

在解析過程中,前面示例中定義的路由器可能會遇到通道解析失敗,從而導致異常。如果您想抑制此類異常並將未解析的訊息傳送到預設輸出通道(由 default-output-channel 屬性標識),請將 resolution-required 設定為 false

通常,頭部值未明確對映到通道的訊息將傳送到 default-output-channel。但是,當頭部值對映到通道名稱但通道無法解析時,將 resolution-required 屬性設定為 false 會導致此類訊息路由到 default-output-channel

以下示例顯示了在 Java 中配置的等效路由器

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

使用 Java DSL 時,有兩種選擇。首先,您可以定義路由器物件,如前面的示例所示:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

請注意,路由器可以是 @Bean,但不是必須是。如果它不是 @Bean,流會註冊它。

其次,您可以在 DSL 流本身中定義路由函式,如以下示例所示:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
                    m -> m
                        .channelMapping("someHeaderValue", "channelA")
                        .channelMapping("someOtherHeaderValue", "channelB"),
                e -> e.id("headerValueRouter"))
            .get();
}

無需將頭部值對映到通道名稱的配置,因為頭部值本身表示通道名稱。以下示例展示了一個不需要將頭部值對映到通道名稱的路由器:

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>

自 Spring Integration 2.1 起,通道解析的行為更加明確。例如,如果您省略 default-output-channel 屬性,路由器無法解析至少一個有效通道,並且透過將 resolution-required 設定為 false 忽略任何通道名稱解析失敗,那麼將丟擲 MessageDeliveryException

基本上,預設情況下,路由器必須能夠成功地將訊息路由到至少一個通道。如果您真的想丟棄訊息,您還必須將 default-output-channel 設定為 nullChannel

RecipientListRouter

RecipientListRouter 將每條接收到的訊息傳送到靜態定義的訊息通道列表。以下示例建立了一個 RecipientListRouter

<bean id="recipientListRouter"
      class="org.springframework.integration.router.RecipientListRouter">
    <property name="channels">
        <list>
            <ref bean="channel1"/>
            <ref bean="channel2"/>
            <ref bean="channel3"/>
        </list>
    </property>
</bean>

Spring Integration 還為 RecipientListRouter 配置提供名稱空間支援(請參閱 名稱空間支援),如下例所示:

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
        timeout="1234"
        ignore-send-failures="true"
        apply-sequence="true">
  <int:recipient channel="channel1"/>
  <int:recipient channel="channel2"/>
</int:recipient-list-router>

以下示例顯示了在 Java 中配置的等效路由器

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
    RecipientListRouter router = new RecipientListRouter();
    router.setSendTimeout(1_234L);
    router.setIgnoreSendFailures(true);
    router.setApplySequence(true);
    router.addRecipient("channel1");
    router.addRecipient("channel2");
    router.addRecipient("channel3");
    return router;
}

以下示例展示了使用 Java DSL 配置的等效路由器:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("routingChannel")
            .routeToRecipients(r -> r
                    .applySequence(true)
                    .ignoreSendFailures(true)
                    .recipient("channel1")
                    .recipient("channel2")
                    .recipient("channel3")
                    .sendTimeout(1_234L))
            .get();
}
這裡的“apply-sequence”標誌與它對釋出-訂閱通道的作用相同,並且,與釋出-訂閱通道一樣,它在 recipient-list-router 上預設停用。有關更多資訊,請參閱 PublishSubscribeChannel 配置

配置 RecipientListRouter 的另一個便捷選項是使用 Spring Expression Language (SpEL) 支援作為各個接收者通道的選擇器。這樣做類似於在“鏈”的開頭使用過濾器作為“選擇性消費者”。然而,在這種情況下,所有這些都非常簡潔地組合到路由器的配置中,如下例所示:

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
    <int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
    <int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

在上述配置中,透過 selector-expression 屬性標識的 SpEL 表示式將進行評估,以確定此接收者是否應包含在給定輸入訊息的接收者列表中。表示式的評估結果必須是 boolean。如果未定義此屬性,則該通道始終在接收者列表中。

RecipientListRouterManagement

從版本 4.1 開始,RecipientListRouter 提供了幾種操作來在執行時動態操縱接收者。這些管理操作由 RecipientListRouterManagement 透過 @ManagedResource 註解提供。它們可以透過 控制匯流排 以及 JMX 使用,如下例所示:

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
   <recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>
Message<?> addRecipientCommandMessage =
                     MessageBuilder.withPayload("'simpleRouter.handler'.addRecipient")
                            .setHeader(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of("channel2"))
                            .build();

從應用程式啟動開始,simpleRouter 只有一個 channel1 接收者。但在 addRecipient 命令之後,channel2 接收者被新增。這是一個“註冊對訊息一部分的興趣”的用例,我們可能在某個時間段對來自路由器的訊息感興趣,所以我們訂閱 recipient-list-router,並在某個時候決定取消訂閱。

由於 <recipient-list-router> 的執行時管理操作,它可以從一開始就配置而沒有任何 <recipient>。在這種情況下,當沒有匹配訊息的接收者時,RecipientListRouter 的行為相同。如果配置了 defaultOutputChannel,訊息將傳送到那裡。否則,將丟擲 MessageDeliveryException

XPath 路由器

XPath 路由器是 XML 模組的一部分。請參閱 使用 XPath 路由 XML 訊息

路由和錯誤處理

Spring Integration 還提供了一種特殊的基於型別的路由器,稱為 ErrorMessageExceptionTypeRouter,用於路由錯誤訊息(定義為有效負載為 Throwable 例項的訊息)。ErrorMessageExceptionTypeRouter 類似於 PayloadTypeRouter。事實上,它們幾乎相同。唯一的區別是,PayloadTypeRouter 遍歷有效負載例項的例項層次結構(例如,payload.getClass().getSuperclass())以查詢最具體的型別和通道對映,而 ErrorMessageExceptionTypeRouter 遍歷“異常原因”的層次結構(例如,payload.getCause())以查詢最具體的 Throwable 型別或通道對映,並使用 mappingClass.isInstance(cause) 來匹配 cause 到類或任何超類。

在這種情況下,通道對映順序很重要。因此,如果需要獲取 IllegalArgumentException 的對映,而不是 RuntimeException,則必須首先在路由器上配置後者。
自版本 4.3 起,ErrorMessageExceptionTypeRouter 在初始化階段載入所有對映類,以便在發生 ClassNotFoundException 時快速失敗。

以下示例顯示了 ErrorMessageExceptionTypeRouter 的示例配置:

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • XML DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f
            .routeByException(r -> r
                 .channelMapping(IllegalArgumentException.class, "illegalChannel")
                 .channelMapping(NullPointerException.class, "npeChannel")
                 .defaultOutputChannel("defaultChannel"));
}
@Bean
fun someFlow() =
    integrationFlow {
        routeByException {
                    channelMapping(IllegalArgumentException::class.java, "illegalChannel")
                    channelMapping(NullPointerException::class.java, "npeChannel")
                    defaultOutputChannel("defaultChannel")
                }
    }
@Bean
someFlow() {
    integrationFlow {
        routeByException {
            channelMapping IllegalArgumentException, 'illegalChannel'
            channelMapping NullPointerException, 'npeChannel'
            defaultOutputChannel 'defaultChannel'
        }
    }
}
<int:exception-type-router input-channel="inputChannel"
                           default-output-channel="defaultChannel">
    <int:mapping exception-type="java.lang.IllegalArgumentException"
                 channel="illegalChannel"/>
    <int:mapping exception-type="java.lang.NullPointerException"
                 channel="npeChannel"/>
</int:exception-type-router>

<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />
© . This site is unofficial and not affiliated with VMware.