TCP 連線工廠

概覽

對於 TCP,透過使用連線工廠來配置底層連線。提供了兩種型別的連線工廠:客戶端連線工廠和伺服器連線工廠。客戶端連線工廠建立出站連線,伺服器連線工廠監聽入站連線。

出站通道介面卡使用客戶端連線工廠,但您也可以將客戶端連線工廠的引用提供給入站通道介面卡。該介面卡接收由出站介面卡建立的連線上收到的任何入站訊息。

入站通道介面卡或閘道器使用伺服器連線工廠。(事實上,連線工廠必須配合其中之一才能工作)。您也可以將伺服器連線工廠的引用提供給出站介面卡。然後,您可以使用該介面卡在同一連線上向入站訊息傳送回覆。

回覆訊息僅在回覆包含連線工廠插入到原始訊息中的 ip_connectionId 頭部時才路由到該連線。
這是在入站和出站介面卡之間共享連線工廠時執行的訊息關聯的程度。這種共享允許透過 TCP 進行非同步雙向通訊。預設情況下,僅透過 TCP 傳輸載荷資訊。因此,任何訊息關聯必須由下游元件(如聚合器或其他端點)執行。從 3.0 版本開始引入了傳輸選定頭部的支援。更多資訊請參見TCP 訊息關聯

您最多可以將連線工廠的引用提供給每種型別的一個介面卡。

Spring Integration 提供了使用 java.net.Socketjava.nio.channel.SocketChannel 的連線工廠。

以下示例展示了一個使用 java.net.Socket 連線的簡單伺服器連線工廠

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

以下示例展示了一個使用 java.nio.channel.SocketChannel 連線的簡單伺服器連線工廠

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>
從 Spring Integration 4.2 版本開始,如果伺服器配置為監聽隨機埠(透過將埠設定為 0),您可以使用 getPort() 獲取作業系統選擇的實際埠。此外,getServerSocketAddress() 允許您獲取完整的 SocketAddress。更多資訊請參見TcpServerConnectionFactory 介面的 Javadoc
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

以下示例展示了一個使用 java.net.Socket 連線併為每條訊息建立新連線的客戶端連線工廠

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

從 5.2 版本開始,客戶端連線工廠支援以秒為單位指定的 connectTimeout 屬性,該屬性預設為 60。

訊息邊界劃分(序列化器和反序列化器)

TCP 是一個流協議。這意味著需要為透過 TCP 傳輸的資料提供一些結構,以便接收方可以將資料劃分為獨立的訊息。連線工廠配置為使用序列化器和反序列化器,分別用於在訊息載荷與透過 TCP 傳送的位元組之間進行轉換。這透過分別為入站和出站訊息提供反序列化器和序列化器來完成。Spring Integration 提供了許多標準序列化器和反序列化器。

ByteArrayCrlfSerializer* 將位元組陣列轉換為位元組流,後跟回車符和換行符 (\r\n)。這是預設的序列化器(和反序列化器),例如可以與 telnet 作為客戶端一起使用。

ByteArraySingleTerminatorSerializer* 將位元組陣列轉換為位元組流,後跟單個終止字元(預設為 0x00)。

ByteArrayLfSerializer* 將位元組陣列轉換為位元組流,後跟單個換行符 (0x0a)。

ByteArrayStxEtxSerializer* 將位元組陣列轉換為位元組流,前面是 STX (0x02),後面是 ETX (0x03)。

ByteArrayLengthHeaderSerializer 將位元組陣列轉換為位元組流,前面是一個網路位元組序(大端序)的二進位制長度。這是一種高效的反序列化器,因為它無需解析每個位元組來查詢終止字元序列。它也可用於包含二進位制資料的載荷。前面的序列化器僅支援載荷中的文字。長度頭部預設大小為四個位元組(一個 Integer),允許的訊息最大可達 (2^31 - 1) 位元組。但是,length 頭部可以是單位元組(無符號),用於最大 255 位元組的訊息,或無符號短整型(2 位元組),用於最大 (2^16 - 1) 位元組的訊息。如果您需要其他格式的頭部,可以子類化 ByteArrayLengthHeaderSerializer 並實現 readHeaderwriteHeader 方法。絕對最大資料大小是 (2^31 - 1) 位元組。從 5.2 版本開始,頭部值可以包含頭部自身的長度以及載荷長度。設定 inclusive 屬性啟用該機制(生產者和消費者必須設定相同)。

ByteArrayRawSerializer* 將位元組陣列轉換為位元組流,並且不新增額外的訊息邊界劃分資料。使用此序列化器(和反序列化器)時,訊息的結束由客戶端有序關閉套接字來指示。使用此序列化器時,訊息接收將一直阻塞,直到客戶端關閉套接字或發生超時。超時不會產生訊息。當使用此序列化器且客戶端是 Spring Integration 應用時,客戶端必須使用配置了 single-use="true" 的連線工廠。這樣做會導致介面卡在傳送訊息後關閉套接字。序列化器本身不會關閉連線。您應該僅在通道介面卡(而非閘道器)使用的連線工廠中使用此序列化器,並且連線工廠應該由入站或出站介面卡之一使用,但不能兩者都用。另請參見本節後面的 ByteArrayElasticRawDeserializer。然而,從 5.2 版本開始,出站閘道器有一個新屬性 closeStreamAfterSend;這允許使用原始序列化器/反序列化器,因為 EOF 會被髮送到伺服器,同時保持連線開啟以接收回復。

在 4.2.2 版本之前,使用非阻塞 I/O (NIO) 時,此序列化器會將(讀取期間的)超時視為檔案結束,並將已讀取的資料作為訊息發出。這是不可靠的,不應該用於劃分訊息邊界。現在將此類情況視為異常。如果您在這種情況下使用它(儘管不推薦),可以透過將 treatTimeoutAsEndOfMessage 建構函式引數設定為 true 來恢復以前的行為。

它們都是 AbstractByteArraySerializer 的子類,實現了 org.springframework.core.serializer.Serializerorg.springframework.core.serializer.Deserializer 介面。為了向後相容,使用 AbstractByteArraySerializer 的任何子類進行序列化的連線也接受 String,該 String 會先轉換為位元組陣列。每個序列化器和反序列化器都將包含相應格式的輸入流轉換為位元組陣列載荷。

為避免因行為不端的客戶端(不遵守配置的序列化器協議的客戶端)導致記憶體耗盡,這些序列化器強制設定了最大訊息大小。如果入站訊息超出此大小,將丟擲異常。預設最大訊息大小為 2048 位元組。您可以透過設定 maxMessageSize 屬性來增加它。如果您使用預設序列化器或反序列化器並希望增加最大訊息大小,必須將最大訊息大小宣告為一個顯式的 bean,設定好 maxMessageSize 屬性,並配置連線工廠使用該 bean。

本節前面標記有 * 的類使用中間緩衝區,並將解碼後的資料複製到正確大小的最終緩衝區。從 4.3 版本開始,可以透過設定 poolSize 屬性來配置這些緩衝區,讓這些原始緩衝區被重用,而不是為每條訊息分配和丟棄,這是預設行為。將屬性設定為負值會建立一個無界的池。如果池有界,您還可以設定 poolWaitTimeout 屬性(以毫秒為單位),超過此時間後,如果沒有可用緩衝區,則丟擲異常。它預設為無限。此類異常會導致套接字關閉。

如果您希望在自定義反序列化器中使用相同的機制,可以擴充套件 AbstractPooledBufferByteArraySerializer(而不是其超類 AbstractByteArraySerializer)並實現 doDeserialize() 而不是 deserialize()。緩衝區會自動返回到池中。AbstractPooledBufferByteArraySerializer 還提供了一個便捷的實用方法:copyToSizedArray()

5.0 版本添加了 ByteArrayElasticRawDeserializer。這類似於上面的 ByteArrayRawSerializer 的反序列化器部分,不同之處在於無需設定 maxMessageSize。在內部,它使用 ByteArrayOutputStream,允許緩衝區按需增長。客戶端必須有序關閉套接字以指示訊息結束。

此反序列化器僅應在對等方受信任時使用;它容易受到因記憶體不足條件導致的 DoS 攻擊。

MapJsonSerializer 使用 Jackson ObjectMapperMap 和 JSON 之間進行轉換。可以結合 MessageConvertingTcpMessageMapperMapMessageConverter 使用此序列化器,以 JSON 格式傳輸選定的頭部和載荷。

Jackson ObjectMapper 無法在流中劃分訊息邊界。因此,MapJsonSerializer 需要委託給另一個序列化器或反序列化器來處理訊息邊界劃分。預設情況下,使用 ByteArrayLfSerializer,導致在線上傳輸的訊息格式為 <json><LF>,但您可以配置它使用其他序列化器。(下一個示例將展示如何操作。)

最後一個標準序列化器是 org.springframework.core.serializer.DefaultSerializer,可用於透過 Java 序列化轉換可序列化物件。提供了 org.springframework.core.serializer.DefaultDeserializer 用於入站反序列化包含可序列化物件的流。

如果您不想使用預設序列化器和反序列化器 (ByteArrayCrLfSerializer),必須在連線工廠上設定 serializerdeserializer 屬性。以下示例展示瞭如何操作

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

一個使用 java.net.Socket 連線並在傳輸中使用 Java 序列化的伺服器連線工廠。

有關連線工廠可用屬性的完整詳細資訊,請參見本節末尾的參考

預設情況下,不對入站資料包執行反向 DNS 查詢:在未配置 DNS 的環境(例如 Docker 容器)中,這可能導致連線延遲。要將 IP 地址轉換為用於訊息頭的主機名,可以透過將 lookup-host 屬性設定為 true 來覆蓋預設行為。

您還可以修改套接字和套接字工廠的屬性。更多資訊請參見SSL/TLS 支援。如該部分所述,無論是否使用 SSL,此類修改都是可能的。

主機驗證

從 5.1.0 版本開始,預設啟用主機驗證以增強安全性。此功能確保在 TCP 連線期間驗證伺服器的身份。

如果您遇到需要停用主機驗證的場景(不推薦),可以在 tcp-connection-factory 中配置 socket-support 屬性。

<int-ip:tcp-connection-factory id="client"
                                type="client"
                                host="localhost"
                                port="0"
                                socket-support="customSocketSupport"
                                single-use="true"
                                so-timeout="10000"/>

<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
	<constructor-arg value="false" />
</bean>

自定義序列化器和反序列化器

如果您的資料格式不受標準反序列化器支援,您可以實現自定義反序列化器;您也可以實現自定義序列化器。

要實現自定義序列化器和反序列化器對,實現 org.springframework.core.serializer.Deserializerorg.springframework.core.serializer.Serializer 介面。

當反序列化器在訊息之間檢測到輸入流已關閉時,必須丟擲 SoftEndOfStreamException;這是向框架發出的訊號,表明關閉是“正常”的。如果在解碼訊息時流關閉,則應丟擲其他異常。

從 5.2 版本開始,SoftEndOfStreamException 現在是 RuntimeException 的子類,而不是繼承自 IOException

TCP 快取客戶端連線工廠

前面所述,TCP 套接字可以是“單次使用”(一次請求或響應)或共享的。共享套接字在高併發環境下與出站閘道器配合效能不佳,因為套接字一次只能處理一個請求或響應。

為了提高效能,可以使用協作通道介面卡代替閘道器,但這需要應用程式級別的訊息關聯。更多資訊請參見TCP 訊息關聯

Spring Integration 2.2 引入了快取客戶端連線工廠,它使用共享套接字池,使閘道器可以使用共享連線池處理多個併發請求。

TCP 故障轉移客戶端連線工廠

可以配置支援故障轉移到一個或多個其他伺服器的 TCP 連線工廠。傳送訊息時,工廠會遍歷其配置的所有工廠,直到訊息可以傳送或找不到連線。最初,使用配置列表中的第一個工廠。如果後續連線失敗,下一個工廠將成為當前工廠。以下示例展示瞭如何配置故障轉移客戶端連線工廠

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
使用故障轉移連線工廠時,工廠本身的 singleUse 屬性必須與其配置使用的工廠列表保持一致。

與共享連線 (singleUse=false) 一起使用時,連線工廠有兩個與故障恢復相關的屬性

  • refreshSharedInterval

  • closeOnRefresh

考慮基於上述配置的以下場景:假設 clientFactory1 無法建立連線,但 clientFactory2 可以。當在 refreshSharedInterval 超時後呼叫 failCFgetConnection() 方法時,我們將再次嘗試使用 clientFactory1 連線;如果成功,與 clientFactory2 的連線將被關閉。如果 closeOnRefreshfalse,則“舊”連線將保持開啟,如果第一個工廠再次失敗,將來可能會被重用。

設定 refreshSharedInterval,以便僅在該時間過期後才嘗試重新連線到第一個工廠;將其設定為 Long.MAX_VALUE(預設),如果您只希望在當前連線失敗時才故障恢復到第一個工廠。

設定 closeOnRefresh,以便在重新整理實際建立新連線後關閉“舊”連線。

如果任何委託工廠是 CachingClientConnectionFactory,則這些屬性不適用,因為連線快取是在那裡處理的;在這種情況下,總是會查閱連線工廠列表來獲取連線。

從 5.3 版本開始,這些屬性預設為 Long.MAX_VALUEtrue,因此工廠僅在當前連線失敗時才嘗試故障恢復。要恢復到之前版本的預設行為,將它們設定為 0false

另請參見測試連線

TCP 執行緒親和性連線工廠

Spring Integration 5.0 版本引入了此連線工廠。它將連線繫結到呼叫執行緒,該執行緒每次傳送訊息時都會重用相同的連線。這會一直持續,直到連線被關閉(由伺服器或網路)或直到執行緒呼叫 releaseConnection() 方法。連線本身由另一個客戶端工廠實現提供,該實現必須配置為提供非共享(單次使用)連線,以便每個執行緒獲取一個連線。

以下示例展示瞭如何配置 TCP 執行緒親和性連線工廠

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}