TCP 訊息關聯
閘道器
閘道器會自動關聯訊息。但是,對於相對低流量的應用程式,您應該使用出站閘道器。當您將連線工廠配置為對所有訊息對使用單個共享連線(“single-use="false"”)時,一次只能處理一個訊息。新訊息必須等到收到上一條訊息的回覆後才能處理。當連線工廠配置為每個新訊息使用新連線(“single-use="true"”)時,此限制不適用。雖然此設定可以比共享連線環境提供更高的吞吐量,但它帶來了為每個訊息對開啟和關閉新連線的開銷。
因此,對於大流量訊息,請考慮使用一對協作通道介面卡。但是,要這樣做,您需要提供協作邏輯。
Spring Integration 2.2 中引入的另一個解決方案是使用 CachingClientConnectionFactory,它允許使用共享連線池。
協作的出站和入站通道介面卡
為了實現高吞吐量(避免前面提到的使用閘道器的陷阱),您可以配置一對協作的出站和入站通道介面卡。您還可以使用協作介面卡(伺服器端或客戶端)進行完全非同步通訊(而不是請求-回覆語義)。在伺服器端,訊息關聯由介面卡自動處理,因為入站介面卡添加了一個訊息頭,允許出站介面卡在傳送回覆訊息時確定使用哪個連線。
在伺服器端,您必須填充 ip_connectionId 訊息頭,因為它用於將訊息與連線關聯。源自入站介面卡的訊息會自動設定此訊息頭。如果您希望構造其他要傳送的訊息,則需要設定此訊息頭。您可以從傳入訊息中獲取訊息頭值。 |
在客戶端,如果需要,應用程式必須提供自己的關聯邏輯。您可以透過多種方式實現這一點。
如果訊息負載具有一些自然的關聯資料(例如事務 ID 或訂單號),並且您不需要保留原始出站訊息中的任何資訊(例如回覆通道訊息頭),則關聯很簡單,並且無論如何都將在應用程式級別完成。
如果訊息負載具有一些自然的關聯資料(例如事務 ID 或訂單號),但您需要保留原始出站訊息中的一些資訊(例如回覆通道訊息頭),則可以保留原始出站訊息的副本(可能透過使用釋出-訂閱通道),並使用聚合器重新組合必要的資料。
對於前兩種情況,如果負載沒有自然的關聯資料,您可以在出站通道介面卡上游提供一個轉換器,以使用此類資料增強負載。此類轉換器可以將原始負載轉換為包含原始負載和訊息頭子集的新物件。當然,來自訊息頭的活物件(例如回覆通道)不能包含在轉換後的負載中。
如果您選擇這種策略,您需要確保連線工廠具有適當的序列化器-反序列化器對來處理此類負載(例如使用 Java 序列化的 DefaultSerializer 和 DefaultDeserializer,或自定義序列化器和反序列化器)。TCP 連線工廠中提到的 ByteArray*Serializer 選項(包括預設的 ByteArrayCrLfSerializer)不支援此類負載,除非轉換後的負載是 String 或 byte[]。
|
在 2.2 版本之前,當協作通道介面卡使用客戶端連線工廠時, 此預設行為在真正的非同步環境中不適用,因此它現在預設為無限超時。您可以透過將客戶端連線工廠上的 |
從 5.4 版本開始,多個出站通道介面卡和一個 TcpInboundChannelAdapter 可以共享同一個連線工廠。這允許應用程式同時支援請求/回覆和任意伺服器 → 客戶端訊息傳遞。有關更多資訊,請參閱TCP 閘道器。
傳輸訊息頭
TCP 是一種流協議。Serializers 和 Deserializers 在流中標記訊息。在 3.0 之前,只能透過 TCP 傳輸訊息負載(String 或 byte[])。從 3.0 開始,您可以傳輸選定的訊息頭以及負載。但是,“活”物件,例如 replyChannel 訊息頭,無法序列化。
透過 TCP 傳送訊息頭資訊需要一些額外的配置。
第一步是為 ConnectionFactory 提供一個 MessageConvertingTcpMessageMapper,它使用 mapper 屬性。此對映器委託給任何 MessageConverter 實現,將訊息轉換為可由配置的 serializer 和 deserializer 序列化和反序列化的物件,以及從該物件轉換訊息。
Spring Integration 提供了一個 MapMessageConverter,它允許指定新增到 Map 物件的頭部列表,以及負載。生成的 Map 有兩個條目:payload 和 headers。headers 條目本身是一個 Map,幷包含選定的頭部。
第二步是提供一個序列化器和反序列化器,它們可以在 Map 和某種線路格式之間進行轉換。這可以是自定義的 Serializer 或 Deserializer,如果對等系統不是 Spring Integration 應用程式,您通常需要這樣做。
Spring Integration 提供了一個 MapJsonSerializer,用於將 Map 轉換為 JSON,以及從 JSON 轉換回來。它使用 Spring Integration JsonObjectMapper。如果需要,您可以提供自定義的 JsonObjectMapper。預設情況下,序列化器在物件之間插入一個換行符(0x0a)字元。有關更多資訊,請參閱Javadoc。
JsonObjectMapper 使用類路徑上的任何版本的 Jackson。 |
您還可以透過使用 DefaultSerializer 和 DefaultDeserializer 來使用 Map 的標準 Java 序列化。
以下示例顯示了使用 JSON 傳輸 correlationId、sequenceNumber 和 sequenceSize 訊息頭的連線工廠配置。
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用上述配置傳送的訊息,負載為“something”,將線上路上顯示為
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}