訊息釋出

(面向切面程式設計) AOP 訊息釋出功能允許您在方法呼叫完成後構建併發送訊息作為其副產品。例如,想象您有一個元件,並且每次該元件的狀態發生變化時,您都希望透過訊息收到通知。傳送此類通知的最簡單方法是將訊息傳送到專用通道,但是如何將更改物件狀態的方法呼叫連線到訊息傳送過程,以及通知訊息應該如何構建呢?AOP 訊息釋出功能透過配置驅動的方法處理這些職責。

訊息釋出配置

Spring Integration 提供了兩種方法:XML 配置和註解驅動 (Java) 配置。

使用 @Publisher 註解的註解驅動配置

註解驅動的方法允許您使用 @Publisher 註解來標記任何方法,以指定一個 'channel' 屬性。從 5.1 版本開始,要啟用此功能,您必須在某個 @Configuration 類上使用 @EnablePublisher 註解。有關更多資訊,請參閱配置和 @EnableIntegration。訊息由方法呼叫的返回值構建,併發送到 'channel' 屬性指定的通道。為了進一步管理訊息結構,您還可以結合使用 @Payload@Header 註解。

在內部,Spring Integration 的此訊息釋出功能透過定義 PublisherAnnotationAdvisor 來使用 Spring AOP,並使用 Spring 表示式語言 (SpEL),這使您可以對釋出的 Message 結構進行極大的靈活性和控制。

PublisherAnnotationAdvisor 定義並繫結以下變數

  • #return:繫結到返回值,允許您引用它或其屬性(例如,#return.something,其中 'something' 是繫結到 #return 的物件的屬性)

  • #exception:如果方法呼叫丟擲異常,則繫結到該異常

  • #args:繫結到方法引數,以便您可以按名稱提取單個引數(例如,#args.fname

請考慮以下示例

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

在前面的示例中,訊息按以下結構構建

  • 訊息負載是方法的返回型別和值。這是預設行為。

  • 構建的新訊息被髮送到預設的釋出者通道,該通道已配置了註解後處理器(本節後面將介紹)。

以下示例與前面的示例相同,但它不使用預設的釋出通道

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

我們沒有使用預設的釋出通道,而是透過設定 @Publisher 註解的 'channel' 屬性來指定釋出通道。我們還添加了一個 @Header 註解,這導致名為 'last' 的訊息頭與 'lname' 方法引數具有相同的值。該訊息頭被新增到新構建的訊息中。

以下示例與前面的示例幾乎相同

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

唯一的區別在於我們在方法上使用了 @Payload 註解,明確指定方法的返回值應作為訊息的負載。

以下示例透過在 @Payload 註解中使用 Spring 表示式語言,進一步擴充套件了先前的配置,以指導框架如何構建訊息。

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

在前面的示例中,訊息是方法呼叫的返回值與 'lname' 輸入引數的拼接。名為 'x' 的訊息頭的值由 'num' 輸入引數確定。該訊息頭被新增到新構建的訊息中。

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

在前面的示例中,您看到了 @Payload 註解的另一種用法。在這裡,我們標記了一個方法引數,該引數成為新構建訊息的負載。

與 Spring 中大多數其他註解驅動的功能一樣,您需要註冊一個後處理器 (PublisherAnnotationBeanPostProcessor)。以下示例展示瞭如何做到這一點。

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

為了更簡潔的配置,您可以使用名稱空間支援,如下例所示。

<int:annotation-config>
    <int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>

對於 Java 配置,您必須使用 @EnablePublisher 註解,如下例所示。

@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
    ...
}

從 5.1.3 版本開始,<int:enable-publisher> 元件以及 @EnablePublisher 註解擁有 proxy-target-classorder 屬性,用於調整 ProxyFactory 配置。

與其他 Spring 註解(如 @Component@Scheduled 等)類似,您也可以將 @Publisher 用作元註解。這意味著您可以定義自己的註解,它們將與 @Publisher 本身以相同的方式處理。以下示例展示瞭如何做到這一點。

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}

在前面的示例中,我們定義了 @Audit 註解,它本身被 @Publisher 註解標記。另請注意,您可以在元註解上定義一個 channel 屬性,以封裝此註解內部發送訊息的位置。現在,您可以使用 @Audit 註解標記任何方法,如下例所示。

@Audit
public String test() {
    return "Hello";
}

在前面的示例中,每次呼叫 test() 方法都會產生一條訊息,其負載由其返回值建立。每條訊息都被髮送到名為 auditChannel 的通道。這種技術的好處之一是,您可以避免在多個註解中重複使用相同的通道名稱。您還可以為您自己的、可能特定於領域的註解與框架提供的註解之間提供一定程度的間接性。

您還可以註解類,這允許您將此註解的屬性應用於該類的每個公共方法,如下例所示。

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .

  }

  public String credit(String amount) {
     . . .
  }

}

使用 <publishing-interceptor> 元素的基於 XML 的方法

基於 XML 的方法允許您將相同的基於 AOP 的訊息釋出功能配置為 MessagePublishingInterceptor 的名稱空間配置。它肯定比註解驅動的方法有一些優勢,因為它允許您使用 AOP 切入點表示式,從而可能一次攔截多個方法,或者攔截和釋出您沒有原始碼的方法。

要使用 XML 配置訊息釋出,您只需執行以下兩項操作

  • 使用 <publishing-interceptor> XML 元素提供 MessagePublishingInterceptor 的配置。

  • 提供 AOP 配置以將 MessagePublishingInterceptor 應用於託管物件。

以下示例展示瞭如何配置 publishing-interceptor 元素。

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" value="something"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" expression="'something'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

<publishing-interceptor> 配置看起來與基於註解的方法非常相似,它也使用了 Spring 表示式語言的強大功能。

在前面的示例中,執行 testBeanecho 方法會生成一個具有以下結構的 Message

  • Message 的負載型別為 String,內容如下:Echoing: [value],其中 value 是被執行方法返回的值。

  • Message 有一個名為 things 的訊息頭,其值為 something

  • Message 被髮送到 echoChannel

第二種方法與第一種非常相似。在這裡,每個以 'repl' 開頭的方法都會生成一個具有以下結構的 Message

  • Message 負載與前面的示例相同。

  • Message 有一個名為 things 的訊息頭,其值是 SpEL 表示式 'something'.toUpperCase() 的結果。

  • Message 被髮送到 echoChannel

第二種方法,對映任何以 echoDef 開頭的方法執行,會產生一個具有以下結構的 Message

  • Message 負載是被執行方法返回的值。

  • 由於沒有提供 channel 屬性,Message 被髮送到由 publisher 定義的 defaultChannel

對於簡單的對映規則,您可以依賴 publisher 的預設設定,如下例所示。

<publishing-interceptor id="anotherInterceptor"/>

前面的示例將匹配切入點表示式的每個方法的返回值對映為負載,併發送到 default-channel。如果您沒有指定 defaultChannel(就像前面的示例那樣),則訊息會發送到全域性 nullChannel(相當於 /dev/null)。

非同步釋出

釋出與您的元件執行在同一執行緒中發生。因此,預設情況下它是同步的。這意味著整個訊息流必須等待發布者的流完成。然而,開發人員通常想要完全相反的效果:使用此訊息釋出功能來啟動非同步流。例如,您可能託管一個服務(HTTP、WS 等),它接收遠端請求。您可能希望將此請求內部發送到一個可能需要一段時間才能完成的程序。但是,您也可能希望立即回覆使用者。因此,您可以不將入站請求傳送到輸出通道進行處理(傳統方式),而是使用 'output-channel' 或 'replyChannel' 頭部向呼叫者傳送一個簡單的確認式回覆,同時使用訊息釋出功能啟動複雜的流。

以下示例中的服務接收一個複雜的負載(需要進一步傳送進行處理),但它也需要向呼叫者回復一個簡單的確認。

public String echo(Object complexPayload) {
     return "ACK";
}

因此,我們沒有將複雜流連線到輸出通道,而是使用了訊息釋出功能。我們配置它使用服務方法(如前面的示例所示)的輸入引數來建立一個新訊息,並將其傳送到 'localProcessChannel'。為了確保此流是非同步的,我們只需將其傳送到任何型別的非同步通道(下一個示例中的 ExecutorChannel)。以下示例展示瞭如何配置非同步 publishing-interceptor

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleService" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

處理此類場景的另一種方法是使用線控(wire-tap)。請參閱線控(Wire Tap)

基於排程觸發器生成和釋出訊息

在前面的章節中,我們研究了訊息釋出功能,該功能在方法呼叫完成後構建併發布訊息作為其副產品。但是,在這些情況下,您仍然負責呼叫方法。Spring Integration 2.0 添加了對排程訊息生產者和釋出者的支援,透過在 'inbound-channel-adapter' 元素上新增了 expression 屬性。您可以基於幾種觸發器進行排程,其中任何一種都可以在 'poller' 元素上配置。目前,我們支援 cronfixed-ratefixed-delay 以及您實現的任何自定義觸發器(透過 'trigger' 屬性值引用)。

如前所述,透過 <inbound-channel-adapter> XML 元素提供對排程生產者和釋出者的支援。請考慮以下示例。

<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

前面的示例建立了一個入站通道介面卡,它構造一個 Message,其負載是 expression 屬性中定義的表示式的結果。每當 fixed-delay 屬性指定的延遲發生時,就會建立併發送此類訊息。

以下示例與前面的示例類似,但它使用了 fixed-rate 屬性。

<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

fixed-rate 屬性允許您以固定的速率(從每個任務的開始時間測量)傳送訊息。

以下示例展示瞭如何應用 Cron 觸發器,並在 cron 屬性中指定值。

<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

以下示例展示瞭如何向訊息中插入附加的訊息頭。

<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

附加的訊息頭可以採用標量值或評估 Spring 表示式的結果。

如果您需要實現自己的自定義觸發器,可以使用 trigger 屬性提供對實現 org.springframework.scheduling.Trigger 介面的任何 Spring 配置的 Bean 的引用。以下示例展示瞭如何做到這一點。

<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>