TCP 連線工廠
概述
對於 TCP,底層連線的配置透過使用連線工廠提供。提供了兩種型別的連線工廠:客戶端連線工廠和伺服器連線工廠。客戶端連線工廠建立出站連線。伺服器連線工廠監聽入站連線。
出站通道介面卡使用客戶端連線工廠,但你也可以向入站通道介面卡提供對客戶端連線工廠的引用。該介面卡接收由出站介面卡建立的連線上接收到的任何入站訊息。
入站通道介面卡或閘道器使用伺服器連線工廠。(實際上,沒有它,連線工廠無法工作)。你還可以向出站介面卡提供對伺服器連線工廠的引用。然後,你可以使用該介面卡透過同一連線傳送對入站訊息的回覆。
只有當回覆包含由連線工廠插入到原始訊息中的 ip_connectionId 標頭時,回覆訊息才會被路由到連線。 |
| 這就是在入站和出站介面卡之間共享連線工廠時執行的訊息關聯的程度。這種共享允許透過 TCP 進行非同步雙向通訊。預設情況下,只使用 TCP 傳輸有效負載資訊。因此,任何訊息關聯必須由下游元件(例如聚合器或其他端點)執行。版本 3.0 中引入了對傳輸選定標頭的支援。有關更多資訊,請參閱TCP 訊息關聯。 |
你可以為每種型別最多一個介面卡提供對連線工廠的引用。
Spring Integration 提供了使用 java.net.Socket 和 java.nio.channel.SocketChannel 的連線工廠。
以下示例顯示了一個使用 java.net.Socket 連線的簡單伺服器連線工廠
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
以下示例顯示了一個使用 java.nio.channel.SocketChannel 連線的簡單伺服器連線工廠
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
從 Spring Integration 4.2 版開始,如果伺服器配置為在隨機埠(透過將埠設定為 0)上偵聽,則可以使用 getPort() 獲取作業系統選擇的實際埠。此外,getServerSocketAddress() 允許你獲取完整的 SocketAddress。有關更多資訊,請參閱TcpServerConnectionFactory 介面的 Javadoc。 |
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
以下示例顯示了一個使用 java.net.Socket 連線併為每個訊息建立新連線的客戶端連線工廠
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
從 5.2 版本開始,客戶端連線工廠支援 connectTimeout 屬性,以秒為單位指定,預設為 60。
另請參閱 基於註解的配置 和 使用 Java DSL 進行 TCP 元件配置。
訊息劃分(序列化器和反序列化器)
TCP 是一種流協議。這意味著必須為透過 TCP 傳輸的資料提供一些結構,以便接收方可以將資料劃分為離散的訊息。連線工廠被配置為使用序列化器和反序列化器在訊息有效負載和透過 TCP 傳送的位之間進行轉換。這透過分別為入站和出站訊息提供反序列化器和序列化器來實現。Spring Integration 提供了許多標準的序列化器和反序列化器。
ByteArrayCrlfSerializer* 將位元組陣列轉換為位元組流,後跟回車符和換行符 (\r\n)。這是預設的序列化器(和反序列化器),可以與 telnet 作為客戶端一起使用(例如)。
ByteArraySingleTerminatorSerializer* 將位元組陣列轉換為位元組流,後跟單個終止字元(預設為 0x00)。
ByteArrayLfSerializer* 將位元組陣列轉換為位元組流,後跟單個換行符 (0x0a)。
ByteArrayStxEtxSerializer* 將位元組陣列轉換為位元組流,字首為 STX (0x02),後跟 ETX (0x03)。
ByteArrayLengthHeaderSerializer 將位元組陣列轉換為位元組流,字首為網路位元組順序(大端)的二進位制長度。這是一種高效的反序列化器,因為它不必解析每個位元組來查詢終止字元序列。它還可以用於包含二進位制資料的有效負載。前面的序列化器僅支援有效負載中的文字。長度標頭的預設大小為四個位元組(一個整數),允許訊息最大為 (2^31 - 1) 位元組。但是,對於最大為 255 位元組的訊息,length 標頭可以是單個位元組(無符號),或者對於最大為 (2^16 - 1) 位元組的訊息,可以是無符號短整型(2 位元組)。如果需要其他任何標頭格式,你可以子類化 ByteArrayLengthHeaderSerializer 併為 readHeader 和 writeHeader 方法提供實現。絕對最大資料大小為 (2^31 - 1) 位元組。從 5.2 版本開始,標頭值可以包含標頭本身的長度以及有效負載的長度。設定 inclusive 屬性以啟用該機制(它必須在生產者和消費者之間設定為相同)。
ByteArrayRawSerializer* 將位元組陣列轉換為位元組流,並且不新增任何額外的訊息劃分資料。使用此序列化器(和反序列化器),訊息的結束由客戶端以有序方式關閉套接字來指示。使用此序列化器時,訊息接收會一直掛起,直到客戶端關閉套接字或發生超時。超時不會導致訊息。當使用此序列化器並且客戶端是 Spring Integration 應用程式時,客戶端必須使用配置了 single-use="true" 的連線工廠。這樣做會導致介面卡在傳送訊息後關閉套接字。序列化器本身不會關閉連線。你應僅將此序列化器與通道介面卡(而非閘道器)使用的連線工廠一起使用,並且連線工廠應由入站或出站介面卡之一使用,而不是兩者都使用。另請參閱本節後面的 ByteArrayElasticRawDeserializer。但是,從 5.2 版本開始,出站閘道器有一個新的屬性 closeStreamAfterSend;這允許使用原始序列化器/反序列化器,因為 EOF 會發送給伺服器,同時保持連線開啟以接收回復。
在 4.2.2 版本之前,當使用非阻塞 I/O (NIO) 時,此序列化器將(讀取期間的)超時視為檔案結束,並且到目前為止讀取的資料作為訊息發出。這是不可靠的,不應用於分隔訊息。它現在將此類情況視為異常。在極少數情況下,如果你以這種方式使用它,你可以透過將 treatTimeoutAsEndOfMessage 建構函式引數設定為 true 來恢復以前的行為。 |
這些都是 AbstractByteArraySerializer 的子類,它實現了 org.springframework.core.serializer.Serializer 和 org.springframework.core.serializer.Deserializer。為了向後相容,使用 AbstractByteArraySerializer 的任何子類進行序列化的連線也接受首先轉換為位元組陣列的 String。這些序列化器和反序列化器都將包含相應格式的輸入流轉換為位元組陣列有效負載。
為了避免由於行為不當的客戶端(不遵守配置的序列化器協議的客戶端)導致記憶體耗盡,這些序列化器施加了最大訊息大小。如果傳入訊息超過此大小,則會丟擲異常。預設最大訊息大小為 2048 位元組。你可以透過設定 maxMessageSize 屬性來增加它。如果你使用預設的序列化器或反序列化器並希望增加最大訊息大小,則必須將最大訊息大小宣告為具有設定 maxMessageSize 屬性的顯式 bean,並配置連線工廠以使用該 bean。
本節前面標記為*的類使用中間緩衝區並將解碼資料複製到正確大小的最終緩衝區。從 4.3 版本開始,你可以透過設定 poolSize 屬性來配置這些緩衝區,以使這些原始緩衝區可以重用,而不是為每個訊息分配和丟棄,這是預設行為。將屬性設定為負值會建立一個沒有邊界的池。如果池有邊界,你還可以設定 poolWaitTimeout 屬性(以毫秒為單位),在此之後,如果沒有可用的緩衝區,則會丟擲異常。它預設為無限。此類異常會導致套接字關閉。
如果你希望在自定義反序列化器中使用相同的機制,你可以擴充套件 AbstractPooledBufferByteArraySerializer(而不是其超類 AbstractByteArraySerializer)並實現 doDeserialize() 而不是 deserialize()。緩衝區會自動返回到池中。AbstractPooledBufferByteArraySerializer 還提供了一個方便的實用方法:copyToSizedArray()。
5.0 版本添加了 ByteArrayElasticRawDeserializer。這類似於上述 ByteArrayRawSerializer 的反序列化器端,只是它不需要設定 maxMessageSize。在內部,它使用 ByteArrayOutputStream,允許緩衝區根據需要增長。客戶端必須以有序方式關閉套接字以表示訊息結束。
| 此反序列化器僅應在信任對等方時使用;它容易受到由於記憶體不足條件導致的 DoS 攻擊。 |
MapJsonSerializer 使用 Jackson ObjectMapper 在 Map 和 JSON 之間進行轉換。你可以將此序列化器與 MessageConvertingTcpMessageMapper 和 MapMessageConverter 結合使用,以 JSON 格式傳輸選定的標頭和有效負載。
Jackson ObjectMapper 無法在流中劃分訊息。因此,MapJsonSerializer 需要委託給另一個序列化器或反序列化器來處理訊息劃分。預設情況下,使用 ByteArrayLfSerializer,導致訊息以 <json><LF> 格式線上傳輸,但你可以將其配置為使用其他序列化器。(下一個示例顯示瞭如何做到這一點。) |
最後一個標準序列化器是 org.springframework.core.serializer.DefaultSerializer,你可以使用它透過 Java 序列化轉換可序列化物件。org.springframework.core.serializer.DefaultDeserializer 用於入站流的反序列化,這些流包含可序列化物件。
如果你不希望使用預設的序列化器和反序列化器 (ByteArrayCrLfSerializer),則必須在連線工廠上設定 serializer 和 deserializer 屬性。以下示例顯示瞭如何做到這一點
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
一個伺服器連線工廠,它使用 java.net.Socket 連線並在網路上使用 Java 序列化。
有關連線工廠上可用屬性的完整詳細資訊,請參閱本節末尾的參考。
預設情況下,對入站資料包不執行反向 DNS 查詢:在未配置 DNS 的環境中(例如 Docker 容器),這可能導致連線延遲。為了將 IP 地址轉換為用於訊息頭的 hostname,可以透過將 lookup-host 屬性設定為 true 來覆蓋預設行為。
| 你還可以修改套接字和套接字工廠的屬性。有關更多資訊,請參閱SSL/TLS 支援。如那裡所述,無論是否使用 SSL,都可以進行此類修改。 |
另請參閱 基於註解的配置 和 使用 Java DSL 進行 TCP 元件配置。
主機驗證
從 5.1.0 版本開始,預設啟用主機驗證以增強安全性。此功能確保在 TCP 連線期間驗證伺服器的身份。
如果你遇到需要停用主機驗證的場景(不推薦),你可以在 tcp-connection-factory 中配置 socket-support 屬性。
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="0"
socket-support="customSocketSupport"
single-use="true"
so-timeout="10000"/>
<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
<constructor-arg value="false" />
</bean>
自定義序列化器和反序列化器
如果你的資料格式不受任何標準反序列化器支援,你可以實現自己的;你也可以實現自定義序列化器。
要實現自定義序列化器和反序列化器對,請實現 org.springframework.core.serializer.Deserializer 和 org.springframework.core.serializer.Serializer 介面。
當反序列化器在訊息之間檢測到已關閉的輸入流時,它必須丟擲 SoftEndOfStreamException;這是向框架發出的訊號,表明關閉是“正常”的。如果在解碼訊息時流關閉,則應丟擲其他異常。
從 5.2 版本開始,SoftEndOfStreamException 現在是一個 RuntimeException,而不是擴充套件 IOException。
TCP 故障轉移客戶端連線工廠
你可以配置一個支援故障轉移到一個或多個其他伺服器的 TCP 連線工廠。傳送訊息時,工廠會遍歷其所有配置的工廠,直到可以傳送訊息或找不到連線為止。最初,使用配置列表中的第一個工廠。如果連線隨後失敗,則下一個工廠成為當前工廠。以下示例顯示瞭如何配置故障轉移客戶端連線工廠
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
使用故障轉移連線工廠時,singleUse 屬性必須在工廠本身和它配置使用的工廠列表之間保持一致。 |
當與共享連線(singleUse=false)一起使用時,連線工廠有兩個與回退相關的屬性
-
refreshSharedInterval -
closeOnRefresh
考慮以下基於上述配置的場景:假設 clientFactory1 無法建立連線,但 clientFactory2 可以。當 refreshSharedInterval 過去後呼叫 failCF 的 getConnection() 方法時,我們將再次嘗試使用 clientFactory1 連線;如果成功,與 clientFactory2 的連線將被關閉。如果 closeOnRefresh 為 false,則“舊”連線將保持開啟狀態,如果第一個工廠再次失敗,將來可能會被重用。
將 refreshSharedInterval 設定為僅在該時間過期後嘗試重新連線到第一個工廠;如果你只想在當前連線失敗時回退到第一個工廠,則將其設定為 Long.MAX_VALUE(預設值)。
設定 closeOnRefresh 以在重新整理實際建立新連線後關閉“舊”連線。
如果任何委託工廠是 CachingClientConnectionFactory,則這些屬性不適用,因為連線快取是在那裡處理的;在這種情況下,將始終諮詢連線工廠列表以獲取連線。 |
從 5.3 版本開始,這些屬性預設為 Long.MAX_VALUE 和 true,因此工廠僅在當前連線失敗時嘗試回退。要恢復到以前版本的預設行為,請將它們設定為 0 和 false。
另請參閱測試連線。
TCP 執行緒親和性連線工廠
Spring Integration 5.0 版本引入了此連線工廠。它將連線繫結到呼叫執行緒,並且每次該執行緒傳送訊息時都會重用相同的連線。這會一直持續到連線關閉(由伺服器或網路)或執行緒呼叫 releaseConnection() 方法為止。連線本身由另一個客戶端工廠實現提供,該實現必須配置為提供非共享(一次性使用)連線,以便每個執行緒獲得一個連線。
以下示例顯示瞭如何配置 TCP 執行緒親和性連線工廠
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}