TCP 訊息關聯
閘道器
閘道器會自動關聯訊息。但是,對於流量相對較低的應用,您應該使用出站閘道器。當您將連線工廠配置為對所有訊息對使用單個共享連線('single-use="false"')時,一次只能處理一個訊息。新訊息必須等待接收到前一個訊息的回覆後才能處理。當連線工廠配置為每個新訊息使用一個新的連線('single-use="true"')時,此限制不適用。儘管此設定可以提供比共享連線環境更高的吞吐量,但它會帶來為每對訊息開啟和關閉新連線的開銷。
因此,對於高流量訊息,考慮使用一對協作的通道介面卡。然而,這樣做您需要提供協作邏輯。
另一種解決方案,在 Spring Integration 2.2 中引入,是使用 CachingClientConnectionFactory
,它允許使用共享連線池。
協作的出站和入站通道介面卡
為了實現高吞吐量(避免使用閘道器的陷阱,如前面提到的),您可以配置一對協作的出站和入站通道介面卡。您還可以使用協作介面卡(伺服器端或客戶端)進行完全非同步通訊(而不是請求-回覆語義)。在伺服器端,訊息關聯由介面卡自動處理,因為入站介面卡添加了一個訊息頭,允許出站介面卡確定傳送回覆訊息時使用哪個連線。
在伺服器端,您必須填充 ip_connectionId 訊息頭,因為它用於將訊息與連線關聯起來。源自入站介面卡的訊息會自動設定此訊息頭。如果您希望構建其他訊息來發送,則需要設定此訊息頭。您可以從入站訊息中獲取訊息頭值。 |
在客戶端,如果需要,應用程式必須提供自己的關聯邏輯。
您可以透過多種方式實現。如果訊息載荷包含一些自然的關聯資料(例如事務 ID 或訂單號),並且您無需保留原始出站訊息中的任何資訊(例如回覆通道訊息頭),則關聯很簡單,並且無論如何都將在應用程式級別完成。
如果訊息載荷包含一些自然的關聯資料(例如事務 ID 或訂單號),但您需要保留原始出站訊息中的一些資訊(例如回覆通道訊息頭),您可以保留原始出站訊息的副本(例如使用釋出-訂閱通道),並使用聚合器重新組合所需的資料。
對於前面兩種情況中的任何一種,如果載荷沒有自然的關聯資料,您可以在出站通道介面卡的上游提供一個轉換器來增強載荷,新增此類資料。這樣的轉換器可以將原始載荷轉換為一個新物件,該物件包含原始載荷和部分訊息頭。當然,訊息頭中的“即時”物件(例如回覆通道)不能包含在轉換後的載荷中。
如果您選擇這種策略,需要確保連線工廠具有適當的序列化器-反序列化器對來處理此類載荷(例如使用 Java 序列化的 DefaultSerializer
和 DefaultDeserializer
,或自定義序列化器和反序列化器)。TCP 連線工廠中提到的 ByteArray*Serializer
選項,包括預設的 ByteArrayCrLfSerializer
,除非轉換後的載荷是 String
或 byte[]
,否則不支援此類載荷。
在 2.2 版本之前,當協作通道介面卡使用客戶端連線工廠時, 此預設行為在真正的非同步環境中不適用,因此現在預設為無限超時。您可以透過在客戶端連線工廠上將 |
從 5.4 版本開始,多個出站通道介面卡和一個 TcpInboundChannelAdapter
可以共享同一個連線工廠。這允許應用程式同時支援請求/回覆和任意的伺服器 → 客戶端訊息傳遞。更多資訊請參見TCP 閘道器。
傳輸訊息頭
TCP 是一種流協議。Serializers
和 Deserializers
在流中分隔訊息。在 3.0 之前,只能透過 TCP 傳輸訊息載荷(String
或 byte[]
)。從 3.0 版本開始,您也可以傳輸選定的訊息頭以及載荷。然而,“即時”物件,例如 replyChannel
訊息頭,無法序列化。
透過 TCP 傳送訊息頭資訊需要一些額外的配置。
第一步是為 ConnectionFactory
提供一個使用 mapper
屬性的 MessageConvertingTcpMessageMapper
。此 mapper 委託給任何 MessageConverter
實現,將訊息轉換為可透過配置的 serializer
和 deserializer
進行序列化和反序列化的物件,反之亦然。
Spring Integration 提供了一個 MapMessageConverter
,它允許指定一個訊息頭列表,這些訊息頭連同載荷一起新增到 Map
物件中。生成的 Map 有兩個條目:payload
和 headers
。headers
條目本身是一個 Map
,包含選定的訊息頭。
第二步是提供一個序列化器和反序列化器,可以在 Map
和某種線路格式之間進行轉換。這可以是一個自定義的 Serializer
或 Deserializer
,如果對端系統不是 Spring Integration 應用程式,通常需要這樣做。
Spring Integration 提供了一個 MapJsonSerializer
,用於將 Map
轉換為 JSON,反之亦然。它使用一個 Spring Integration JsonObjectMapper
。如果需要,您可以提供一個自定義的 JsonObjectMapper
。預設情況下,序列化器在物件之間插入換行符(0x0a
)。更多資訊請參見Javadoc。
JsonObjectMapper 使用類路徑中可用的任何版本的 Jackson 。 |
您也可以使用標準的 Java 序列化 Map
,透過使用 DefaultSerializer
和 DefaultDeserializer
。
以下示例展示了一個連線工廠的配置,該配置使用 JSON 傳輸 correlationId
、sequenceNumber
和 sequenceSize
訊息頭
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用前面配置傳送的訊息,載荷為 'something',在網路線路上將如下顯示
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}