動態路由器

Spring Integration 為常見的基於內容的路由用例提供了相當多的不同路由器配置,以及將自定義路由器實現為 POJO 的選項。例如,PayloadTypeRouter 提供了一種簡單的方式來配置路由器,該路由器根據入站訊息的負載型別計算通道,而 HeaderValueRouter 在配置透過評估特定訊息頭的價值來計算通道的路由器方面提供了同樣的便利。還有基於表示式 (SpEL) 的路由器,其中通道是根據評估表示式來確定的。所有這些型別的路由器都表現出一定的動態特性。

然而,這些路由器都需要靜態配置。即使是基於表示式的路由器,表示式本身也是路由器配置的一部分,這意味著作用於相同值的相同表示式總是會計算出相同的通道。這在大多數情況下是可以接受的,因為這樣的路由是明確定義的,因此是可預測的。但有時我們需要動態地改變路由器配置,以便將訊息流路由到不同的通道。

例如,您可能需要關閉系統的一部分進行維護,並臨時將訊息重新路由到不同的訊息流。另一個例子是,您可能希望透過新增另一條路由來處理更具體的 java.lang.Number 型別(在使用 PayloadTypeRouter 的情況下),從而為訊息流引入更多粒度。

不幸的是,如果使用靜態路由器配置來實現這兩個目標中的任何一個,您將不得不關閉整個應用程式,更改路由器配置(更改路由),然後再重新啟動應用程式。這顯然不是任何人想要看到的解決方案。

動態路由器 模式描述了在不關閉系統或單個路由器的情況下動態更改或配置路由器的機制。

在深入探討 Spring Integration 如何支援動態路由的具體細節之前,我們需要考慮路由器的典型流程:

  1. 計算一個通道識別符號,這是路由器接收到訊息後計算出的一個值。通常,它是一個 String 或實際的 MessageChannel 例項。

  2. 將通道識別符號解析為通道名稱。我們將在本節後面描述此過程的具體細節。

  3. 將通道名稱解析為實際的 MessageChannel 例項

如果步驟 1 的結果是 MessageChannel 的實際例項,那麼在動態路由方面就沒多少可做的了,因為 MessageChannel 是任何路由器工作的最終產物。然而,如果第一步的結果是通道識別符號而不是 MessageChannel 的例項,那麼您有很多可能的方式來影響獲取 MessageChannel 的過程。考慮以下負載型別路由器的示例:

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String"  channel="channel1" />
    <int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>

在負載型別路由器的上下文中,前面提到的三個步驟將實現如下:

  1. 計算一個通道識別符號,它是負載型別的完全限定名(例如,java.lang.String)。

  2. 將通道識別符號解析為通道名稱,其中上一步的結果用於從 mapping 元素中定義的負載型別對映中選擇適當的值。

  3. 將通道名稱解析為實際的 MessageChannel 例項,它是應用程式上下文中由上一步結果標識的 bean 的引用(希望該 bean 是一個 MessageChannel)。

換句話說,每一步都為下一步提供輸入,直到整個過程完成。

現在考慮一個頭值路由器的示例:

<int:header-value-router input-channel="inputChannel" header-name="testHeader">
    <int:mapping value="foo" channel="fooChannel" />
    <int:mapping value="bar" channel="barChannel" />
</int:header-value-router>

現在我們可以考慮這三個步驟如何應用於頭值路由器:

  1. 計算一個通道識別符號,它是透過 header-name 屬性標識的頭的值。

  2. 將通道識別符號解析為通道名稱,其中上一步的結果用於從 mapping 元素中定義的通用對映中選擇適當的值。

  3. 將通道名稱解析為實際的 MessageChannel 例項,它是應用程式上下文中由上一步結果標識的 bean 的引用(希望該 bean 是一個 MessageChannel)。

前面兩種不同路由器型別的配置看起來幾乎完全相同。然而,如果您檢視 HeaderValueRouter 的另一種配置,我們可以清楚地看到沒有 mapping 子元素,如下面的列表所示:

<int:header-value-router input-channel="inputChannel" header-name="testHeader"/>

然而,此配置仍然完全有效。那麼很自然的問題是,第二步的對映在哪裡?

第二步現在是可選的。如果未定義 mapping,則第一步計算出的通道識別符號值會自動被視為 channel name,然後像第三步中那樣解析為實際的 MessageChannel。這也意味著第二步是為路由器提供動態特性的關鍵步驟之一,因為它引入了一個過程,允許您改變通道識別符號解析為通道名稱的方式,從而影響從初始通道識別符號確定最終 MessageChannel 例項的過程。

例如,在前面的配置中,假設 testHeader 的值是 'kermit',它現在是一個通道識別符號(第一步)。由於此路由器中沒有對映,將此通道識別符號解析為通道名稱(第二步)是不可能的,並且此通道識別符號現在被視為通道名稱。然而,如果存在對映但針對的是不同的值呢?最終結果仍然相同,因為如果在將通道識別符號解析為通道名稱的過程中無法確定新值,則通道識別符號將成為通道名稱。

剩下要做的就是第三步,將通道名稱 ('kermit') 解析為此名稱標識的 MessageChannel 的實際例項。這基本上涉及對提供的名稱進行 bean 查詢。現在,所有包含 testHeader=kermit 頭-值對的訊息都將路由到 bean 名稱(其 id)為 'kermit' 的 MessageChannel

但是,如果您想將這些訊息路由到 'simpson' 通道怎麼辦?顯然,更改靜態配置是可行的,但這需要關閉系統。然而,如果您可以訪問通道識別符號對映,您可以引入一個新的對映,其中頭-值對現在是 kermit=simpson,這樣第二步就可以將 'kermit' 作為通道識別符號,並將其解析為 'simpson' 作為通道名稱。

同樣的道理也適用於 PayloadTypeRouter,您現在可以重新對映或移除特定的負載型別對映。實際上,它適用於所有其他路由器,包括基於表示式的路由器,因為它們的計算值現在有機會透過第二步解析為實際的 channel name

任何是 AbstractMappingMessageRouter 子類的路由器(包括大多數框架定義的路由器)都是動態路由器,因為 channelMapping 是在 AbstractMappingMessageRouter 級別定義的。該對映的 setter 方法作為公共方法公開,同時還有 'setChannelMapping' 和 'removeChannelMapping' 方法。只要您擁有路由器本身的引用,這些方法就可以讓您在執行時更改、新增和刪除路由器對映。這也意味著您可以透過 JMX(參見 JMX 支援)或 Spring Integration 控制匯流排(參見 控制匯流排)功能暴露這些相同的配置選項。

將通道鍵作為通道名稱回退是靈活方便的。但是,如果您不信任訊息建立者,惡意行為者(瞭解系統的人)可能會建立一個訊息,該訊息被路由到意外的通道。例如,如果鍵設定為路由器的輸入通道名稱,此類訊息將被路由迴路由器,最終導致堆疊溢位錯誤。因此,您可能希望停用此功能(將 channelKeyFallback 屬性設定為 false),並在需要時更改對映。

使用控制匯流排管理路由器對映

管理路由器對映的一種方法是透過 控制匯流排 模式,該模式暴露了一個控制通道,您可以透過該通道傳送控制訊息來管理和監視 Spring Integration 元件,包括路由器。

有關控制匯流排的更多資訊,請參見 控制匯流排

通常,您會發送一條控制訊息,請求在特定受管元件(例如路由器)上呼叫特定操作。以下受管操作(方法)專門用於更改路由器解析過程:

  • public void setChannelMapping(String key, String channelName): 允許您新增新的或修改現有的 channel identifierchannel name 之間的對映

  • public void removeChannelMapping(String key): 允許您移除特定的通道對映,從而斷開 channel identifierchannel name 之間的關聯

請注意,這些方法可用於簡單的更改(例如更新單個路由或新增或移除路由)。但是,如果您想移除一個路由並新增另一個路由,則更新不是原子的。這意味著在更新之間,路由表可能處於不確定狀態。從 4.0 版本開始,您現在可以使用控制匯流排原子地更新整個路由表。以下方法可實現此目的:

  • public Map<String, String>getChannelMappings(): 返回當前的對映。

  • public void replaceChannelMappings(Properties channelMappings): 更新對映。請注意,channelMappings 引數是一個 Properties 物件,因此必須將其新增到相應的 IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS 頭中

Properties newMapping = new Properties();
newMapping.setProperty("foo", "bar");
newMapping.setProperty("baz", "qux");
Message<?> replaceChannelMappingsCommandMessage =
                     MessageBuilder.withPayload("'router.handler'.replaceChannelMappings")
                            .setHeader(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of(newMapping))
                            .build();

對於對映的程式設計更改,由於型別安全問題,我們建議您使用 setChannelMappings 方法。replaceChannelMappings 會忽略非 String 物件的鍵或值。

使用 JMX 管理路由器對映

您還可以使用 Spring 的 JMX 支援來暴露路由器例項,然後使用您喜歡的 JMX 客戶端(例如 JConsole)來管理那些用於更改路由器配置的操作(方法)。

有關 Spring Integration 的 JMX 支援的更多資訊,請參見 JMX 支援