TCP 訊息關聯

IP 端點的一個目標是與非 Spring Integration 應用程式的系統進行通訊。因此,預設情況下只發送和接收訊息載荷。從 3.0 版本開始,您可以使用 JSON、Java 序列化或自定義序列化器和反序列化器來傳輸訊息頭。更多資訊請參見傳輸訊息頭。框架本身(除了使用閘道器時)或伺服器端的協作通道介面卡不提供訊息關聯。在本文件後面部分,我們將討論應用程式可用的各種關聯技術。在大多數情況下,即使訊息載荷包含一些自然的關聯資料(例如訂單號),這也需要特定的應用程式級訊息關聯。

閘道器

閘道器會自動關聯訊息。但是,對於流量相對較低的應用,您應該使用出站閘道器。當您將連線工廠配置為對所有訊息對使用單個共享連線('single-use="false"')時,一次只能處理一個訊息。新訊息必須等待接收到前一個訊息的回覆後才能處理。當連線工廠配置為每個新訊息使用一個新的連線('single-use="true"')時,此限制不適用。儘管此設定可以提供比共享連線環境更高的吞吐量,但它會帶來為每對訊息開啟和關閉新連線的開銷。

因此,對於高流量訊息,考慮使用一對協作的通道介面卡。然而,這樣做您需要提供協作邏輯。

另一種解決方案,在 Spring Integration 2.2 中引入,是使用 CachingClientConnectionFactory,它允許使用共享連線池。

協作的出站和入站通道介面卡

為了實現高吞吐量(避免使用閘道器的陷阱,如前面提到的),您可以配置一對協作的出站和入站通道介面卡。您還可以使用協作介面卡(伺服器端或客戶端)進行完全非同步通訊(而不是請求-回覆語義)。在伺服器端,訊息關聯由介面卡自動處理,因為入站介面卡添加了一個訊息頭,允許出站介面卡確定傳送回覆訊息時使用哪個連線。

在伺服器端,您必須填充 ip_connectionId 訊息頭,因為它用於將訊息與連線關聯起來。源自入站介面卡的訊息會自動設定此訊息頭。如果您希望構建其他訊息來發送,則需要設定此訊息頭。您可以從入站訊息中獲取訊息頭值。

在客戶端,如果需要,應用程式必須提供自己的關聯邏輯。

您可以透過多種方式實現。如果訊息載荷包含一些自然的關聯資料(例如事務 ID 或訂單號),並且您無需保留原始出站訊息中的任何資訊(例如回覆通道訊息頭),則關聯很簡單,並且無論如何都將在應用程式級別完成。

如果訊息載荷包含一些自然的關聯資料(例如事務 ID 或訂單號),但您需要保留原始出站訊息中的一些資訊(例如回覆通道訊息頭),您可以保留原始出站訊息的副本(例如使用釋出-訂閱通道),並使用聚合器重新組合所需的資料。

對於前面兩種情況中的任何一種,如果載荷沒有自然的關聯資料,您可以在出站通道介面卡的上游提供一個轉換器來增強載荷,新增此類資料。這樣的轉換器可以將原始載荷轉換為一個新物件,該物件包含原始載荷和部分訊息頭。當然,訊息頭中的“即時”物件(例如回覆通道)不能包含在轉換後的載荷中。

如果您選擇這種策略,需要確保連線工廠具有適當的序列化器-反序列化器對來處理此類載荷(例如使用 Java 序列化的 DefaultSerializerDefaultDeserializer,或自定義序列化器和反序列化器)。TCP 連線工廠中提到的 ByteArray*Serializer 選項,包括預設的 ByteArrayCrLfSerializer,除非轉換後的載荷是 Stringbyte[],否則不支援此類載荷。

在 2.2 版本之前,當協作通道介面卡使用客戶端連線工廠時,so-timeout 屬性預設為預設回覆超時(10 秒)。這意味著,如果入站介面卡在此期間沒有收到資料,則套接字將被關閉。

此預設行為在真正的非同步環境中不適用,因此現在預設為無限超時。您可以透過在客戶端連線工廠上將 so-timeout 屬性設定為 10000 毫秒來恢復之前的預設行為。

從 5.4 版本開始,多個出站通道介面卡和一個 TcpInboundChannelAdapter 可以共享同一個連線工廠。這允許應用程式同時支援請求/回覆和任意的伺服器 → 客戶端訊息傳遞。更多資訊請參見TCP 閘道器

傳輸訊息頭

TCP 是一種流協議。SerializersDeserializers 在流中分隔訊息。在 3.0 之前,只能透過 TCP 傳輸訊息載荷(Stringbyte[])。從 3.0 版本開始,您也可以傳輸選定的訊息頭以及載荷。然而,“即時”物件,例如 replyChannel 訊息頭,無法序列化。

透過 TCP 傳送訊息頭資訊需要一些額外的配置。

第一步是為 ConnectionFactory 提供一個使用 mapper 屬性的 MessageConvertingTcpMessageMapper。此 mapper 委託給任何 MessageConverter 實現,將訊息轉換為可透過配置的 serializerdeserializer 進行序列化和反序列化的物件,反之亦然。

Spring Integration 提供了一個 MapMessageConverter,它允許指定一個訊息頭列表,這些訊息頭連同載荷一起新增到 Map 物件中。生成的 Map 有兩個條目:payloadheadersheaders 條目本身是一個 Map,包含選定的訊息頭。

第二步是提供一個序列化器和反序列化器,可以在 Map 和某種線路格式之間進行轉換。這可以是一個自定義的 SerializerDeserializer,如果對端系統不是 Spring Integration 應用程式,通常需要這樣做。

Spring Integration 提供了一個 MapJsonSerializer,用於將 Map 轉換為 JSON,反之亦然。它使用一個 Spring Integration JsonObjectMapper。如果需要,您可以提供一個自定義的 JsonObjectMapper。預設情況下,序列化器在物件之間插入換行符(0x0a)。更多資訊請參見Javadoc

JsonObjectMapper 使用類路徑中可用的任何版本的 Jackson

您也可以使用標準的 Java 序列化 Map,透過使用 DefaultSerializerDefaultDeserializer

以下示例展示了一個連線工廠的配置,該配置使用 JSON 傳輸 correlationIdsequenceNumbersequenceSize 訊息頭

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

使用前面配置傳送的訊息,載荷為 'something',在網路線路上將如下顯示

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}