訊息釋出

面向切面程式設計 (AOP) 訊息釋出功能讓您可以在方法呼叫完成後構造和傳送訊息。例如,假設您有一個元件,並且每次此元件的狀態更改時,您都希望透過訊息收到通知。傳送此類通知的最簡單方法是將訊息傳送到專用通道,但是如何將更改物件狀態的方法呼叫連線到訊息傳送過程,以及通知訊息應該如何結構化?AOP 訊息釋出功能透過配置驅動的方法處理這些職責。

訊息釋出配置

Spring Integration 提供兩種方法:XML 配置和註解驅動 (Java) 配置。

使用 @Publisher 註解的註解驅動配置

註解驅動的方法允許您使用 @Publisher 註解來標註任何方法以指定“通道”屬性。從 5.1 版本開始,要啟用此功能,您必須在某些 @Configuration 類上使用 @EnablePublisher 註解。有關更多資訊,請參閱配置和 @EnableIntegration。訊息由方法呼叫的返回值構造併發送到“通道”屬性指定的通道。要進一步管理訊息結構,您還可以結合使用 @Payload@Header 註解。

在內部,Spring Integration 的此訊息釋出功能透過定義 PublisherAnnotationAdvisor 和 Spring Expression Language (SpEL) 使用 Spring AOP,為您提供了極大的靈活性和對釋出訊息結構的控制。

PublisherAnnotationAdvisor 定義並繫結以下變數

  • #return:繫結到返回值,允許您引用它或它的屬性(例如,#return.something,其中“something”是繫結到 #return 的物件的屬性)

  • #exception:如果方法呼叫丟擲異常,則繫結到異常

  • #args:繫結到方法引數,以便您可以透過名稱提取單個引數(例如,#args.fname

考慮以下示例

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

在前面的示例中,訊息以以下結構構造

  • 訊息載荷是方法的返回型別和值。這是預設值。

  • 新構造的訊息被髮送到預設的釋出者通道,該通道使用註解後處理器(本節後面會介紹)進行配置。

以下示例與前面的示例相同,只是它不使用預設的釋出通道

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

我們不使用預設的釋出通道,而是透過設定 @Publisher 註解的“通道”屬性來指定釋出通道。我們還添加了一個 @Header 註解,這導致名為“last”的訊息頭具有與“lname”方法引數相同的值。該頭被新增到新構造的訊息中。

以下示例與前面的示例幾乎相同

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

唯一的區別是我們在方法上使用 @Payload 註解來明確指定方法的返回值應作為訊息的載荷。

以下示例透過在 @Payload 註解中使用 Spring Expression Language 進一步擴充套件了之前的配置,以指示框架如何構造訊息

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

在前面的示例中,訊息是方法呼叫的返回值和“lname”輸入引數的連線。名為“x”的訊息頭的值由“num”輸入引數確定。該頭被新增到新構造的訊息中。

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

在前面的示例中,您看到了 @Payload 註解的另一種用法。在這裡,我們標註了一個方法引數,該引數成為新構造訊息的載荷。

與 Spring 中的大多數其他註解驅動功能一樣,您需要註冊一個後處理器 (PublisherAnnotationBeanPostProcessor)。以下示例展示瞭如何執行此操作

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

對於更簡潔的配置,您可以轉而使用名稱空間支援,如下面的示例所示

<int:annotation-config>
    <int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>

對於 Java 配置,您必須使用 @EnablePublisher 註解,如下面的示例所示

@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
    ...
}

從 5.1.3 版本開始,<int:enable-publisher> 元件以及 @EnablePublisher 註解具有 proxy-target-classorder 屬性,用於調整 ProxyFactory 配置。

與其他 Spring 註解(@Component@Scheduled 等)類似,您還可以將 @Publisher 用作元註解。這意味著您可以定義自己的註解,這些註解與 @Publisher 本身的處理方式相同。以下示例展示瞭如何執行此操作

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}

在前面的示例中,我們定義了 @Audit 註解,它本身被 @Publisher 註解。另請注意,您可以在元註解上定義一個 channel 屬性,以封裝此註解內部發送訊息的位置。現在,您可以使用 @Audit 註解標註任何方法,如下面的示例所示

@Audit
public String test() {
    return "Hello";
}

在前面的示例中,每次呼叫 test() 方法都會生成一個訊息,其載荷由其返回值建立。每條訊息都發送到名為 auditChannel 的通道。此技術的好處之一是您可以避免在多個註解中重複使用相同的通道名稱。您還可以為您自己的(可能是特定領域的)註解和框架提供的註解之間提供一層間接。

您還可以標註類,這允許您將此註解的屬性應用於該類的每個公共方法,如下面的示例所示

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .

  }

  public String credit(String amount) {
     . . .
  }

}

使用 <publishing-interceptor> 元素的基於 XML 的方法

基於 XML 的方法允許您將相同的基於 AOP 的訊息釋出功能配置為 MessagePublishingInterceptor 的基於名稱空間的配置。它確實比註解驅動的方法有一些好處,因為它允許您使用 AOP 切入點表示式,從而可能一次攔截多個方法或攔截併發布您沒有原始碼的方法。

要使用 XML 配置訊息釋出,您只需執行以下兩件事

  • 使用 <publishing-interceptor> XML 元素為 MessagePublishingInterceptor 提供配置。

  • 提供 AOP 配置以將 MessagePublishingInterceptor 應用於託管物件。

以下示例展示瞭如何配置 publishing-interceptor 元素

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" value="something"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" expression="'something'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

<publishing-interceptor> 配置看起來與基於註解的方法非常相似,它也使用了 Spring Expression Language 的強大功能。

在前面的示例中,testBeanecho 方法的執行會呈現一個具有以下結構的訊息

  • Message 載荷型別為 String,內容如下:Echoing: [value],其中 value 是執行方法返回的值。

  • Message 有一個名為 things 的頭,其值為 something

  • Message 被髮送到 echoChannel

第二種方法與第一種非常相似。在這裡,每個以“repl”開頭的方法都會呈現一個具有以下結構的訊息

  • Message 載荷與前面的示例相同。

  • Message 有一個名為 things 的頭,其值是 SpEL 表示式 'something'.toUpperCase() 的結果。

  • Message 被髮送到 echoChannel

第二種方法,對映任何以 echoDef 開頭的方法的執行,會生成一個具有以下結構的訊息

  • Message 載荷是執行方法返回的值。

  • 由於未提供 channel 屬性,因此 Message 被髮送到 publisher 定義的 defaultChannel

對於簡單的對映規則,您可以依賴 publisher 預設值,如下面的示例所示

<publishing-interceptor id="anotherInterceptor"/>

前面的示例將與切入點表示式匹配的每個方法的返回值對映到載荷,併發送到 default-channel。如果您未指定 defaultChannel(如前面的示例所示),則訊息將傳送到全域性 nullChannel(相當於 /dev/null)。

非同步釋出

釋出與元件的執行發生在同一執行緒中。因此,預設情況下,它是同步的。這意味著整個訊息流必須等待發布者的流完成。然而,開發人員通常希望完全相反:使用此訊息釋出功能來啟動非同步流。例如,您可能託管一個服務(HTTP、WS 等),該服務接收遠端請求。您可能希望將此請求內部發送到可能需要一段時間才能處理的程序。但是,您也可能希望立即回覆使用者。因此,與其將入站請求傳送到輸出通道進行處理(傳統方式),不如使用“output-channel”或“replyChannel”頭向呼叫者傳送一個簡單的確認式回覆,同時使用訊息釋出功能啟動複雜的流。

以下示例中的服務接收復雜的載荷(需要進一步傳送以進行處理),但它還需要向呼叫者回復一個簡單的確認

public String echo(Object complexPayload) {
     return "ACK";
}

因此,我們不將複雜的流連線到輸出通道,而是使用訊息釋出功能。我們將其配置為使用服務方法的輸入引數(如前面的示例所示)建立一條新訊息,並將其傳送到“localProcessChannel”。為了確保此流是非同步的,我們只需將其傳送到任何型別的非同步通道(下一個示例中的 ExecutorChannel)。以下示例展示瞭如何配置非同步 publishing-interceptor

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleService" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

處理此類場景的另一種方法是使用 wire-tap。請參閱Wire Tap

基於計劃觸發器生成和釋出訊息

在前面的章節中,我們研究了訊息釋出功能,該功能將訊息構造併發布為方法呼叫的副產品。但是,在這些情況下,您仍然負責呼叫方法。Spring Integration 2.0 添加了對計劃訊息生產者和釋出者的支援,在“inbound-channel-adapter”元素上新增了 expression 屬性。您可以基於幾個觸發器進行計劃,其中任何一個都可以在“poller”元素上進行配置。目前,我們支援 cronfixed-ratefixed-delay 以及您實現並透過“trigger”屬性值引用的任何自定義觸發器。

如前所述,對計劃生產者和釋出者的支援是透過 <inbound-channel-adapter> XML 元素提供的。請考慮以下示例

<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

前面的示例建立了一個入站通道介面卡,該介面卡構造一個 Message,其載荷是 expression 屬性中定義的表示式的結果。每當發生 fixed-delay 屬性指定的延遲時,就會建立併發送此類訊息。

以下示例與前面的示例類似,只是它使用了 fixed-rate 屬性

<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

fixed-rate 屬性允許您以固定速率傳送訊息(從每個任務的開始時間測量)。

以下示例展示瞭如何應用 Cron 觸發器,其值在 cron 屬性中指定

<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

以下示例展示瞭如何向訊息中插入額外的頭

<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

附加訊息頭可以採用標量值或評估 Spring 表示式的結果。

如果您需要實現自己的自定義觸發器,可以使用 trigger 屬性提供對實現 org.springframework.scheduling.Trigger 介面的任何 Spring 配置 bean 的引用。以下示例展示瞭如何執行此操作

<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>
© . This site is unofficial and not affiliated with VMware.