運算子 gateway()
IntegrationFlow 定義中的 gateway() 運算子是一種特殊的 Service Activator 實現,用於透過其輸入通道呼叫其他端點或整合流並等待回覆。從技術上講,它與 <chain> 定義中巢狀的 <gateway> 元件扮演相同的角色(參見 從鏈中呼叫鏈),並使流更清晰、更直接。從邏輯上和業務角度看,它是一個訊息閘道器,允許在目標整合解決方案的不同部分之間分發和重用功能(參見 訊息閘道器)。此運算子有多種過載以實現不同目標:
-
gateway(String requestChannel):透過其名稱向某個端點的輸入通道傳送訊息; -
gateway(MessageChannel requestChannel):透過直接注入向某個端點的輸入通道傳送訊息; -
gateway(IntegrationFlow flow):向提供的IntegrationFlow的輸入通道傳送訊息。
所有這些都帶有一個包含第二個 Consumer<GatewayEndpointSpec> 引數的變體,用於配置目標 GatewayMessageHandler 和相應的 AbstractEndpoint。此外,基於 IntegrationFlow 的方法允許呼叫現有 IntegrationFlow bean,或者透過就地 lambda 宣告流為子流以實現 IntegrationFlow 函式式介面,或者將其提取到 private 方法中以實現更清晰的程式碼風格。
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流並非總返回回覆,您應將 requestTimeout 設定為 0,以防止呼叫執行緒無限期掛起。在這種情況下,流將在該點結束,並且執行緒將被釋放以進行後續工作。 |
從 6.5 版本開始,此 gateway() 運算子完全支援 async(true) 行為。在內部,為 GatewayProxyFactoryBean 提供了 AsyncRequestReplyExchanger 服務介面。由於 AsyncRequestReplyExchanger 契約是 CompletableFuture<Message<?>>,因此整個請求-回覆以非同步方式執行。