訊息閘道器

閘道器隱藏了 Spring Integration 提供的訊息 API。它讓你的應用程式業務邏輯無需瞭解 Spring Integration API。透過使用通用閘道器,你的程式碼只與一個簡單的介面互動。

進入 GatewayProxyFactoryBean

如前所述,不依賴 Spring Integration API(包括閘道器類)會很好。因此,Spring Integration 提供了 GatewayProxyFactoryBean,它為任何介面生成一個代理,並在內部呼叫下面所示的閘道器方法。透過使用依賴注入,你可以將介面暴露給你的業務方法。

以下示例展示了一個可用於與 Spring Integration 互動的介面

public interface Cafe {

    void placeOrder(Order order);

}

閘道器 XML 名稱空間支援

還提供了名稱空間支援。它允許你將介面配置為服務,如下例所示

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

透過定義此配置,cafeService 現在可以注入到其他 bean 中,並且呼叫 Cafe 介面的代理例項上的方法的程式碼無需瞭解 Spring Integration API。請參閱“示例”附錄,瞭解使用 gateway 元素的示例(在 Cafe 演示中)。

前面配置中的預設值應用於閘道器介面上的所有方法。如果未指定回覆超時,則呼叫執行緒將等待回覆 30 秒。請參閱沒有回覆到達時的閘道器行為

可以為單個方法覆蓋預設值。請參閱使用註解和 XML 的閘道器配置

設定預設回覆通道

通常,你不需要指定 default-reply-channel,因為閘道器會自動建立一個臨時的匿名回覆通道,並在其中偵聽回覆。但是,某些情況下你可能需要定義一個 default-reply-channel(或介面卡閘道器(如 HTTP、JMS 等)的 reply-channel)。

為了一些背景知識,我們簡要討論一下閘道器的一些內部工作原理。閘道器建立一個臨時點對點回復通道。它是匿名的,並以名稱 replyChannel 新增到訊息頭中。當提供顯式的 default-reply-channel(遠端介面卡閘道器的 reply-channel)時,你可以指向一個釋出-訂閱通道,之所以這樣命名是因為你可以向其新增多個訂閱者。在內部,Spring Integration 在臨時 replyChannel 和顯式定義的 default-reply-channel 之間建立一個橋接。

假設你希望你的回覆不僅傳送到閘道器,還發送到其他一些消費者。在這種情況下,你需要兩件事

  • 一個可以訂閱的命名通道

  • 該通道是釋出-訂閱通道

閘道器使用的預設策略不滿足這些需求,因為新增到頭部中的回覆通道是匿名的點對點通道。這意味著沒有其他訂閱者可以獲得它的控制代碼,而且即使可以,該通道也具有點對點行為,只有一個訂閱者會收到訊息。透過定義 default-reply-channel,你可以指向你選擇的通道。在這種情況下,它是一個 publish-subscribe-channel。閘道器從它建立一個橋接到儲存在頭部中的臨時匿名回覆通道。

你可能還希望透過攔截器(例如,監聽器)顯式提供回覆通道以進行監視或審計。要配置通道攔截器,你需要一個命名通道。

從 5.4 版本開始,當閘道器方法的返回型別為 void 時,如果未明確提供 replyChannel 頭,則框架會將 replyChannel 頭填充為 nullChannel bean 引用。這允許丟棄來自下游流的任何可能的回覆,從而滿足單向閘道器契約。

使用註解和 XML 的閘道器配置

考慮以下示例,它透過新增 @Gateway 註解擴充套件了前面的 Cafe 介面示例

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header 註解允許你新增被解釋為訊息頭的值,如下例所示

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果你喜歡使用 XML 方法來配置閘道器方法,你可以將 method 元素新增到閘道器配置中,如下例所示

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

你還可以使用 XML 為每個方法呼叫提供單獨的頭部。如果需要設定的頭部本質上是靜態的,並且你不想透過使用 @Header 註解將它們嵌入到閘道器的方法簽名中,這會很有用。例如,在貸款經紀人示例中,我們希望根據發起的請求型別(單引號或所有引號)影響貸款報價的聚合方式。透過評估呼叫哪個閘道器方法來確定請求型別,儘管可能,但會違反關注點分離範例(該方法是一個 Java 工件)。但是,在訊息架構中,在訊息頭部中表達你的意圖(元資訊)是很自然的。以下示例展示瞭如何為兩個方法新增不同的訊息頭部

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在前面的例子中,'RESPONSE_TYPE' 頭部的不同值是根據閘道器方法設定的。

如果你在 <int:method/>@Gateway 註解中都指定了 requestChannel,則註解值優先。
如果在 XML 中指定了無引數閘道器,並且介面方法同時具有 @Payload@Gateway 註解(帶有 payloadExpression<int:method/> 元素中的 payload-expression),則 @Payload 值將被忽略。

表示式和“全域性”頭部

<header/> 元素支援 expression 作為 value 的替代。SpEL 表示式被評估以確定頭部的值。從 5.2 版本開始,評估上下文的 #root 物件是一個帶有 getMethod()getArgs() 訪問器的 MethodArgsHolder。例如,如果你希望根據簡單的方法名進行路由,你可以新增一個帶有以下表達式的頭部:method.name

java.reflect.Method 是不可序列化的。如果以後序列化訊息,帶有 method 表示式的頭部將丟失。因此,在這些情況下,你可能希望使用 method.namemethod.toString()toString() 方法提供了方法的 String 表示,包括引數和返回型別。

自 3.0 版本以來,可以定義 <default-header/> 元素來為閘道器生成的所有訊息新增頭部,無論呼叫了哪個方法。為方法定義的特定頭部優先於預設頭部。此處為方法定義的特定頭部將覆蓋服務介面中的任何 @Header 註解。但是,預設頭部不會覆蓋服務介面中的任何 @Header 註解。

閘道器現在還支援 default-payload-expression,它應用於所有方法(除非被覆蓋)。

將方法引數對映到訊息

使用上一節中的配置技術可以控制方法引數如何對映到訊息元素(有效載荷和頭部)。當不使用顯式配置時,將使用某些約定來執行對映。在某些情況下,這些約定無法確定哪個引數是有效載荷,哪個應該對映到頭部。考慮以下示例

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一種情況下,約定是將第一個引數對映到有效載荷(只要它不是 Map),第二個引數的內容成為頭部。

在第二種情況(或者當引數 thing1 的引數是 Map 時的第一種情況)下,框架無法確定哪個引數應該是有效載荷。因此,對映失敗。這通常可以透過使用 payload-expression@Payload 註解或 @Headers 註解來解決。

或者(以及當約定失效時),你可以承擔將方法呼叫對映到訊息的全部責任。為此,請實現一個 MethodArgsMessageMapper,並透過使用 mapper 屬性將其提供給 <gateway/>。對映器對映一個 MethodArgsHolder,它是一個簡單的類,封裝了 java.reflect.Method 例項和包含引數的 Object[]。當提供自定義對映器時,閘道器上不允許使用 default-payload-expression 屬性和 <default-header/> 元素。同樣,任何 <method/> 元素上都不允許使用 payload-expression 屬性和 <header/> 元素。

對映方法引數

以下示例展示瞭如何將方法引數對映到訊息,並展示了一些無效配置的示例

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 請注意,在此示例中,SpEL 變數 #this 指的是引數,在此例中是 s 的值。

XML 等效項看起來有點不同,因為方法引數沒有 #this 上下文。但是,表示式可以使用 MethodArgsHolder 根物件的 args 屬性來引用方法引數(有關更多資訊,請參閱表示式和“全域性”頭部),如下例所示

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

@MessagingGateway 註解

從 4.0 版本開始,閘道器服務介面可以使用 @MessagingGateway 註解進行標記,而無需定義 <gateway /> xml 元素進行配置。以下兩組示例比較了配置同一閘道器的兩種方法

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}
與 XML 版本類似,當 Spring Integration 在元件掃描期間發現這些註解時,它會使用其訊息傳遞基礎設施建立 proxy 實現。要執行此掃描並在應用程式上下文中註冊 BeanDefinition,請將 @IntegrationComponentScan 註解新增到 @Configuration 類。標準的 @ComponentScan 基礎設施不處理介面。因此,我們引入了自定義的 @IntegrationComponentScan 邏輯來查詢介面上的 @MessagingGateway 註解併為其註冊 GatewayProxyFactoryBean 例項。另請參閱註解支援

除了 @MessagingGateway 註解之外,你還可以使用 @Profile 註解標記服務介面,以避免在未啟用該配置檔案時建立 bean。

從 6.0 版本開始,帶有 @MessagingGateway 的介面也可以用 @Primary 註解標記,以實現其相應的配置邏輯,這與任何 Spring @Component 定義都可能實現。

從 6.0 版本開始,@MessagingGateway 介面可以在標準的 Spring @Import 配置中使用。這可以作為 @IntegrationComponentScan 或手動 AnnotationGatewayProxyFactoryBean bean 定義的替代方案。

6.0 版本以來,@MessagingGateway 使用 @MessageEndpoint 進行元註解,並且 name() 屬性實際上是 @Component.value() 的別名。這樣,閘道器代理的 bean 名稱生成策略與掃描和匯入元件的標準 Spring 註解配置保持一致。預設的 AnnotationBeanNameGenerator 可以透過 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR 或作為 @IntegrationComponentScan.nameGenerator() 屬性進行全域性覆蓋。

如果你沒有 XML 配置,則至少一個 @Configuration 類上需要 @EnableIntegration 註解。有關更多資訊,請參閱配置和 @EnableIntegration

呼叫無引數方法

當呼叫閘道器介面上沒有任何引數的方法時,預設行為是從 PollableChannel 接收 Message

但是,有時你可能希望觸發無引數方法,以便與不需要使用者提供引數的其他下游元件進行互動,例如觸發無引數 SQL 呼叫或儲存過程。

要實現傳送和接收語義,你必須提供一個有效載荷。生成有效載荷不需要介面上的方法引數。你可以使用 @Payload 註解或 XML 中 method 元素上的 payload-expression 屬性。以下列表包含一些有效載荷的示例

  • 文字字串

  • #gatewayMethod.name

  • new java.util.Date()

  • @someBean.someMethod() 的返回值

以下示例展示瞭如何使用 @Payload 註解

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

你也可以使用 @Gateway 註解。

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}
如果兩個註解都存在(並且提供了 payloadExpression),則 @Gateway 獲勝。

如果方法沒有引數也沒有返回值,但包含有效載荷表示式,則將其視為僅傳送操作。

呼叫 default 方法

閘道器代理的介面也可以有 default 方法,從 5.3 版本開始,框架會將 DefaultMethodInvokingMethodInterceptor 注入到代理中,以便使用 java.lang.invoke.MethodHandle 方法呼叫 default 方法,而不是代理。JDK 中的介面,例如 java.util.function.Function,仍然可以用於閘道器代理,但由於 JDK 類的 MethodHandles.Lookup 例項化內部 Java 安全原因,它們的 default 方法無法呼叫。這些方法也可以被代理(失去其實現邏輯,同時恢復以前的閘道器代理行為),方法是在方法上使用顯式 @Gateway 註解,或者在 @MessagingGateway 註解或 <gateway> XML 元件上使用 proxyDefaultMethods

錯誤處理

閘道器呼叫可能導致錯誤。預設情況下,下游發生的任何錯誤都會在閘道器方法呼叫時“原樣”重新丟擲。例如,考慮以下簡單流程

gateway -> service-activator

如果服務啟用器呼叫的服務丟擲 MyException(例如),框架會將其包裝在 MessagingException 中,並將傳遞給服務啟用器的訊息附加到 failedMessage 屬性中。因此,框架執行的任何日誌記錄都具有完整的故障上下文。預設情況下,當閘道器捕獲異常時,MyException 會被解包並拋給呼叫者。你可以在閘道器方法宣告上配置 throws 子句,以匹配原因鏈中的特定異常型別。例如,如果你想捕獲一個帶有所有訊息傳遞資訊(下游錯誤原因)的完整 MessagingException,你的閘道器方法應該類似於以下內容

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由於我們鼓勵 POJO 程式設計,你可能不想讓呼叫者接觸訊息傳遞基礎設施。

如果你的閘道器方法沒有 throws 子句,閘道器會遍歷原因樹,查詢不是 MessagingExceptionRuntimeException。如果找不到,框架會丟擲 MessagingException。如果前面討論的 MyException 的原因是 SomeOtherException,並且你的方法 throws SomeOtherException,則閘道器會進一步解包並將其拋給呼叫者。

當宣告的閘道器沒有 service-interface 時,將使用內部框架介面 RequestReplyExchanger

考慮以下示例

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

在 5.0 版本之前,此 exchange 方法沒有 throws 子句,因此異常會被解包。如果你使用此介面並希望恢復以前的解包行為,請改用自定義 service-interface 或自行訪問 MessagingExceptioncause

但是,你可能希望記錄錯誤而不是傳播它,或者你可能希望將異常視為有效回覆(透過將其對映到符合呼叫者理解的某些“錯誤訊息”契約的訊息)。為了實現這一點,閘道器透過包含對 error-channel 屬性的支援來提供對專門用於錯誤的 message channel 的支援。在以下示例中,一個“轉換器”從 Exception 建立一個回覆 Message

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer 可以是一個簡單的 POJO,它知道如何建立預期的錯誤響應物件。這成為傳送回撥用者的有效載荷。如果需要,你可以在這樣的“錯誤流”中做更多複雜的事情。它可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter)、過濾器等等。但是,大多數情況下,一個簡單的“轉換器”就足夠了。

或者,你可能只想記錄異常(或將其非同步傳送到某個地方)。如果你提供單向流,則不會向呼叫者傳送任何內容。如果你想完全抑制異常,你可以提供對全域性 nullChannel 的引用(本質上是 /dev/null 方法)。最後,如上所述,如果未定義 error-channel,則異常照常傳播。

當您使用 @MessagingGateway 註解時(參見 @MessagingGateway` 註解),您可以使用 `errorChannel 屬性。

從 5.0 版本開始,當你使用返回型別為 void 的閘道器方法(單向流)時,error-channel 引用(如果提供)會填充到每條傳送訊息的標準 errorChannel 頭中。此功能允許基於標準 ExecutorChannel 配置(或 QueueChannel)的下游非同步流覆蓋預設的全域性 errorChannel 異常傳送行為。以前,你必須使用 @GatewayHeader 註解或 <header> 元素手動指定 errorChannel 頭。error-channel 屬性對於非同步流的 void 方法會被忽略。相反,錯誤訊息會發送到預設的 errorChannel

透過簡單的 POJI 閘道器暴露訊息系統帶來了好處,但“隱藏”底層訊息系統的真實性也付出了代價,因此你需要考慮一些事情。我們希望我們的 Java 方法儘快返回,而不是無限期地掛起,因為呼叫者正在等待它返回(無論是 void、返回值還是丟擲的異常)。當常規方法用作訊息系統前面的代理時,我們必須考慮底層訊息傳遞的潛在非同步性質。這意味著閘道器啟動的訊息可能被過濾器丟棄,並且永遠不會到達負責生成回覆的元件。一些服務啟用器方法可能導致異常,因此不提供回覆(因為我們不生成空訊息)。換句話說,多種情況可能導致回覆訊息永遠不會到達。這在訊息系統中是完全自然的。但是,請考慮對閘道器方法的影響。閘道器方法的輸入引數被合併到訊息中併發送到下游。回覆訊息將轉換為閘道器方法的返回值。因此,你可能希望確保對於每個閘道器呼叫,始終有回覆訊息。否則,如果 reply-timeout 設定為負值,你的閘道器方法可能永遠不會返回並無限期掛起。處理這種情況的一種方法是使用非同步閘道器(本節後面會解釋)。另一種處理方法是依賴預設的 reply-timeout30 秒。這樣,閘道器不會掛起超過 reply-timeout 指定的時間,並且如果超時過期則返回 'null'。最後,你可能需要考慮設定下游標誌,例如服務啟用器上的 'requires-reply' 或過濾器上的 'throw-exceptions-on-rejection'。這些選項將在本章的最後一節中更詳細地討論。
如果下游流返回 ErrorMessage,其 payload(一個 Throwable)將被視為常規的下游錯誤。如果配置了 error-channel,則會將其傳送到錯誤流。否則,有效載荷將拋給閘道器的呼叫者。類似地,如果 error-channel 上的錯誤流返回 ErrorMessage,其有效載荷將拋給呼叫者。這同樣適用於任何帶有 Throwable 有效載荷的訊息。這在非同步情況下很有用,當你需要將 Exception 直接傳播給呼叫者時。為此,你可以返回 Exception(作為某些服務的 reply)或丟擲它。通常,即使是非同步流,框架也會處理將下游流丟擲的異常傳播回閘道器。 TCP 客戶端-伺服器多路複用示例演示了將異常返回給呼叫者的兩種技術。它透過使用帶有 group-timeoutaggregator(參見聚合器和組超時)和丟棄流上的 MessagingTimeoutException 回覆來模擬等待執行緒的套接字 IO 錯誤。

閘道器超時

閘道器有兩個超時屬性:requestTimeoutreplyTimeout。請求超時僅在通道可以阻塞時適用,例如已滿的受限 QueueChannelreplyTimeout 值是閘道器等待回覆或返回 null 的時間。它預設為無限。

超時可以作為閘道器上所有方法的預設值(defaultRequestTimeoutdefaultReplyTimeout)或在 MessagingGateway 介面註解上設定。單個方法可以覆蓋這些預設值(在 <method/> 子元素中)或在 @Gateway 註解上設定。

從 5.0 版本開始,超時可以定義為表示式,如下例所示

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

評估上下文具有 BeanResolver(使用 @someBean 引用其他 bean),並且 #root 物件的 args 陣列屬性可用。有關此根物件的更多資訊,請參閱表示式和“全域性”頭部。當使用 XML 配置時,超時屬性可以是 long 值或 SpEL 表示式,如下例所示

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

非同步閘道器

作為一種模式,訊息閘道器提供了一種很好的方式來隱藏訊息特定程式碼,同時仍然暴露訊息系統的全部功能。如前所述GatewayProxyFactoryBean 提供了一種方便的方式來透過服務介面暴露代理,為你提供基於 POJO 的訊息系統訪問(基於你自己的域中的物件、原語/字串或其他物件)。但是,當閘道器透過返回值的簡單 POJO 方法暴露時,它意味著對於每個請求訊息(在方法呼叫時生成),必須有一個回覆訊息(在方法返回時生成)。由於訊息系統本質上是非同步的,你可能無法始終保證“每個請求始終有回覆”的契約。Spring Integration 2.0 引入了對非同步閘道器的支援,它提供了一種方便的方式來啟動流,當你可能不知道是否期望回覆或回覆需要多長時間才能到達時。

為了處理這些型別的場景,Spring Integration 使用 java.util.concurrent.Future 例項來支援非同步閘道器。

從 XML 配置來看,沒有什麼變化,你仍然以與定義常規閘道器相同的方式定義非同步閘道器,如下例所示

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

然而,閘道器介面(服務介面)略有不同,如下所示

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如上例所示,閘道器方法的返回型別是 Future。當 GatewayProxyFactoryBean 發現閘道器方法的返回型別是 Future 時,它會立即透過使用 AsyncTaskExecutor 切換到非同步模式。這就是區別的程度。對這種方法的呼叫總是立即返回一個 Future 例項。然後,你可以按照自己的節奏與 Future 互動以獲取結果、取消等。此外,與任何其他使用 Future 例項的情況一樣,呼叫 get() 可能會揭示超時、執行異常等。以下示例展示瞭如何使用從非同步閘道器返回的 Future

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有關更詳細的示例,請參閱 Spring Integration 示例中的非同步閘道器示例。

此外,從 6.5 版本開始,Java DSL gateway() 運算子完全支援 async(true) 行為。在內部,為 GatewayProxyFactoryBean 提供了 AsyncRequestReplyExchanger 服務介面。由於 AsyncRequestReplyExchanger 契約是 CompletableFuture<Message<?>>,因此整個請求-回覆以非同步方式執行。此行為很有用,例如,在分發器-聚合器場景中,當必須為每個項呼叫另一個流時。但是,順序不重要——只有在所有處理之後它們在聚合器上的分組聚集。

AsyncTaskExecutor

預設情況下,當為返回型別為 Future 的任何閘道器方法提交內部 AsyncInvocationTask 例項時,GatewayProxyFactoryBean 使用 org.springframework.core.task.SimpleAsyncTaskExecutor。但是,<gateway/> 元素的配置中的 async-executor 屬性允許你提供對 Spring 應用程式上下文中可用的任何 java.util.concurrent.Executor 實現的引用。

(預設的)SimpleAsyncTaskExecutor 同時支援 FutureCompletableFuture 返回型別。請參閱CompletableFuture。儘管有預設執行器,但提供外部執行器通常很有用,這樣你就可以在日誌中識別其執行緒(使用 XML 時,執行緒名稱基於執行器的 bean 名稱),如下例所示

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果你希望返回不同的 Future 實現,你可以提供自定義執行器或完全停用執行器,並在下游流的回覆訊息有效載荷中返回 Future。要停用執行器,請在 GatewayProxyFactoryBean 中將其設定為 null(透過使用 setAsyncTaskExecutor(null))。當使用 XML 配置閘道器時,使用 async-executor=""。當使用 @MessagingGateway 註解配置時,使用類似於以下程式碼

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}
如果返回型別是特定的具體 Future 實現或配置的執行器不支援的其他子介面,則流在呼叫者的執行緒上執行,並且流必須在回覆訊息有效載荷中返回所需的型別。

CompletableFuture

從 4.2 版本開始,閘道器方法現在可以返回 CompletableFuture<?>。返回此型別時有兩種操作模式

  • 當提供了非同步執行器並且返回型別恰好是 CompletableFuture(而不是子類)時,框架會在執行器上執行任務,並立即將 CompletableFuture 返回給呼叫者。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) 用於建立 Future。

  • 當非同步執行器顯式設定為 null 且返回型別為 CompletableFuture 或返回型別是 CompletableFuture 的子類時,流將在呼叫者的執行緒上呼叫。在這種情況下,下游流預計會返回適當型別的 CompletableFuture

使用場景

在以下場景中,呼叫者執行緒立即返回 CompletableFuture<Invoice>,當子流回復閘道器(帶有 Invoice 物件)時,該 Future 將完成。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下場景中,當子流將 CompletableFuture<Invoice> 作為回覆有效載荷提供給閘道器時,呼叫者執行緒將返回它。當發票準備好時,將來必須完成其他一些程序。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在以下場景中,當子流將 CompletableFuture<Invoice> 作為回覆有效載荷提供給閘道器時,呼叫者執行緒將返回它。當發票準備好時,將來必須完成其他一些程序。如果啟用了 DEBUG 日誌記錄,則會發出一條日誌條目,指示此場景無法使用非同步執行器。

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture 例項可用於對回覆執行額外的操作,如下例所示

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

從 5.0 版本開始,GatewayProxyFactoryBean 允許在閘道器介面方法中使用 Project Reactor,使用 Mono<T> 返回型別。內部的 AsyncInvocationTask 被包裝在 Mono.fromCallable() 中。

Mono 可用於稍後檢索結果(類似於 Future<?>),或者你可以在結果返回到閘道器時透過呼叫 Consumer 從排程器中消費它。

Mono 不會立即被框架重新整理。因此,在閘道器方法返回之前(與 Future<?> Executor 任務一樣),底層訊息流不會啟動。流在 Mono 被訂閱時啟動。或者,Mono(作為“可組合”)可能是 Reactor 流的一部分,此時 subscribe() 與整個 Flux 相關。以下示例展示瞭如何使用 Project Reactor 建立閘道器
@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

其中這樣的閘道器可以在處理資料 Flux 的某些服務中使用

@Autowired
TestGateway testGateway;

public void handleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

另一個使用 Project Reactor 的示例是一個簡單的回撥場景,如下例所示

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

呼叫執行緒繼續執行,當流完成時呼叫 handleInvoice()

另請參閱Kotlin 協程以獲取更多資訊。

下游流返回非同步型別

如上文 AsyncTaskExecutor 部分所述,如果你希望某個下游元件返回帶有非同步有效載荷(FutureMono 等)的訊息,你必須將非同步執行器顯式設定為 null(或在使用 XML 配置時設定為 "")。然後,流將在呼叫者執行緒上呼叫,結果可以稍後檢索。

非同步 void 返回型別

訊息閘道器方法可以宣告如下

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

但下游異常不會傳播回撥用者。為了確保下游流呼叫的非同步行為和異常傳播到呼叫者,從 6.0 版本開始,框架支援 Future<Void>Mono<Void> 返回型別。用例類似於前面描述的普通 void 返回型別的傳送即忘記行為,但區別在於流執行是非同步發生的,並且返回的 Future(或 Mono)根據 send 操作結果以 null 或異常完成。

如果 Future<Void> 是確切的下游流回復,則閘道器的 asyncExecutor 選項必須設定為 null(對於 @MessagingGateway 配置,設定為 AnnotationConstants.NULL),並且 send 部分在生產者執行緒上執行。回覆部分取決於下游流配置。這樣,由目標應用程式負責正確生成 Future<Void> 回覆。Mono 用例已經超出了框架執行緒控制,因此將 asyncExecutor 設定為 null 沒有意義。Mono<Void> 作為請求-回覆閘道器操作的結果必須配置為閘道器方法的 Mono<?> 返回型別。

沒有回覆到達時的閘道器行為

前所述,閘道器透過 POJO 方法呼叫提供了一種與訊息系統互動的便捷方式。但是,典型的方法呼叫(通常期望始終返回(即使發生異常))可能並不總是與訊息交換一對一對映(例如,回覆訊息可能不會到達——這相當於方法不返回)。

本節的其餘部分涵蓋了各種場景以及如何使閘道器表現得更可預測。可以配置某些屬性以使同步閘道器行為更可預測,但其中一些屬性可能並不總是像你期望的那樣工作。其中之一是 reply-timeout(在方法級別或 default-reply-timeout 在閘道器級別)。我們檢查 reply-timeout 屬性,看看它在各種場景中如何以及不能如何影響同步閘道器的行為。我們檢查單執行緒場景(所有下游元件透過直接通道連線)和多執行緒場景(例如,在下游某個地方你可能有一個可輪詢或執行器通道,打破了單執行緒邊界)。

下游長時間執行的程序

同步閘道器,單執行緒

如果下游元件仍在執行(可能是因為無限迴圈或慢速服務),則設定 reply-timeout 無效,閘道器方法呼叫不會返回,直到下游服務退出(透過返回或丟擲異常)。

同步閘道器,多執行緒

如果下游元件在多執行緒訊息流中仍在執行(可能是由於無限迴圈或慢速服務),設定 reply-timeout 會產生效果,允許閘道器方法呼叫在超時達到後返回,因為 GatewayProxyFactoryBean 會輪詢回覆通道,等待訊息直到超時過期。但是,如果在實際回覆生成之前超時已達到,則可能導致閘道器方法返回 'null'。你應該理解,如果生成了回覆訊息,它會在閘道器方法呼叫可能已經返回後傳送到回覆通道,因此你必須意識到這一點並在設計流時考慮到它。

另請參閱 errorOnTimeout 屬性,以便在發生超時時丟擲 MessageTimeoutException 而不是返回 null

下游元件返回“null”

同步閘道器 - 單執行緒

如果下游元件返回“null”,並且 reply-timeout 已配置為負值,則閘道器方法呼叫將無限期掛起,除非在可能返回“null”的下游元件(例如服務啟用器)上設定了 requires-reply 屬性。在這種情況下,將丟擲異常並傳播到閘道器。

同步閘道器 - 多執行緒

行為與前一種情況相同。

下游元件返回簽名是“void”,而閘道器方法簽名是非“void”

同步閘道器 - 單執行緒

如果下游元件返回“void”,並且 reply-timeout 已配置為負值,則閘道器方法呼叫將無限期掛起。

同步閘道器 - 多執行緒

行為與前一種情況相同。

下游元件導致執行時異常

同步閘道器 - 單執行緒

如果下游元件丟擲執行時異常,則該異常將透過錯誤訊息傳播回閘道器並重新丟擲。

同步閘道器 - 多執行緒

行為與前一種情況相同。

你應該理解,預設情況下,reply-timeout 是無界的。因此,如果你將 reply-timeout 設定為負值,你的閘道器方法呼叫可能會無限期掛起。所以,為了確保你分析你的流,並且即使有遠端的可能性發生這些場景之一,你也應該將 reply-timeout 屬性設定為一個“安全”值。它預設是 30 秒。甚至更好,你可以將下游元件的 requires-reply 屬性設定為“true”,以確保及時響應,方法是當該下游元件內部返回 null 時立即丟擲異常。但是,你也應該意識到存在一些場景(參見第一個reply-timeout 沒有幫助。這意味著分析你的訊息流並決定何時使用同步閘道器而不是非同步閘道器也很重要。如前所述,後一種情況是定義返回 Future 例項的閘道器方法。然後,你保證會收到該返回值,並且你對呼叫結果有更細粒度的控制。此外,在處理路由器時,你應該記住將 resolution-required 屬性設定為“true”會導致路由器在無法解析特定通道時丟擲異常。同樣,在處理過濾器時,你可以設定 throw-exception-on-rejection 屬性。在這兩種情況下,生成的流的行為就像它包含一個帶有 'requires-reply' 屬性的服務啟用器。換句話說,它有助於確保閘道器方法呼叫的及時響應。
你應該理解,計時器線上程返回到閘道器時啟動——也就是說,當流完成或訊息傳遞給另一個執行緒時。此時,呼叫執行緒開始等待回覆。如果流是完全同步的,則回覆立即可用。對於非同步流,執行緒會等待到此時間。

從 6.2 版本開始,MessagingGatewaySupport 的內部 MethodInvocationGateway 擴充套件的 errorOnTimeout 屬性暴露在 @MessagingGatewayGatewayEndpointSpec 上。此選項的含義與 端點摘要 章末解釋的任何入站閘道器完全相同。換句話說,將此選項設定為 true,將在接收超時耗盡時,從傳送和接收閘道器操作丟擲 MessageTimeoutException 而不是返回 null

有關透過 IntegrationFlow 定義閘道器的選項,請參閱 Java DSL 章中的IntegrationFlow 作為閘道器

© . This site is unofficial and not affiliated with VMware.