關於非阻塞I/O (NIO)
使用NIO(參見 IP配置屬性 中的 using-nio
)避免為讀取每個socket而分配一個專用執行緒。對於少量socket,您可能會發現,不使用NIO並結合非同步移交(例如到QueueChannel
)的效能與使用NIO相當或更優。
當處理大量連線時,您應該考慮使用NIO。然而,使用NIO還有一些其他影響。一個執行緒池(在任務執行器中)在所有socket之間共享。每個傳入訊息被組裝,並在從該池中選擇的執行緒上作為一個獨立的工作單元傳送到配置的通道。到達同一socket的兩個連續訊息可能由不同的執行緒處理。這意味著訊息傳送到通道的順序是不確定的。到達socket的訊息的嚴格順序不會得到保持。
對於某些應用程式來說,這不是問題。但對於其他應用程式來說,這是一個問題。如果您需要嚴格排序,請考慮將 using-nio
設定為 false
並使用非同步移交。
或者,您可以在入站端點的下游插入一個重新排序器(resequencer),以將訊息恢復到正確的順序。如果您在連線工廠上將 apply-sequence
設定為 true
,則到達TCP連線的訊息將設定 sequenceNumber
和 correlationId
頭。重新排序器使用這些頭來將訊息恢復到正確的順序。
從版本5.1.4開始,接收新連線的優先順序高於從現有連線讀取。通常,除非您有非常高的新傳入連線速率,否則這幾乎沒有影響。如果您希望恢復到之前讀取優先的行為,請將 TcpNioServerConnectionFactory 上的 multiAccept 屬性設定為 false 。 |
執行緒池大小
pool size
屬性不再使用。以前,它在未指定任務執行器時指定預設執行緒池的大小。它還用於設定伺服器socket的連線積壓(backlog)。第一個功能已不再需要(參見下一段)。第二個功能已被 backlog
屬性取代。
以前,當將固定執行緒池任務執行器(這是預設設定)與NIO一起使用時,可能會發生死鎖並導致處理停止。問題發生在緩衝區滿時,一個從socket讀取資料的執行緒試圖向緩衝區新增更多資料,但沒有可用執行緒來在緩衝區中騰出空間。這種情況僅線上程池大小非常小的情況下發生,但在極端條件下也可能出現。自2.2版本以來,有兩個更改消除了這個問題。首先,預設任務執行器是快取執行緒池執行器。其次,添加了死鎖檢測邏輯,以便在發生執行緒飢餓時,不會死鎖,而是丟擲異常,從而釋放死鎖資源。
現在預設的任務執行器是無界的,如果訊息處理需要較長時間,則在高傳入訊息速率下可能會發生記憶體不足的情況。如果您的應用程式表現出這種行為,您應該使用具有適當執行緒池大小的池化任務執行器,但請參見 下一節。 |
帶有 CALLER_RUNS
策略的執行緒池任務執行器
當您使用帶有 CallerRunsPolicy
(使用 <task/>
名稱空間時為 CALLER_RUNS
)的固定執行緒池且佇列容量較小時,應該牢記一些重要注意事項。
如果您不使用固定執行緒池,則以下內容不適用。
對於NIO連線,存在三種不同的任務型別。I/O選擇器處理在一個專用執行緒上執行(檢測事件、接受新連線以及使用任務執行器將I/O讀取操作分派到其他執行緒)。當一個I/O讀取器執行緒(讀取操作被分派到的執行緒)讀取資料時,它將資料移交給另一個執行緒來組裝傳入訊息。大型訊息可能需要多次讀取才能完成。這些“組裝器”(assembler)執行緒在等待資料時可能會阻塞。當新的讀取事件發生時,讀取器判斷該socket是否已經有組裝器,如果沒有,則執行一個新的。組裝過程完成後,組裝器執行緒被返回到執行緒池。
當執行緒池耗盡、使用了 CALLER_RUNS
拒絕策略且任務佇列已滿時,這可能導致死鎖。當執行緒池為空且佇列中沒有空間時,I/O選擇器執行緒接收到 OP_READ
事件,並使用執行器分派讀取操作。佇列已滿,因此選擇器執行緒自身開始讀取過程。現在它檢測到該socket沒有組裝器,並在執行讀取之前啟動一個組裝器。再次,佇列已滿,選擇器執行緒變成了組裝器。組裝器現在被阻塞,等待資料被讀取,但這永遠不會發生。連線工廠現在處於死鎖狀態,因為選擇器執行緒無法處理新事件。
為了避免這種死鎖,我們必須避免選擇器(或讀取器)執行緒執行組裝任務。我們希望為I/O和組裝操作使用獨立的執行緒池。
框架提供了 CompositeExecutor
,它允許配置兩個不同的執行器:一個用於執行I/O操作,一個用於訊息組裝。在這種環境中,一個I/O執行緒永遠不會成為組裝器執行緒,因此不會發生死鎖。
此外,任務執行器應配置使用 AbortPolicy
(使用 <task>
時為 ABORT
)。當I/O任務無法完成時,它會被延遲一小段時間並持續重試,直到能夠完成並分配一個組裝器。預設情況下,延遲時間為100毫秒,但您可以透過在連線工廠上設定 readDelay
屬性(使用XML名稱空間配置時為 read-delay
)來更改它。
以下三個示例展示瞭如何配置複合執行器
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>