路由器實現
由於基於內容的路由通常需要一些特定於領域的邏輯,大多數用例都需要 Spring Integration 的選項,透過使用 XML 名稱空間支援或註解委託給 POJO。這兩者都將在後面討論。但是,我們首先介紹幾個滿足常見需求的實現。
PayloadTypeRouter
PayloadTypeRouter 根據有效載荷型別對映將訊息傳送到定義的通道,如以下示例所示:
<bean id="payloadTypeRouter"
class="org.springframework.integration.router.PayloadTypeRouter">
<property name="channelMapping">
<map>
<entry key="java.lang.String" value-ref="stringChannel"/>
<entry key="java.lang.Integer" value-ref="integerChannel"/>
</map>
</property>
</bean>
PayloadTypeRouter 的配置也由 Spring Integration 提供的名稱空間支援(參見名稱空間支援),這透過將 <router/> 配置及其相應的實現(透過使用 <bean/> 元素定義)組合成一個更簡潔的配置元素,從而簡化了配置。以下示例展示了一個等同於上面但使用名稱空間支援的 PayloadTypeRouter 配置:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>
以下示例顯示了在 Java 中配置的等效路由器
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
使用 Java DSL 時,有兩種選擇。
首先,您可以定義路由器物件,如前面的示例所示:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
請注意,路由器可以是 @Bean,但不是必須是。如果它不是 @Bean,流會註冊它。
其次,您可以在 DSL 流本身中定義路由函式,如以下示例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}
HeaderValueRouter
HeaderValueRouter 根據單獨的頭部值對映將訊息傳送到通道。建立 HeaderValueRouter 時,它使用要評估的頭部名稱進行初始化。頭部的值可以是以下兩種情況之一:
-
任意值
-
通道名稱
如果它是一個任意值,則需要將這些頭部值對映到通道名稱的額外對映。否則,無需額外配置。
Spring Integration 提供了一個簡單的基於名稱空間的 XML 配置來配置 HeaderValueRouter。以下示例演示了當需要將頭部值對映到通道時 HeaderValueRouter 的配置:
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>
在解析過程中,前面示例中定義的路由器可能會遇到通道解析失敗,從而導致異常。如果您想抑制此類異常並將未解析的訊息傳送到預設輸出通道(由 default-output-channel 屬性標識),請將 resolution-required 設定為 false。
通常,頭部值未明確對映到通道的訊息將傳送到 default-output-channel。但是,當頭部值對映到通道名稱但通道無法解析時,將 resolution-required 屬性設定為 false 會導致此類訊息路由到 default-output-channel。
以下示例顯示了在 Java 中配置的等效路由器
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
使用 Java DSL 時,有兩種選擇。首先,您可以定義路由器物件,如前面的示例所示:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
請注意,路由器可以是 @Bean,但不是必須是。如果它不是 @Bean,流會註冊它。
其次,您可以在 DSL 流本身中定義路由函式,如以下示例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
m -> m
.channelMapping("someHeaderValue", "channelA")
.channelMapping("someOtherHeaderValue", "channelB"),
e -> e.id("headerValueRouter"))
.get();
}
無需將頭部值對映到通道名稱的配置,因為頭部值本身表示通道名稱。以下示例展示了一個不需要將頭部值對映到通道名稱的路由器:
<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>
|
自 Spring Integration 2.1 起,通道解析的行為更加明確。例如,如果您省略 基本上,預設情況下,路由器必須能夠成功地將訊息路由到至少一個通道。如果您真的想丟棄訊息,您還必須將 |
RecipientListRouter
RecipientListRouter 將每條接收到的訊息傳送到靜態定義的訊息通道列表。以下示例建立了一個 RecipientListRouter:
<bean id="recipientListRouter"
class="org.springframework.integration.router.RecipientListRouter">
<property name="channels">
<list>
<ref bean="channel1"/>
<ref bean="channel2"/>
<ref bean="channel3"/>
</list>
</property>
</bean>
Spring Integration 還為 RecipientListRouter 配置提供名稱空間支援(請參閱 名稱空間支援),如下例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel"
timeout="1234"
ignore-send-failures="true"
apply-sequence="true">
<int:recipient channel="channel1"/>
<int:recipient channel="channel2"/>
</int:recipient-list-router>
以下示例顯示了在 Java 中配置的等效路由器
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
RecipientListRouter router = new RecipientListRouter();
router.setSendTimeout(1_234L);
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channel1");
router.addRecipient("channel2");
router.addRecipient("channel3");
return router;
}
以下示例展示了使用 Java DSL 配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.routeToRecipients(r -> r
.applySequence(true)
.ignoreSendFailures(true)
.recipient("channel1")
.recipient("channel2")
.recipient("channel3")
.sendTimeout(1_234L))
.get();
}
這裡的“apply-sequence”標誌與它對釋出-訂閱通道的作用相同,並且,與釋出-訂閱通道一樣,它在 recipient-list-router 上預設停用。有關更多資訊,請參閱 PublishSubscribeChannel 配置。 |
配置 RecipientListRouter 的另一個便捷選項是使用 Spring Expression Language (SpEL) 支援作為各個接收者通道的選擇器。這樣做類似於在“鏈”的開頭使用過濾器作為“選擇性消費者”。然而,在這種情況下,所有這些都非常簡潔地組合到路由器的配置中,如下例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel">
<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>
在上述配置中,透過 selector-expression 屬性標識的 SpEL 表示式將進行評估,以確定此接收者是否應包含在給定輸入訊息的接收者列表中。表示式的評估結果必須是 boolean。如果未定義此屬性,則該通道始終在接收者列表中。
RecipientListRouterManagement
從版本 4.1 開始,RecipientListRouter 提供了幾種操作來在執行時動態操縱接收者。這些管理操作由 RecipientListRouterManagement 透過 @ManagedResource 註解提供。它們可以透過 控制匯流排 以及 JMX 使用,如下例所示:
<control-bus input-channel="controlBus"/>
<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
<recipient channel="channel1"/>
</recipient-list-router>
<channel id="channel2"/>
Message<?> addRecipientCommandMessage =
MessageBuilder.withPayload("'simpleRouter.handler'.addRecipient")
.setHeader(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of("channel2"))
.build();
從應用程式啟動開始,simpleRouter 只有一個 channel1 接收者。但在 addRecipient 命令之後,channel2 接收者被新增。這是一個“註冊對訊息一部分的興趣”的用例,我們可能在某個時間段對來自路由器的訊息感興趣,所以我們訂閱 recipient-list-router,並在某個時候決定取消訂閱。
由於 <recipient-list-router> 的執行時管理操作,它可以從一開始就配置而沒有任何 <recipient>。在這種情況下,當沒有匹配訊息的接收者時,RecipientListRouter 的行為相同。如果配置了 defaultOutputChannel,訊息將傳送到那裡。否則,將丟擲 MessageDeliveryException。
XPath 路由器
XPath 路由器是 XML 模組的一部分。請參閱 使用 XPath 路由 XML 訊息。
路由和錯誤處理
Spring Integration 還提供了一種特殊的基於型別的路由器,稱為 ErrorMessageExceptionTypeRouter,用於路由錯誤訊息(定義為有效負載為 Throwable 例項的訊息)。ErrorMessageExceptionTypeRouter 類似於 PayloadTypeRouter。事實上,它們幾乎相同。唯一的區別是,PayloadTypeRouter 遍歷有效負載例項的例項層次結構(例如,payload.getClass().getSuperclass())以查詢最具體的型別和通道對映,而 ErrorMessageExceptionTypeRouter 遍歷“異常原因”的層次結構(例如,payload.getCause())以查詢最具體的 Throwable 型別或通道對映,並使用 mappingClass.isInstance(cause) 來匹配 cause 到類或任何超類。
在這種情況下,通道對映順序很重要。因此,如果需要獲取 IllegalArgumentException 的對映,而不是 RuntimeException,則必須首先在路由器上配置後者。 |
自版本 4.3 起,ErrorMessageExceptionTypeRouter 在初始化階段載入所有對映類,以便在發生 ClassNotFoundException 時快速失敗。 |
以下示例顯示了 ErrorMessageExceptionTypeRouter 的示例配置:
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
XML DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f
.routeByException(r -> r
.channelMapping(IllegalArgumentException.class, "illegalChannel")
.channelMapping(NullPointerException.class, "npeChannel")
.defaultOutputChannel("defaultChannel"));
}
@Bean
fun someFlow() =
integrationFlow {
routeByException {
channelMapping(IllegalArgumentException::class.java, "illegalChannel")
channelMapping(NullPointerException::class.java, "npeChannel")
defaultOutputChannel("defaultChannel")
}
}
@Bean
someFlow() {
integrationFlow {
routeByException {
channelMapping IllegalArgumentException, 'illegalChannel'
channelMapping NullPointerException, 'npeChannel'
defaultOutputChannel 'defaultChannel'
}
}
}
<int:exception-type-router input-channel="inputChannel"
default-output-channel="defaultChannel">
<int:mapping exception-type="java.lang.IllegalArgumentException"
channel="illegalChannel"/>
<int:mapping exception-type="java.lang.NullPointerException"
channel="npeChannel"/>
</int:exception-type-router>
<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />