執行緒屏障

有時,我們需要暫停訊息流執行緒,直到發生其他非同步事件。例如,考慮一個將訊息釋出到RabbitMQ的HTTP請求。我們可能希望在RabbitMQ代理發出訊息已收到的確認之前,不回覆使用者。

在版本4.2中,Spring Integration為此目的引入了<barrier/>元件。底層的MessageHandlerBarrierMessageHandler。該類還實現了MessageTriggerAction,其中傳遞給trigger()方法的訊息會釋放handleRequestMessage()方法中相應的執行緒(如果存在)。

透過對訊息呼叫CorrelationStrategy來關聯掛起執行緒和觸發執行緒。當訊息傳送到input-channel時,執行緒將掛起長達requestTimeout毫秒,等待相應的觸發訊息。預設的關聯策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID頭。當具有相同關聯的觸發訊息到達時,執行緒被釋放。釋放後傳送到output-channel的訊息是使用MessageGroupProcessor構建的。預設情況下,該訊息是一個包含兩個有效載荷的Collection<?>,並且標頭檔案透過DefaultAggregatingMessageGroupProcessor進行合併。

如果trigger()方法首先被呼叫(或在主執行緒超時後),它將被掛起長達triggerTimeout,等待掛起訊息到達。如果您不想掛起觸發執行緒,請考慮將其交給TaskExecutor,以便其執行緒被掛起。
在5.4版本之前,請求和觸發訊息只有一個timeout選項,但在某些用例中,為這些操作設定不同的超時時間會更好。因此,引入了requestTimeouttriggerTimeout選項。

requires-reply屬性確定在觸發訊息到達之前,如果掛起執行緒超時,要採取的操作。預設情況下,它為false,這意味著端點返回null,流程結束,執行緒返回給呼叫者。當為true時,會丟擲ReplyRequiredException

您可以程式設計方式呼叫trigger()方法(使用名稱barrier.handler獲取bean引用——其中barrier是屏障端點的bean名稱)。或者,您可以配置一個<outbound-channel-adapter/>來觸發釋放。

只有一個執行緒可以與相同的關聯一起掛起。相同的關聯可以多次使用,但不能同時使用。如果第二個執行緒以相同的關聯到達,則會丟擲異常。

以下示例演示瞭如何使用自定義頭進行關聯

  • Java

  • XML

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根據哪個訊息先到達,傳送訊息到in的執行緒或傳送訊息到release的執行緒將等待長達十秒,直到另一個訊息到達。當訊息被釋放時,out通道會發送一個訊息,該訊息結合了呼叫自定義MessageGroupProcessor bean(名為myOutputProcessor)的結果。如果主執行緒超時,並且觸發器稍後到達,您可以配置一個丟棄通道,將延遲觸發器傳送到該通道。如果請求訊息未能及時到達,觸發訊息也會被丟棄。

有關此元件的示例,請參閱屏障示例應用程式

© . This site is unofficial and not affiliated with VMware.