註解支援

除了用於配置訊息端點的 XML 名稱空間支援外,您還可以使用註解。首先,Spring Integration 提供了類級別的 @MessageEndpoint 作為原型註解,這意味著它本身使用 Spring 的 @Component 註解進行了標註,因此可以被 Spring 的元件掃描自動識別為 Bean 定義。

更重要的是各種方法級別的註解。它們表明被註解的方法能夠處理訊息。以下示例展示了類級別和方法級別的註解

@MessageEndpoint
public class FooService {

    @ServiceActivator
    public void processMessage(Message message) {
        ...
    }
}

方法“處理”訊息的具體含義取決於特定的註解。Spring Integration 中可用的註解包括

如果您將 XML 配置與註解結合使用,則不需要 @MessageEndpoint 註解。如果您想從 <service-activator/> 元素的 ref 屬性配置 POJO 引用,則只需提供方法級別的註解即可。在這種情況下,即使 <service-activator/> 元素上沒有方法級別屬性,註解也可以防止歧義。

在大多數情況下,被註解的處理方法不應該要求將 Message 型別作為其引數。相反,方法引數型別可以與訊息的載荷型別匹配,如下例所示

public class ThingService {

    @ServiceActivator
    public void bar(Thing thing) {
        ...
    }

}

當方法引數應從 MessageHeaders 中的值對映時,另一種選擇是使用引數級別的 @Header 註解。通常,使用 Spring Integration 註解標註的方法可以接受 Message 本身、訊息載荷或訊息頭值(使用 @Header)作為引數。實際上,方法可以接受組合,如下例所示

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
        ...
    }

}

您還可以使用 @Headers 註解將所有訊息頭作為 Map 提供,如下例所示

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}
註解的值也可以是 SpEL 表示式(例如 someHeader.toUpperCase()),這在您希望在注入訊息頭值之前對其進行操作時很有用。它還提供了一個可選的 required 屬性,該屬性指定屬性值是否必須在訊息頭中可用。required 屬性的預設值為 true

對於這些註解中的一些,當訊息處理方法返回非 null 值時,端點會嘗試傳送回覆。這在兩種配置選項(名稱空間和註解)中是一致的,即使用此類端點的輸出通道(如果可用),並使用 REPLY_CHANNEL 訊息頭值作為備選。

端點上的輸出通道和回覆通道訊息頭的組合實現了流水線(pipeline)方法,其中多個元件具有輸出通道,而最終元件允許將回復訊息轉發到回覆通道(如原始請求訊息中指定)。換句話說,最終元件依賴於原始傳送者提供的資訊,因此可以動態支援任意數量的客戶端。這是 回覆地址 模式的一個示例。

除了此處顯示的示例外,這些註解還支援 inputChanneloutputChannel 屬性,如下例所示

@Service
public class ThingService {

    @ServiceActivator(inputChannel="input", outputChannel="output")
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}

這些註解的處理會建立與相應的 XML 元件相同的 Bean —— AbstractEndpoint 例項和 MessageHandler 例項(或入站通道介面卡的 MessageSource 例項)。參見 @Bean 方法上的註解。Bean 名稱按照以下模式生成:[元件名稱].[方法名稱].[註解類短名稱小寫首字母]. 在前面的示例中,AbstractEndpoint 的 Bean 名稱是 thingService.otherThing.serviceActivator,而 MessageHandler (MessageSource) Bean 的名稱相同,並額外帶有 .handler (.source) 字尾。可以使用 @EndpointId 註解以及這些訊息註解來自定義此類名稱。MessageHandler 例項 (MessageSource 例項) 也符合被訊息歷史記錄跟蹤的條件。

從 4.0 版本開始,所有訊息註解都提供了 SmartLifecycle 選項(autoStartupphase),以便在應用程式上下文初始化時控制端點生命週期。它們預設分別為 true0。要更改端點的狀態(例如 start()stop()),您可以使用 BeanFactory(或透過自動裝配)獲取端點 Bean 的引用並呼叫其方法。或者,您可以向控制匯流排傳送命令訊息。為了這些目的,您應該使用前面段落中提到的 beanName

在解析上述註解後自動建立的通道(當未配置特定通道 Bean 時)以及相應的消費者端點,在上下文初始化接近結束時被宣告為 Bean。這些 Bean 可以 在其他服務中自動裝配,但必須使用 @Lazy 註解標記,因為通常在正常的自動裝配處理期間,這些定義尚未可用。

@Autowired
@Lazy
@Qualifier("someChannel")
MessageChannel someChannel;
...

@Bean
Thing1 dependsOnSPCA(@Qualifier("someInboundAdapter") @Lazy SourcePollingChannelAdapter someInboundAdapter) {
    ...
}

從 6.0 版本開始,所有訊息註解都變為 @Repeatable,因此可以在同一個服務方法上宣告多個相同型別的註解,這意味著將根據重複的註解數量建立相應數量的端點。

@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
    return input.toUpperCase();
}

使用 @Poller 註解

在 Spring Integration 4.0 之前,訊息註解要求 inputChannel 是對 SubscribableChannel 的引用。對於 PollableChannel 例項,需要 <int:bridge/> 元素來配置 <int:poller/> 並使組合端點成為 PollingConsumer。4.0 版本引入了 @Poller 註解,允許直接在訊息註解上配置 poller 屬性,如下例所示

public class AnnotationService {

    @Transformer(inputChannel = "input", outputChannel = "output",
        poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
    public String handle(String payload) {
        ...
    }
}

@Poller 註解僅提供簡單的 PollerMetadata 選項。您可以使用屬性佔位符配置 @Poller 註解的屬性(maxMessagesPerPollfixedDelayfixedRatecron)。此外,從 5.1 版本開始,還提供了 PollingConsumerreceiveTimeout 選項。如果需要提供更多輪詢選項(例如 transactionadvice-chainerror-handler 等),則應將 PollerMetadata 配置為通用 Bean,並將其 Bean 名稱用作 @Pollervalue 屬性。在這種情況下,不允許使用其他屬性(必須在 PollerMetadata Bean 上指定)。請注意,如果 inputChannelPollableChannel 並且未配置 @Poller,則使用預設的 PollerMetadata(如果存在於應用程式上下文中)。要使用 @Configuration 註解宣告預設輪詢器,請使用類似於以下示例的程式碼

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}

以下示例展示瞭如何使用預設輪詢器

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
    public String handle(String payload) {
        ...
    }
}

以下示例展示瞭如何使用命名輪詢器

@Bean
public PollerMetadata myPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(1000));
    return pollerMetadata;
}

以下示例展示了一個使用預設輪詢器的端點

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
                           poller = @Poller("myPoller"))
    public String handle(String payload) {
         ...
    }
}

從 4.3.3 版本開始,@Poller 註解包含 errorChannel 屬性,以便更輕鬆地配置底層的 MessagePublishingErrorHandler。此屬性與 <poller> XML 元件中的 error-channel 作用相同。有關更多資訊,請參見 端點名稱空間支援

訊息註解上的 poller() 屬性與 reactive() 屬性互斥。有關更多資訊,請參見下一節。

使用 @Reactive 註解

ReactiveStreamsConsumer 自 5.0 版本以來就已存在,但僅在端點的輸入通道是 FluxMessageChannel(或任何 org.reactivestreams.Publisher 實現)時才適用。從 5.3 版本開始,當目標訊息處理器是 ReactiveMessageHandler 時,無論輸入通道型別如何,框架也會建立其例項。從 5.5 版本開始,所有訊息註解都引入了 @Reactive 子註解(類似於上面提到的 @Poller)。它接受一個可選的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> Bean 引用,並且無論輸入通道型別和訊息處理器如何,都可以將目標端點轉換為 ReactiveStreamsConsumer 例項。該函式用於 Flux.transform() 運算子,以對來自輸入通道的響應式流源應用一些自定義(例如 publishOn()doOnNext()log()retry() 等)。

以下示例演示瞭如何獨立於最終訂閱者和生產者,更改來自輸入通道的釋出執行緒到該 DirectChannel

@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
    return flux -> flux.publishOn(Schedulers.parallel());
}

@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
    ...
}

訊息註解上的 reactive() 屬性與 poller() 屬性互斥。有關更多資訊,請參見 使用 @Poller 註解響應式流支援

使用 @InboundChannelAdapter 註解

4.0 版本引入了 @InboundChannelAdapter 方法級別註解。它基於被註解方法的 MethodInvokingMessageSource 產生一個 SourcePollingChannelAdapter 整合元件。此註解與 <int:inbound-channel-adapter> XML 元件類似,並具有相同的限制:方法不能有引數,返回型別不能是 void。它有兩個屬性:value(必需的 MessageChannel Bean 名稱)和 poller(可選的 @Poller 註解,如前所述)。如果需要提供一些 MessageHeaders,請使用 Message<?> 返回型別,並使用 MessageBuilder 構建 Message<?>。使用 MessageBuilder 可以配置 MessageHeaders。以下示例展示瞭如何使用 @InboundChannelAdapter 註解

@InboundChannelAdapter("counterChannel")
public Integer count() {
    return this.counter.incrementAndGet();
}

@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
    return "foo";
}

4.3 版本引入了 value 註解屬性的 channel 別名,以提高原始碼的可讀性。此外,目標 MessageChannel Bean 是在 SourcePollingChannelAdapter 中,透過提供的名稱(由 outputChannelName 選項設定)在第一次呼叫 receive() 時解析的,而不是在初始化階段。這允許“後期繫結”邏輯:從消費者角度看,目標 MessageChannel Bean 的建立和註冊比 @InboundChannelAdapter 的解析階段稍晚。

第一個示例要求預設輪詢器已在應用程式上下文中的其他位置宣告。

使用 @MessagingGateway 註解

使用 @IntegrationComponentScan 註解

標準的 Spring Framework @ComponentScan 註解不會掃描介面以查詢原型 @Component 註解。為了克服這個限制並允許配置 @MessagingGateway(參見 @MessagingGateway 註解),我們引入了 @IntegrationComponentScan 機制。此註解必須與 @Configuration 註解一起放置,並進行自定義以定義其掃描選項,例如 basePackagesbasePackageClasses。在這種情況下,所有使用 @MessagingGateway 註解的發現的介面都會被解析並註冊為 GatewayProxyFactoryBean 例項。所有其他基於類的元件由標準的 @ComponentScan 解析。