執行緒屏障
有時,我們需要暫停訊息流執行緒,直到發生其他非同步事件。例如,考慮一個將訊息釋出到 RabbitMQ 的 HTTP 請求。我們可能希望在 RabbitMQ 代理確認訊息已接收之前,不對使用者進行回覆。
在 4.2 版本中,Spring Integration 引入了 <barrier/>
元件來實現此目的。底層是 BarrierMessageHandler
,它實現了 MessageHandler
介面。該類還實現了 MessageTriggerAction
介面,透過呼叫其 trigger()
方法傳遞訊息,可以釋放 handleRequestMessage()
方法中對應的(如果存在)掛起執行緒。
掛起的執行緒和觸發執行緒透過對訊息呼叫 CorrelationStrategy
進行關聯。當訊息傳送到 input-channel
時,執行緒將被掛起,最多等待 requestTimeout
毫秒,等待相應的觸發訊息。預設的關聯策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID
頭資訊。當帶有相同關聯 ID 的觸發訊息到達時,執行緒被釋放。釋放後傳送到 output-channel
的訊息是使用 MessageGroupProcessor
構建的。預設情況下,該訊息是一個包含兩個 payload 的 Collection<?>
,並且頭資訊使用 DefaultAggregatingMessageGroupProcessor
合併。
如果 trigger() 方法先被呼叫(或主執行緒超時後),它將掛起,最多等待 triggerTimeout 毫秒,等待掛起訊息的到達。如果您不想掛起觸發執行緒,請考慮將其交給 TaskExecutor 處理,以便由 TaskExecutor 的執行緒來掛起。 |
在 5.4 版本之前,請求訊息和觸發訊息只有一個 timeout 選項,但在某些用例中,最好對這些操作設定不同的超時時間。因此,引入了 requestTimeout 和 triggerTimeout 選項。 |
requires-reply
屬性決定了在觸發訊息到達之前,如果掛起執行緒超時應採取的操作。預設情況下,它為 false
,這意味著端點返回 null
,流結束,並且執行緒返回給呼叫者。當設定為 true
時,將丟擲 ReplyRequiredException
異常。
您可以透過程式設計方式呼叫 trigger()
方法(使用 bean 名稱 barrier.handler
獲取 bean 引用,其中 barrier
是 barrier 端點的 bean 名稱)。或者,您可以配置一個 <outbound-channel-adapter/>
來觸發釋放。
使用相同的關聯 ID 只能掛起一個執行緒。同一個關聯 ID 可以多次使用,但不能同時用於多個掛起操作。如果第二個執行緒帶著相同的關聯 ID 到達,則會丟擲異常。 |
以下示例展示瞭如何使用自定義頭資訊進行關聯
-
Java
-
XML
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根據哪個通道先接收到訊息,傳送訊息到 in
通道的執行緒或傳送訊息到 release
通道的執行緒將最多等待十秒,直到另一條訊息到達。訊息釋放後,out
通道會發送一條訊息,該訊息結合了呼叫名為 myOutputProcessor
的自定義 MessageGroupProcessor
bean 的結果。如果主執行緒超時並且觸發訊息稍後才到達,您可以配置一個丟棄通道,將延遲到達的觸發訊息傳送到該通道。如果請求訊息未及時到達,觸發訊息也會被丟棄。
有關此元件的示例,請參閱barrier 示例應用。