運算子 gateway()

IntegrationFlow 定義中的 gateway() 運算子是一種特殊的 Service Activator 實現,用於透過其輸入通道呼叫其他端點或整合流並等待回覆。從技術上講,它與 <chain> 定義中巢狀的 <gateway> 元件扮演相同的角色(參見 從鏈中呼叫鏈),並使流更清晰、更直接。從邏輯上和業務角度看,它是一個訊息閘道器,允許在目標整合解決方案的不同部分之間分發和重用功能(參見 訊息閘道器)。此運算子有多種過載以實現不同目標:

  • gateway(String requestChannel):透過其名稱向某個端點的輸入通道傳送訊息;

  • gateway(MessageChannel requestChannel):透過直接注入向某個端點的輸入通道傳送訊息;

  • gateway(IntegrationFlow flow):向提供的 IntegrationFlow 的輸入通道傳送訊息。

所有這些都帶有一個包含第二個 Consumer<GatewayEndpointSpec> 引數的變體,用於配置目標 GatewayMessageHandler 和相應的 AbstractEndpoint。此外,基於 IntegrationFlow 的方法允許呼叫現有 IntegrationFlow bean,或者透過就地 lambda 宣告流為子流以實現 IntegrationFlow 函式式介面,或者將其提取到 private 方法中以實現更清晰的程式碼風格。

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlow
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流並非總返回回覆,您應將 requestTimeout 設定為 0,以防止呼叫執行緒無限期掛起。在這種情況下,流將在該點結束,並且執行緒將被釋放以進行後續工作。

從 6.5 版本開始,此 gateway() 運算子完全支援 async(true) 行為。在內部,為 GatewayProxyFactoryBean 提供了 AsyncRequestReplyExchanger 服務介面。由於 AsyncRequestReplyExchanger 契約是 CompletableFuture<Message<?>>,因此整個請求-回覆以非同步方式執行。

© . This site is unofficial and not affiliated with VMware.