執行緒屏障

有時,我們需要暫停訊息流執行緒,直到發生其他非同步事件。例如,考慮一個將訊息釋出到 RabbitMQ 的 HTTP 請求。我們可能希望在 RabbitMQ 代理確認訊息已接收之前,不對使用者進行回覆。

在 4.2 版本中,Spring Integration 引入了 <barrier/> 元件來實現此目的。底層是 BarrierMessageHandler,它實現了 MessageHandler 介面。該類還實現了 MessageTriggerAction 介面,透過呼叫其 trigger() 方法傳遞訊息,可以釋放 handleRequestMessage() 方法中對應的(如果存在)掛起執行緒。

掛起的執行緒和觸發執行緒透過對訊息呼叫 CorrelationStrategy 進行關聯。當訊息傳送到 input-channel 時,執行緒將被掛起,最多等待 requestTimeout 毫秒,等待相應的觸發訊息。預設的關聯策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 頭資訊。當帶有相同關聯 ID 的觸發訊息到達時,執行緒被釋放。釋放後傳送到 output-channel 的訊息是使用 MessageGroupProcessor 構建的。預設情況下,該訊息是一個包含兩個 payload 的 Collection<?>,並且頭資訊使用 DefaultAggregatingMessageGroupProcessor 合併。

如果 trigger() 方法先被呼叫(或主執行緒超時後),它將掛起,最多等待 triggerTimeout 毫秒,等待掛起訊息的到達。如果您不想掛起觸發執行緒,請考慮將其交給 TaskExecutor 處理,以便由 TaskExecutor 的執行緒來掛起。
在 5.4 版本之前,請求訊息和觸發訊息只有一個 timeout 選項,但在某些用例中,最好對這些操作設定不同的超時時間。因此,引入了 requestTimeouttriggerTimeout 選項。

requires-reply 屬性決定了在觸發訊息到達之前,如果掛起執行緒超時應採取的操作。預設情況下,它為 false,這意味著端點返回 null,流結束,並且執行緒返回給呼叫者。當設定為 true 時,將丟擲 ReplyRequiredException 異常。

您可以透過程式設計方式呼叫 trigger() 方法(使用 bean 名稱 barrier.handler 獲取 bean 引用,其中 barrier 是 barrier 端點的 bean 名稱)。或者,您可以配置一個 <outbound-channel-adapter/> 來觸發釋放。

使用相同的關聯 ID 只能掛起一個執行緒。同一個關聯 ID 可以多次使用,但不能同時用於多個掛起操作。如果第二個執行緒帶著相同的關聯 ID 到達,則會丟擲異常。

以下示例展示瞭如何使用自定義頭資訊進行關聯

  • Java

  • XML

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根據哪個通道先接收到訊息,傳送訊息到 in 通道的執行緒或傳送訊息到 release 通道的執行緒將最多等待十秒,直到另一條訊息到達。訊息釋放後,out 通道會發送一條訊息,該訊息結合了呼叫名為 myOutputProcessor 的自定義 MessageGroupProcessor bean 的結果。如果主執行緒超時並且觸發訊息稍後才到達,您可以配置一個丟棄通道,將延遲到達的觸發訊息傳送到該通道。如果請求訊息未及時到達,觸發訊息也會被丟棄。

有關此元件的示例,請參閱barrier 示例應用