訊息閘道器

閘道器隱藏了 Spring Integration 提供的訊息處理 API。它使你的應用程式業務邏輯無需瞭解 Spring Integration API。透過使用通用閘道器,你的程式碼只需與一個簡單介面互動。

引入 GatewayProxyFactoryBean

如前所述,最好不依賴 Spring Integration API,包括閘道器類。因此,Spring Integration 提供了 GatewayProxyFactoryBean,它可以為任何介面生成代理,並在內部呼叫如下所示的閘道器方法。透過使用依賴注入,你可以將該介面暴露給你的業務方法。

以下示例展示了一個可用於與 Spring Integration 互動的介面

public interface Cafe {

    void placeOrder(Order order);

}

閘道器 XML Namespace 支援

還提供了 Namespace 支援。它允許你將介面配置為服務,如下例所示

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

定義此配置後,cafeService 現在可以注入到其他 bean 中,並且呼叫 Cafe 介面該代理例項上的方法的程式碼無需瞭解 Spring Integration API。請參閱“示例”附錄,其中包含一個使用 gateway 元素的示例(在 Cafe demo 中)。

前述配置中的預設設定適用於閘道器介面上的所有方法。如果未指定回覆超時時間,則呼叫執行緒將等待回覆 30 秒。請參閱閘道器在沒有回覆時的行為

可以針對單個方法覆蓋預設設定。請參閱使用註解和 XML 配置閘道器

設定預設回覆通道

通常,你無需指定 default-reply-channel,因為閘道器會自動建立一個臨時的、匿名的回覆通道,並在其中監聽回覆。但是,在某些情況下,你可能需要定義 default-reply-channel(或對於介面卡閘道器,如 HTTP、JMS 等,定義 reply-channel)。

為了提供一些背景資訊,我們簡要討論一下閘道器的一些內部工作原理。閘道器會建立一個臨時的點對點回復通道。它是匿名的,並以 replyChannel 名稱新增到訊息頭中。當提供一個顯式的 default-reply-channel(遠端介面卡閘道器使用 reply-channel)時,你可以指向一個釋出-訂閱通道,之所以這樣命名是因為你可以向其新增多個訂閱者。在內部,Spring Integration 在臨時 replyChannel 和顯式定義的 default-reply-channel 之間建立一個橋接。

假設你希望回覆不僅傳送到閘道器,還發送到其他一些消費者。在這種情況下,你需要兩件事

  • 一個你可以訂閱的命名通道

  • 該通道是一個釋出-訂閱通道

閘道器使用的預設策略不能滿足這些需求,因為新增到訊息頭中的回覆通道是匿名的點對點通道。這意味著沒有其他訂閱者可以獲取其控制代碼,即使可以,該通道的點對點行為也只會導致只有一個訂閱者收到訊息。透過定義 default-reply-channel,你可以指向一個你選擇的通道。在這種情況下,它是一個 publish-subscribe-channel。閘道器會從該通道橋接到儲存在訊息頭中的臨時匿名回覆通道。

你可能還希望透過攔截器(例如,wiretap)顯式提供一個回覆通道用於監控或審計。要配置通道攔截器,你需要一個命名通道。

從 5.4 版本開始,當閘道器方法的返回型別為 void 時,如果未顯式提供 replyChannel 頭,框架會將其填充為 nullChannel bean 引用。這允許下游流可能產生的任何回覆都被丟棄,滿足單向閘道器的契約。

使用註解和 XML 配置閘道器

考慮以下示例,它在之前的 Cafe 介面示例基礎上添加了 @Gateway 註解

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header 註解允許你新增將被解釋為訊息頭的值,如下例所示

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果你傾向於使用 XML 方法來配置閘道器方法,你可以在閘道器配置中新增 method 元素,如下例所示

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

你還可以使用 XML 為每個方法呼叫提供獨立的頭資訊。這在你想設定的頭是靜態的,並且不希望透過使用 @Header 註解將其嵌入到閘道器的方法簽名中時非常有用。例如,在貸款代理示例中,我們想根據啟動的請求型別(單個報價或所有報價)來影響貸款報價的聚合方式。透過評估呼叫了哪個閘道器方法來確定請求型別,雖然可能,但會違反關注點分離正規化(方法是 Java 工件)。然而,在訊息處理架構中,在訊息頭中表達你的意圖(元資訊)是很自然的。以下示例展示瞭如何為兩個方法新增不同的訊息頭

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在前面的示例中,根據閘道器的方法為 'RESPONSE_TYPE' 頭設定了不同的值。

例如,如果你在 <int:method/> 中以及在 @Gateway 註解中都指定了 requestChannel,則註解的值優先。
如果在 XML 中指定了一個無引數閘道器,並且介面方法同時具有 @Payload@Gateway 註解(在 <int:method/> 元素中帶有 payloadExpressionpayload-expression),則 @Payload 值將被忽略。

表示式和“全域性”頭資訊

<header/> 元素支援使用 expression 作為 value 的替代方案。SpEL 表示式會被評估以確定頭的值。從 5.2 版本開始,評估上下文的 #root 物件是一個 MethodArgsHolder,具有 getMethod()getArgs() 訪問器。例如,如果你希望根據簡單的方法名進行路由,你可以新增一個帶有以下表達式的頭:method.name

java.reflect.Method 是不可序列化的。如果以後序列化訊息,帶有表示式 method 的頭將丟失。因此,在這些情況下,你可能希望使用 method.namemethod.toString()toString() 方法提供了該方法的字串表示形式,包括引數和返回型別。

從 3.0 版本開始,可以定義 <default-header/> 元素,以便將頭新增到閘道器生成的所有訊息中,無論呼叫了哪個方法。為特定方法定義的頭優先於預設頭。此處為特定方法定義的頭會覆蓋服務介面中的任何 @Header 註解。但是,預設頭不會覆蓋服務介面中的任何 @Header 註解。

閘道器現在也支援 default-payload-expression,它適用於所有方法(除非被覆蓋)。

將方法引數對映到訊息

使用上一節中的配置技術可以控制方法引數如何對映到訊息元素(載荷和頭)。在沒有使用顯式配置時,會使用某些約定來執行對映。在某些情況下,這些約定無法確定哪個引數是載荷,哪個應該對映到頭。考慮以下示例

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一種情況下,約定是將第一個引數對映到載荷(只要它不是 Map),第二個引數的內容成為頭。

在第二種情況下(或者當引數 thing1 的實參是 Map 時),框架無法確定哪個引數應該作為載荷。因此,對映會失敗。這通常可以透過使用 payload-expression@Payload 註解或 @Headers 註解來解決。

或者(以及當約定失效時),你可以完全負責將方法呼叫對映到訊息。為此,實現一個 MethodArgsMessageMapper 並透過使用 mapper 屬性將其提供給 <gateway/>。該 mapper 對映一個 MethodArgsHolder,它是一個簡單的類,封裝了 java.reflect.Method 例項和一個包含引數的 Object[]。提供自定義 mapper 時,閘道器上不允許使用 default-payload-expression 屬性和 <default-header/> 元素。類似地,任何 <method/> 元素上也不允許使用 payload-expression 屬性和 <header/> 元素。

對映方法引數

以下示例展示瞭如何將方法引數對映到訊息,並展示了一些無效配置的示例

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 請注意,在此示例中,SpEL 變數 #this 指的是引數,在此處為 s 的值。

相應的 XML 配置略有不同,因為方法引數沒有 #this 上下文。然而,表示式可以透過使用 MethodArgsHolder 根物件的 args 屬性來引用方法引數(有關詳細資訊,請參閱表示式和“全域性”頭資訊),如下例所示

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

@MessagingGateway 註解

從 4.0 版本開始,閘道器服務介面可以使用 @MessagingGateway 註解進行標記,而無需定義 <gateway /> XML 元素進行配置。以下兩個示例比較了配置同一閘道器的兩種方法

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}
與 XML 版本類似,當 Spring Integration 在元件掃描期間發現這些註解時,它會使用其訊息處理基礎設施建立 proxy 實現。要執行此掃描並在應用程式上下文中註冊 BeanDefinition,請將 @IntegrationComponentScan 註解新增到 @Configuration 類。標準的 @ComponentScan 基礎設施不處理介面。因此,我們引入了自定義的 @IntegrationComponentScan 邏輯來查詢介面上的 @MessagingGateway 註解併為其註冊 GatewayProxyFactoryBean 例項。另請參閱註解支援

除了 @MessagingGateway 註解之外,你還可以使用 @Profile 註解標記服務介面,以便在該 Profile 未啟用時避免建立 bean。

從 6.0 版本開始,帶有 @MessagingGateway 的介面也可以標記 @Primary 註解以實現相應的配置邏輯,這與任何 Spring @Component 定義一樣。

從 6.0 版本開始,@MessagingGateway 介面可以在標準的 Spring @Import 配置中使用。這可以作為 @IntegrationComponentScan 或手動 AnnotationGatewayProxyFactoryBean bean 定義的替代方案。

自 6.0 版本以來,@MessagingGateway 使用 @MessageEndpoint 進行了元註解,並且 name() 屬性本質上是 @Compnent.value() 的別名。這樣,閘道器代理的 bean 名稱生成策略就與掃描和匯入元件的標準 Spring 註解配置重新對齊了。可以透過 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR 全域性覆蓋預設的 AnnotationBeanNameGenerator,或透過 @IntegrationComponentScan.nameGenerator() 屬性覆蓋。

如果你沒有 XML 配置,則至少在一個 @Configuration 類上需要 @EnableIntegration 註解。有關詳細資訊,請參閱配置和 @EnableIntegration

呼叫無引數方法

當呼叫閘道器介面上沒有任何引數的方法時,預設行為是從 PollableChannel 接收 Message

然而,有時你可能希望觸發無引數方法,以便與不需要使用者提供的引數的下游其他元件互動,例如觸發無引數的 SQL 呼叫或儲存過程。

為了實現傳送和接收語義,你必須提供載荷。為了生成載荷,介面上的方法引數並非必需。你可以使用 @Payload 註解,或者在 XML 的 method 元素上使用 payload-expression 屬性。以下列表包含了一些載荷可以是哪些內容的示例

  • 字面量字串

  • #gatewayMethod.name

  • new java.util.Date()

  • @someBean.someMethod() 的返回值

以下示例展示瞭如何使用 @Payload 註解

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

你也可以使用 @Gateway 註解。

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}
如果兩個註解都存在(並且提供了 payloadExpression),則 @Gateway 優先。

如果一個方法沒有引數、沒有返回值,但包含一個載荷表示式,它將被視為僅傳送操作。

呼叫 default 方法

閘道器代理的介面也可以有 default 方法,從 5.3 版本開始,框架會向代理注入一個 DefaultMethodInvokingMethodInterceptor,用於使用 java.lang.invoke.MethodHandle 方法呼叫 default 方法,而不是透過代理方式。JDK 中的介面,例如 java.util.function.Function,仍然可以用於閘道器代理,但由於針對 JDK 類的 MethodHandles.Lookup 例項化的內部 Java 安全原因,它們的 default 方法無法呼叫。這些方法也可以透過在方法上顯式使用 @Gateway 註解,或者在 @MessagingGateway 註解或 <gateway> XML 元件上使用 proxyDefaultMethods 來進行代理(會丟失其實現邏輯,同時恢復之前的閘道器代理行為)。

錯誤處理

閘道器呼叫可能導致錯誤。預設情況下,下游發生的任何錯誤都會在閘道器方法呼叫時“原樣”重新丟擲。例如,考慮以下簡單流程

gateway -> service-activator

如果服務啟用器呼叫的服務丟擲 MyException(例如),框架會將其包裝在 MessagingException 中,並將傳遞給服務啟用器的訊息附加到 failedMessage 屬性中。因此,框架執行的任何日誌記錄都具有完整的失敗上下文。預設情況下,當閘道器捕獲異常時,MyException 會被解包並拋給呼叫者。你可以在閘道器方法宣告上配置一個 throws 子句來匹配 cause 鏈中特定的異常型別。例如,如果你想捕獲包含下游錯誤原因所有訊息資訊的整個 MessagingException,你的閘道器方法應該類似於以下內容

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由於我們鼓勵 POJO 程式設計,你可能不想讓呼叫者接觸訊息處理基礎設施。

如果你的閘道器方法沒有 throws 子句,閘道器會遍歷 cause 樹,查詢不是 MessagingExceptionRuntimeException。如果找不到,框架會丟擲 MessagingException。如果在前面討論中的 MyException 的 cause 是 SomeOtherException,並且你的方法 throws SomeOtherException,閘道器會進一步解包並將其拋給呼叫者。

當閘道器宣告時沒有指定 service-interface,將使用內部框架介面 RequestReplyExchanger

考慮以下示例

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

在 5.0 版本之前,這個 exchange 方法沒有 throws 子句,因此異常會被解包。如果你使用此介面並想恢復之前的解包行為,請改用自定義的 service-interface 或自行訪問 MessagingExceptioncause

然而,你可能希望記錄錯誤而不是傳播它,或者你可能希望將異常視為一個有效的回覆(透過將其對映到符合呼叫者理解的某種“錯誤訊息”契約的訊息)。為了實現這一點,閘道器透過包含對 error-channel 屬性的支援,提供了專門用於錯誤的通道支援。在以下示例中,一個 'transformer' 從 Exception 建立一個回覆 Message

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer 可以是一個簡單的 POJO,它知道如何建立預期的錯誤響應物件。這將成為傳送回撥用者的載荷。如果需要,你可以在這樣的“錯誤流”中執行更多複雜的操作。它可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter)、過濾器等。然而,大多數情況下,一個簡單的 'transformer' 應該足夠了。

或者,你可能只希望記錄異常(或將其非同步傳送到某個地方)。如果你提供一個單向流,則不會有任何內容傳送回撥用者。如果你想完全抑制異常,可以提供對全域性 nullChannel 的引用(本質上是 /dev/null 方法)。最後,如上所述,如果未定義 error-channel,則異常會照常傳播。

當你使用 @MessagingGateway 註解時(請參閱@MessagingGateway 註解),你可以使用 errorChannel 屬性。

從 5.0 版本開始,當你使用返回型別為 void 的閘道器方法(單向流)時,提供的 error-channel 引用會被填充到每條傳送訊息的標準 errorChannel 頭中。此功能允許基於標準 ExecutorChannel 配置(或 QueueChannel)的下游非同步流覆蓋預設的全域性 errorChannel 異常傳送行為。之前,你必須使用 @GatewayHeader 註解或 <header> 元素手動指定 errorChannel 頭。對於具有非同步流的 void 方法,error-channel 屬性會被忽略。相反,錯誤訊息會被髮送到預設的 errorChannel

透過簡單的 POJO 閘道器暴露訊息系統帶來了好處,但“隱藏”底層訊息系統的現實也付出了代價,因此您應該考慮某些事情。我們希望 Java 方法儘快返回,而不是在呼叫者等待它返回(無論是 void、返回值還是丟擲的 Exception)時無限期地掛起。當常規方法用作訊息系統前面的代理時,我們必須考慮底層訊息傳遞潛在的非同步性質。這意味著閘道器發起的訊息可能會被過濾器丟棄,永遠無法到達負責生成回覆的元件。某些服務啟用器方法可能會導致異常,從而沒有提供回覆(因為我們不生成空訊息)。換句話說,多種情況可能導致回覆訊息永遠不會到達。這在訊息系統中是非常自然的。然而,考慮一下這對閘道器方法的影響。閘道器方法的輸入引數被封裝到訊息中併發送到下游。回覆訊息將被轉換為閘道器方法的返回值。因此,您可能希望確保對於每次閘道器呼叫,總會有一個回覆訊息。否則,如果 reply-timeout 設定為負值,您的閘道器方法可能永遠不會返回並無限期掛起。處理這種情況的一種方法是使用非同步閘道器(本節稍後解釋)。另一種處理方法是依靠預設的 reply-timeout,例如 30 秒。這樣,閘道器就不會比 reply-timeout 指定的時間掛起更長時間,並且如果超時到期則返回 'null'。最後,您可能想考慮在服務啟用器上設定下游標誌,例如 'requires-reply',或者在過濾器上設定 'throw-exceptions-on-rejection'。本章的最後一節將更詳細地討論這些選項。
如果下游流返回一個 ErrorMessage,其 payload(一個 Throwable)將被視為常規的下游錯誤。如果配置了 error-channel,則會發送到錯誤流。否則,payload 會拋給閘道器的呼叫者。類似地,如果在 error-channel 上的錯誤流返回一個 ErrorMessage,其 payload 會拋給呼叫者。這也適用於任何帶有 Throwable payload 的訊息。這在非同步情況下很有用,當您需要將 Exception 直接傳播給呼叫者時。為此,您可以返回一個 Exception(作為某個服務的 reply)或丟擲它。通常,即使是非同步流,框架也會負責將下游流丟擲的異常傳播回閘道器。TCP Client-Server Multiplex 示例演示了將異常返回給呼叫者的兩種技術。它透過使用帶有 group-timeoutaggregator(參見 Aggregator and Group Timeout)並在丟棄流上返回 MessagingTimeoutException 來模擬套接字 IO 錯誤到等待執行緒。

閘道器超時

閘道器有兩個超時屬性:requestTimeoutreplyTimeout。請求超時僅在通道可能阻塞(例如,已滿的有界 QueueChannel)時適用。replyTimeout 的值是閘道器等待回覆或返回 null 的時間長度。它預設為無限。

超時可以設定為閘道器上所有方法的預設值(defaultRequestTimeoutdefaultReplyTimeout),或在 MessagingGateway 介面註解上設定。單個方法可以覆蓋這些預設值(在 <method/> 子元素中)或在 @Gateway 註解上設定。

從 5.0 版本開始,超時可以定義為表示式,如下例所示

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

評估上下文具有一個 BeanResolver(使用 @someBean 引用其他 bean),並且 `#root` 物件的 args 陣列屬性可用。有關此 root 物件的更多資訊,請參見 表示式與“全域性”頭部。使用 XML 配置時,超時屬性可以是 long 值或 SpEL 表示式,如下例所示

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

非同步閘道器

作為一種模式,訊息閘道器提供了一種很好的方式來隱藏訊息傳遞特定的程式碼,同時仍然暴露訊息系統的全部功能。如 前所述GatewayProxyFactoryBean 提供了一種便捷的方式來透過服務介面暴露代理,從而為您提供基於 POJO 的訊息系統訪問(基於您自己領域中的物件、基本型別/字串或其他物件)。然而,當透過返回值的簡單 POJO 方法暴露閘道器時,這意味著對於每個請求訊息(方法呼叫時生成),都必須有一個回覆訊息(方法返回時生成)。由於訊息系統本質上是非同步的,您可能無法總是保證“每個請求總會有一個回覆”的契約。Spring Integration 2.0 引入了對非同步閘道器的支援,它提供了一種便捷的方式來啟動流程,當您可能不知道是否期望回覆或回覆需要多久才能到達時。

為了處理這些型別的場景,Spring Integration 使用 java.util.concurrent.Future 例項來支援非同步閘道器。

從 XML 配置來看,沒有任何變化,您仍然以與定義常規閘道器相同的方式定義非同步閘道器,如下例所示

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

然而,閘道器介面(服務介面)略有不同,如下所示

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如上例所示,閘道器方法的返回型別是 Future。當 GatewayProxyFactoryBean 看到閘道器方法的返回型別是 Future 時,它會立即透過使用 AsyncTaskExecutor 切換到非同步模式。區別僅此而已。呼叫此類方法總是立即返回一個 Future 例項。然後您可以按照自己的節奏與 Future 互動以獲取結果、取消等等。此外,與使用任何其他 Future 例項一樣,呼叫 get() 可能會暴露超時、執行異常等。下面的示例展示瞭如何使用非同步閘道器返回的 Future

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有關更詳細的示例,請參見 Spring Integration 示例中的 async-gateway 示例。

AsyncTaskExecutor

預設情況下,當為返回型別為 Future 的任何閘道器方法提交內部 AsyncInvocationTask 例項時,GatewayProxyFactoryBean 使用 org.springframework.core.task.SimpleAsyncTaskExecutor。然而,`` 元素的配置中的 async-executor 屬性允許您提供對 Spring 應用程式上下文中可用的任何 java.util.concurrent.Executor 實現的引用。

(預設的)SimpleAsyncTaskExecutor 支援 FutureCompletableFuture 返回型別。參見 CompletableFuture。即使有預設執行器,提供一個外部執行器也通常很有用,這樣您就可以在日誌中識別其執行緒(使用 XML 時,執行緒名稱基於執行器的 bean 名稱),如下例所示

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果您希望返回不同的 Future 實現,您可以提供自定義執行器或完全停用執行器,並在下游流的回覆訊息 payload 中返回 Future。要停用執行器,請在 GatewayProxyFactoryBean 中將其設定為 null(透過使用 setAsyncTaskExecutor(null))。使用 XML 配置閘道器時,使用 async-executor=""。使用 `@MessagingGateway` 註解配置時,使用類似於以下的示例程式碼

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}
如果返回型別是特定的具體 Future 實現或配置的執行器不支援的某個其他子介面,則流程將在呼叫者的執行緒上執行,並且流程必須在回覆訊息 payload 中返回所需的型別。

CompletableFuture

從 4.2 版本開始,閘道器方法現在可以返回 CompletableFuture<?>。返回此型別時有兩種操作模式

  • 當提供了非同步執行器且返回型別恰好是 CompletableFuture(不是子類)時,框架會在執行器上執行任務,並立即向呼叫者返回一個 CompletableFuture。使用 CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) 建立 future。

  • 當非同步執行器明確設定為 null 且返回型別是 CompletableFutureCompletableFuture 的子類時,流程會在呼叫者的執行緒上呼叫。在這種情況下,期望下游流返回適當型別的 CompletableFuture

使用場景

在以下場景中,呼叫者執行緒會立即返回一個 CompletableFuture<Invoice>,當下遊流回復閘道器(帶有 Invoice 物件)時,該 future 會完成。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下場景中,當下遊流將 CompletableFuture<Invoice> 作為回覆閘道器的 payload 提供時,呼叫者執行緒會返回該 CompletableFuture<Invoice>。當發票準備好時,必須由其他某個程序完成該 future。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在以下場景中,當下遊流將 CompletableFuture<Invoice> 作為回覆閘道器的 payload 提供時,呼叫者執行緒會返回該 CompletableFuture<Invoice>。當發票準備好時,必須由其他某個程序完成該 future。如果啟用了 DEBUG 日誌記錄,則會發出一條日誌條目,指示此場景無法使用非同步執行器。

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture 例項可用於對回覆執行額外的操作,如下例所示

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

從 5.0 版本開始,GatewayProxyFactoryBean 允許在閘道器介面方法中使用 Project Reactor,使用 Mono<T> 返回型別。內部的 AsyncInvocationTask 被包裝在 Mono.fromCallable() 中。

Mono 可用於稍後檢索結果(類似於 Future<?>),或者您可以透過分發器在使用者的 Consumer 中消費它,當結果返回到閘道器時。

框架不會立即重新整理 Mono。因此,在閘道器方法返回之前,底層的訊息流不會啟動(這與 Future<?> Executor 任務不同)。當 Mono 被訂閱時,流程開始。或者,Mono(作為一個“可組合物件”)可能是 Reactor 流的一部分,此時 subscribe() 與整個 Flux 相關聯。下面的示例展示瞭如何使用 Project Reactor 建立閘道器
@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

這種閘道器可以在處理資料 Flux 的某個服務中使用

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

使用 Project Reactor 的另一個示例是一個簡單的回撥場景,如下例所示

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

呼叫執行緒繼續執行,當流程完成時會呼叫 handleInvoice()

另請參見 Kotlin 協程 以獲取更多資訊。

下游流返回非同步型別

如上文 AsyncTaskExecutor 部分所述,如果您希望某個下游元件返回帶有非同步 payload(FutureMono 等)的訊息,您必須明確將非同步執行器設定為 null(使用 XML 配置時設定為 "")。然後流程會在呼叫執行緒上呼叫,並且可以稍後檢索結果。

非同步 void 返回型別

訊息閘道器方法可以這樣宣告

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

但下游異常不會傳播回撥用者。為了確保下游流程呼叫和異常傳播到呼叫者的非同步行為,從 6.0 版本開始,框架支援 `Future<Void>` 和 `Mono<Void>` 返回型別。用例類似於之前描述的簡單 `void` 返回型別的“傳送即忘”行為,但不同之處在於流程執行是非同步發生的,並且返回的 `Future`(或 `Mono`)會根據 `send` 操作結果完成,結果為 `null` 或異常。

如果 Future<Void> 是下游流程的確切回覆,則閘道器的 asyncExecutor 選項必須設定為 null(對於 @MessagingGateway 配置為 AnnotationConstants.NULL),並且 send 部分在生產者執行緒上執行。回覆取決於下游流程配置。這樣,正確產生 Future<Void> 回覆的任務就取決於目標應用程式。Mono 的用例已經超出了框架的執行緒控制範圍,因此將 asyncExecutor 設定為 null 沒有意義。作為請求-回覆閘道器操作結果的 Mono<Void> 必須配置為閘道器方法的 Mono<?> 返回型別。

沒有回覆到達時的閘道器行為

前所述,閘道器提供了一種便捷的方式,透過 POJO 方法呼叫與訊息系統互動。然而,通常期望總是返回(即使帶有 Exception)的典型方法呼叫,可能並非總是與訊息交換一一對應(例如,回覆訊息可能不會到達——這相當於方法沒有返回)。

本節其餘部分涵蓋了各種場景以及如何使閘道器的行為更可預測。可以配置某些屬性來使同步閘道器的行為更可預測,但其中一些可能並不總是按您預期的方式工作。其中之一是 reply-timeout(方法級別)或 default-reply-timeout(閘道器級別)。我們檢查 reply-timeout 屬性,以瞭解它在各種場景中如何影響和不能影響同步閘道器的行為。我們檢查單執行緒場景(所有下游元件透過直接通道連線)和多執行緒場景(例如,下游某個地方可能有一個可輪詢或執行器通道,打破了單執行緒邊界)。

下游長時間執行的程序

同步閘道器,單執行緒

如果下游元件仍在執行(可能是因為無限迴圈或慢速服務),設定 reply-timeout 無效,閘道器方法呼叫直到下游服務退出(透過返回或丟擲異常)才會返回。

同步閘道器,多執行緒

如果下游元件在多執行緒訊息流中仍在執行(可能是因為無限迴圈或慢速服務),設定 reply-timeout 會產生效果,允許閘道器方法呼叫在達到超時後返回,因為 GatewayProxyFactoryBean 會在回覆通道上輪詢,等待訊息直到超時過期。然而,如果在實際回覆生成之前達到了超時,可能導致閘道器方法返回 'null'。您應該理解,回覆訊息(如果生成)是在閘道器方法呼叫可能已經返回之後傳送到回覆通道的,因此您必須意識到這一點並在設計流程時考慮在內。

另請參閱 errorOnTimeout 屬性,以便在發生超時時丟擲 MessageTimeoutException 而不是返回 null

下游元件返回 'null'

同步閘道器 — 單執行緒

如果下游元件返回 'null' 且 reply-timeout 配置為負值,閘道器方法呼叫將無限期掛起,除非在可能返回 'null' 的下游元件(例如,服務啟用器)上設定了 requires-reply 屬性。在這種情況下,會丟擲異常並傳播到閘道器。

同步閘道器 — 多執行緒

行為與前一種情況相同。

下游元件返回簽名是 'void',而閘道器方法簽名是非 void

同步閘道器 — 單執行緒

如果下游元件返回 'void' 且 reply-timeout 配置為負值,閘道器方法呼叫將無限期掛起。

同步閘道器 — 多執行緒

行為與前一種情況相同。

下游元件導致執行時異常

同步閘道器 — 單執行緒

如果下游元件丟擲執行時異常,該異常將透過錯誤訊息傳播回閘道器並重新丟擲。

同步閘道器 — 多執行緒

行為與前一種情況相同。

您應該理解,預設情況下,reply-timeout 是無界的。因此,如果您將 reply-timeout 設定為負值,您的閘道器方法呼叫可能會無限期掛起。因此,為確保您分析您的流程,並且即使存在這些場景之一發生的微小可能性,您也應該將 reply-timeout 屬性設定為一個“安全的”值。預設是 30 秒。更好的是,您可以將下游元件的 requires-reply 屬性設定為 'true' 以確保及時響應,當該下游元件內部返回 null 時,會立即丟擲異常產生及時響應。然而,您還應該意識到,有些場景(參見 第一個)中 reply-timeout 沒有幫助。這意味著分析您的訊息流並決定何時使用同步閘道器而不是非同步閘道器也很重要。如 前所述,後一種情況就是定義返回 Future 例項的閘道器方法。這樣您就可以保證收到返回值,並且對呼叫的結果有更細粒度的控制。此外,在使用路由時,您應該記住,如果路由無法解析特定通道,將 resolution-required 屬性設定為 'true' 會導致路由丟擲異常。同樣,在使用過濾器時,您可以設定 throw-exception-on-rejection 屬性。在這兩種情況下,生成的流程表現得就像包含了一個帶有 'requires-reply' 屬性的服務啟用器。換句話說,這有助於確保閘道器方法呼叫的及時響應。
您應該理解,計時器線上程返回到閘道器時啟動——即當流程完成或訊息被傳遞到另一個執行緒時。此時,呼叫執行緒開始等待回覆。如果流程完全同步,則回覆會立即可用。對於非同步流程,執行緒最多等待到此時間。

從 6.2 版本開始,內部 MethodInvocationGatewayMessagingGatewaySupport 的擴充套件)的 errorOnTimeout 屬性在 @MessagingGatewayGatewayEndpointSpec 上暴露。此選項的含義與 端點摘要 章末尾解釋的任何入站閘道器完全相同。換句話說,將此選項設定為 true 將導致在接收超時耗盡時,傳送-接收閘道器操作丟擲 MessageTimeoutException,而不是返回 null

有關透過 IntegrationFlow 定義閘道器的選項,請參見 Java DSL 章中的 IntegrationFlow 作為閘道器