執行緒屏障
有時,我們需要暫停訊息流執行緒,直到發生其他非同步事件。例如,考慮一個將訊息釋出到RabbitMQ的HTTP請求。我們可能希望在RabbitMQ代理發出訊息已收到的確認之前,不回覆使用者。
在版本4.2中,Spring Integration為此目的引入了<barrier/>元件。底層的MessageHandler是BarrierMessageHandler。該類還實現了MessageTriggerAction,其中傳遞給trigger()方法的訊息會釋放handleRequestMessage()方法中相應的執行緒(如果存在)。
透過對訊息呼叫CorrelationStrategy來關聯掛起執行緒和觸發執行緒。當訊息傳送到input-channel時,執行緒將掛起長達requestTimeout毫秒,等待相應的觸發訊息。預設的關聯策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID頭。當具有相同關聯的觸發訊息到達時,執行緒被釋放。釋放後傳送到output-channel的訊息是使用MessageGroupProcessor構建的。預設情況下,該訊息是一個包含兩個有效載荷的Collection<?>,並且標頭檔案透過DefaultAggregatingMessageGroupProcessor進行合併。
如果trigger()方法首先被呼叫(或在主執行緒超時後),它將被掛起長達triggerTimeout,等待掛起訊息到達。如果您不想掛起觸發執行緒,請考慮將其交給TaskExecutor,以便其執行緒被掛起。 |
在5.4版本之前,請求和觸發訊息只有一個timeout選項,但在某些用例中,為這些操作設定不同的超時時間會更好。因此,引入了requestTimeout和triggerTimeout選項。 |
requires-reply屬性確定在觸發訊息到達之前,如果掛起執行緒超時,要採取的操作。預設情況下,它為false,這意味著端點返回null,流程結束,執行緒返回給呼叫者。當為true時,會丟擲ReplyRequiredException。
您可以程式設計方式呼叫trigger()方法(使用名稱barrier.handler獲取bean引用——其中barrier是屏障端點的bean名稱)。或者,您可以配置一個<outbound-channel-adapter/>來觸發釋放。
| 只有一個執行緒可以與相同的關聯一起掛起。相同的關聯可以多次使用,但不能同時使用。如果第二個執行緒以相同的關聯到達,則會丟擲異常。 |
以下示例演示瞭如何使用自定義頭進行關聯
-
Java
-
XML
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根據哪個訊息先到達,傳送訊息到in的執行緒或傳送訊息到release的執行緒將等待長達十秒,直到另一個訊息到達。當訊息被釋放時,out通道會發送一個訊息,該訊息結合了呼叫自定義MessageGroupProcessor bean(名為myOutputProcessor)的結果。如果主執行緒超時,並且觸發器稍後到達,您可以配置一個丟棄通道,將延遲觸發器傳送到該通道。如果請求訊息未能及時到達,觸發訊息也會被丟棄。
有關此元件的示例,請參閱屏障示例應用程式。