Spring Integration Framework 概述

Spring Integration 透過擴充套件 Spring 程式設計模型來支援著名的 企業整合模式(Enterprise Integration Patterns)。它在基於 Spring 的應用中實現了輕量級訊息傳遞,並透過宣告式介面卡支援與外部系統的整合。這些介面卡在 Spring 現有對遠端呼叫、訊息傳遞和排程支援的基礎上,提供了更高層次的抽象。

Spring Integration 的主要目標是提供一種構建企業整合解決方案的簡單模型,同時保持關注點分離,這對於生成可維護、可測試的程式碼至關重要。

Spring Integration 概述

本章提供了 Spring Integration 核心概念和元件的高層介紹。它包含一些程式設計技巧,幫助您充分利用 Spring Integration。

背景

Spring Framework 的一個關鍵主題是控制反轉(Inversion of Control,IoC)。從最廣泛的意義上講,這意味著框架代表其上下文內管理的元件處理職責。元件本身得到了簡化,因為它們被免除了這些職責。例如,依賴注入減輕了元件定位或建立其依賴項的責任。類似地,面向切面程式設計透過將通用的橫切關注點模組化為可重用的切面,從而減輕了業務元件的負擔。在每種情況下,最終結果都是一個更易於測試、理解、維護和擴充套件的系統。

此外,Spring Framework 和相關專案為構建企業應用提供了全面的程式設計模型。開發人員受益於該模型的一致性,特別是它基於成熟的最佳實踐,例如面向介面程式設計和優先使用組合而非繼承。Spring 簡化的抽象和強大的支援庫提高了開發人員的生產力,同時增加了可測試性和可移植性。

Spring Integration 正是基於這些相同的目標和原則。它將 Spring 程式設計模型擴充套件到訊息傳遞領域,並基於 Spring 現有的企業整合支援,提供了更高層次的抽象。它支援訊息驅動架構,其中控制反轉應用於執行時關注點,例如何時應執行特定業務邏輯以及應將響應傳送到何處。它支援訊息的路由和轉換,以便整合不同的傳輸方式和不同的資料格式,而不會影響可測試性。換句話說,訊息傳遞和整合關注點由框架處理。業務元件進一步與基礎設施隔離,開發人員也從複雜的整合職責中解脫出來。

作為 Spring 程式設計模型的擴充套件,Spring Integration 提供了多種配置選項,包括註解、支援名稱空間的 XML、帶通用“bean”元素的 XML 以及直接使用底層 API。該 API 基於定義明確的策略介面和非侵入式委託介面卡。Spring Integration 的設計靈感來自於對 Spring 內部常見模式與 Gregor Hohpe 和 Bobby Woolf 所著《企業整合模式(Enterprise Integration Patterns)》(Addison Wesley,2004 年)中所述的著名模式之間強烈關聯性的認識。讀過這本書的開發人員應該會立即熟悉 Spring Integration 的概念和術語。

目標與原則

Spring Integration 的目標如下

  • 提供一種簡單的模型來實現複雜的企業整合解決方案。

  • 促進基於 Spring 的應用中的非同步、訊息驅動行為。

  • 促進現有 Spring 使用者直觀、漸進地採用。

Spring Integration 遵循以下原則

  • 元件應松耦合以實現模組化和可測試性。

  • 框架應強制分離業務邏輯和整合邏輯的關注點。

  • 擴充套件點本質上應該是抽象的(但在明確定義的範圍內),以促進重用和可移植性。

主要元件

從垂直角度來看,分層架構有利於關注點分離,而層之間的基於介面的契約促進了松耦合。基於 Spring 的應用通常採用這種設計,Spring Framework 和相關專案為企業應用的整個技術棧遵循這種最佳實踐提供了堅實的基礎。訊息驅動架構增加了橫向視角,但這些相同的目標仍然相關。正如“分層架構”是一個非常通用和抽象的正規化一樣,訊息系統通常遵循類似的抽象“管道與過濾器(pipes-and-filters)”模型。“過濾器”代表任何能夠生成或消費訊息的元件,“管道”則在過濾器之間傳輸訊息,從而使元件本身保持松耦合。值得注意的是,這兩種高層正規化並非互斥。支援“管道”的底層訊息基礎設施仍應封裝在一個層中,該層的契約由介面定義。同樣,“過濾器”本身也應在邏輯上位於應用服務層之上的層中進行管理,透過介面與這些服務互動,其方式與 Web 層非常相似。

訊息

在 Spring Integration 中,訊息是任何 Java 物件的通用包裝器,並結合了框架在處理該物件時使用的元資料。它由一個 Payload 和 Headers 組成。Payload 可以是任何型別,Headers 包含常用的必需資訊,例如 ID、時間戳、關聯 ID 和回覆地址。Headers 也用於向連線的傳輸傳送或從連線的傳輸接收值。例如,從接收到的檔案建立訊息時,檔名可以儲存在 Header 中,供下游元件訪問。同樣,如果訊息內容最終將由出站郵件介面卡傳送,各種屬性(收件人、發件人、抄送、主題等)可以由上游元件配置為訊息頭值。開發人員還可以在 Headers 中儲存任何任意的鍵值對。

Message
圖 1. 訊息

訊息通道

訊息通道代表了管道與過濾器架構中的“管道”。生產者將訊息傳送到通道,消費者從通道接收訊息。因此,訊息通道解耦了訊息元件,並提供了方便的訊息攔截和監控點。

Message Channel
圖 2. 訊息通道

訊息通道可以遵循點對點或釋出-訂閱語義。對於點對點通道,傳送到通道的每條訊息最多隻能有一個消費者接收。另一方面,釋出-訂閱通道嘗試將每條訊息廣播給通道上的所有訂閱者。Spring Integration 支援這兩種模型。

雖然“點對點”和“釋出-訂閱”定義了最終有多少消費者接收每條訊息的兩種選項,但還有另一個重要的考慮因素:通道是否應該緩衝訊息?在 Spring Integration 中,可輪詢通道能夠在佇列中緩衝訊息。緩衝的優勢在於它可以限制入站訊息的速度,從而防止消費者過載。然而,顧名思義,這也增加了一些複雜性,因為只有配置了輪詢器,消費者才能從這種通道接收訊息。另一方面,連線到可訂閱通道的消費者只是訊息驅動的。訊息通道實現 詳細討論了 Spring Integration 中可用的各種通道實現。

訊息端點

Spring Integration 的主要目標之一是透過控制反轉來簡化企業整合解決方案的開發。這意味著您不必直接實現消費者和生產者,甚至不必構建訊息並在訊息通道上呼叫傳送或接收操作。相反,您應該能夠專注於您的特定領域模型,並基於普通物件進行實現。然後,透過提供宣告式配置,您可以將特定領域的程式碼“連線”到 Spring Integration 提供的訊息基礎設施。負責這些連線的元件就是訊息端點。這並不意味著您必須直接連線您現有的應用程式碼。任何實際的企業整合解決方案都需要一些專注於整合關注點的程式碼,例如路由和轉換。重要的是實現整合邏輯和業務邏輯之間的關注點分離。換句話說,就像 Web 應用的 Model-View-Controller (MVC) 正規化一樣,目標應該是提供一個輕薄但專用的層,將入站請求轉換為服務層呼叫,然後將服務層返回值轉換為出站回覆。下一節概述了處理這些職責的訊息端點型別,在後續章節中,您將看到 Spring Integration 的宣告式配置選項如何提供非侵入式的方式來使用這些元件。

訊息端點

訊息端點代表了管道與過濾器架構中的“過濾器”。如前所述,端點的主要作用是以非侵入式的方式將應用程式碼連線到訊息框架。換句話說,應用程式碼理想情況下應該完全感知不到訊息物件或訊息通道。這類似於 MVC 正規化中控制器的作用。就像控制器處理 HTTP 請求一樣,訊息端點處理訊息。就像控制器對映到 URL 模式一樣,訊息端點對映到訊息通道。這兩種情況下的目標都是相同的:將應用程式碼與基礎設施隔離。這些概念以及隨後的所有模式都在《企業整合模式(Enterprise Integration Patterns)》一書中有詳細討論。在這裡,我們僅提供 Spring Integration 支援的主要端點型別以及與這些型別相關的角色的高層描述。接下來的章節將詳細闡述並提供示例程式碼和配置示例。

訊息轉換器

訊息轉換器負責轉換訊息的內容或結構,並返回修改後的訊息。最常見的轉換器型別可能是將訊息的 Payload 從一種格式轉換為另一種格式(例如從 XML 轉換為 java.lang.String)。類似地,轉換器可以新增、刪除或修改訊息的 Header 值。

訊息過濾器

訊息過濾器決定訊息是否應該傳遞到輸出通道。這隻需要一個布林測試方法,該方法可以檢查特定的 Payload 內容型別、屬性值、Header 是否存在或其他條件。如果訊息被接受,則傳送到輸出通道。否則,訊息將被丟棄(或者,對於更嚴格的實現,可能會丟擲 Exception)。訊息過濾器通常與釋出-訂閱通道結合使用,在釋出-訂閱通道中,多個消費者可能會接收到同一條訊息,並使用過濾器的條件來縮小要處理的訊息集。

請注意不要混淆管道與過濾器架構模式中“過濾器”的泛指用法與此處特指的選擇性過濾兩個通道之間訊息流動的端點型別。管道與過濾器概念中的“過濾器”更接近 Spring Integration 的訊息端點:任何可以連線到訊息通道以傳送或接收訊息的元件。

訊息路由器

訊息路由器負責決定接下來哪個通道或哪些通道(如果有)應該接收訊息。通常,該決定基於訊息的內容或訊息 Header 中可用的元資料。訊息路由器通常用作服務啟用器或能傳送回覆訊息的其他端點上靜態配置輸出通道的動態替代方案。同樣,訊息路由器提供了對先前描述的多個訂閱者使用的響應式訊息過濾器的一種主動替代方案。

Router
圖 3. 訊息路由器

分發器

分發器是另一種訊息端點型別,其職責是接收來自其輸入通道的訊息,將該訊息分割成多條訊息,並將每條訊息傳送到其輸出通道。這通常用於將“複合”Payload 物件分解為包含細分 Payload 的一組訊息。

聚合器

聚合器基本上是分發器的映象,它是一種接收多條訊息並將其組合成一條訊息的訊息端點型別。實際上,聚合器通常是包含分發器的管道中的下游消費者。從技術上講,聚合器比分發器更復雜,因為它需要維護狀態(要聚合的訊息),決定何時完整的一組訊息可用,並在必要時進行超時處理。此外,在超時情況下,聚合器需要知道是傳送部分結果、丟棄它們還是將它們傳送到單獨的通道。Spring Integration 提供了 CorrelationStrategyReleaseStrategy,以及用於超時、超時時是否傳送部分結果以及丟棄通道的可配置設定。

服務啟用器

服務啟用器是用於將服務例項連線到訊息系統的通用端點。必須配置輸入訊息通道,並且,如果被呼叫的服務方法能夠返回值,也可以提供輸出訊息通道。

輸出通道是可選的,因為每條訊息也可能提供自己的“回覆地址”Header。此規則適用於所有消費者端點。

服務啟用器呼叫某個服務物件上的操作來處理請求訊息,提取請求訊息的 Payload 並進行轉換(如果方法不期望訊息型別的引數)。當服務物件的方法返回一個值時,該返回值同樣會在必要時轉換為回覆訊息(如果它本身不是訊息型別)。該回復訊息被髮送到輸出通道。如果未配置輸出通道,則回覆訊息將被髮送到訊息“回覆地址”中指定的通道(如果可用)。

請求-回覆服務啟用器端點將目標物件的方法連線到輸入和輸出訊息通道。

handler endpoint
圖 4. 服務啟用器
如前所述,在訊息通道中,通道可以是可輪詢的或可訂閱的。在上面的圖中,這透過“時鐘”符號和實線箭頭(輪詢)以及虛線箭頭(訂閱)來表示。

通道介面卡

通道介面卡是一個端點,它將訊息通道連線到其他系統或傳輸。通道介面卡可以是入站的,也可以是出站的。通常,通道介面卡會在訊息與從其他系統接收或傳送到其他系統的任何物件或資源(檔案、HTTP請求、JMS訊息等)之間進行一些對映。根據傳輸方式的不同,通道介面卡還可能填充或提取訊息頭的值。Spring Integration 提供了許多通道介面卡,這些介面卡將在後續章節中介紹。

source endpoint
圖5. 入站通道介面卡端點將源系統連線到MessageChannel
訊息源可以是可輪詢的(例如 POP3),也可以是訊息驅動的(例如 IMAP Idle)。在前面的圖中,這由“時鐘”符號、實心箭頭(輪詢)和虛線箭頭(訊息驅動)表示。
target endpoint
圖6. 出站通道介面卡端點將MessageChannel連線到目標系統。
訊息通道一節中討論的,通道可以是可輪詢的或可訂閱的。在前面的圖中,這由“時鐘”符號、實心箭頭(輪詢)和虛線箭頭(訂閱)表示。

端點 Bean 名稱

消費端點(任何具有inputChannel的端點)由兩個 bean 組成:消費者和訊息處理器。消費者引用訊息處理器,並在訊息到達時呼叫它。

考慮以下 XML 示例

<int:service-activator id = "someService" ... />

鑑於前面的示例,bean 名稱如下

  • 消費者:someServiceid

  • 處理器:someService.handler

使用企業整合模式 (EIP) 註解時,名稱取決於多種因素。考慮以下帶有註解的 POJO 示例

@Component
public class SomeComponent {

    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

鑑於前面的示例,bean 名稱如下

  • 消費者:someComponent.someMethod.serviceActivator

  • 處理器:someComponent.someMethod.serviceActivator.handler

從 5.0.4 版本開始,您可以使用@EndpointId註解修改這些名稱,如下例所示

@Component
public class SomeComponent {

    @EndpointId("someService")
    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

鑑於前面的示例,bean 名稱如下

  • 消費者:someService

  • 處理器:someService.handler

@EndpointId建立的名稱與 XML 配置中使用id屬性建立的名稱相同。考慮以下帶有註解的 bean 示例

@Configuration
public class SomeConfiguration {

    @Bean
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}

鑑於前面的示例,bean 名稱如下

  • 消費者:someConfiguration.someHandler.serviceActivator

  • 處理器:someHandler@Bean名稱)

從 5.0.4 版本開始,您可以使用@EndpointId註解修改這些名稱,如下例所示

@Configuration
public class SomeConfiguration {

    @Bean("someService.handler")             (1)
    @EndpointId("someService")               (2)
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}
1 處理器:someService.handler(bean 名稱)
2 消費者:someService(端點 ID)

只要您遵循將.handler附加到@Bean名稱的約定,@EndpointId註解建立的名稱就與 XML 配置中使用id屬性建立的名稱相同。

有一個特殊情況會建立第三個 bean:出於架構原因,如果一個MessageHandler @Bean沒有定義AbstractReplyProducingMessageHandler,框架會將提供的 bean 包裝在一個ReplyProducingMessageHandlerWrapper中。這個包裝器支援請求處理器建議處理,併發出正常的“produced no reply”除錯日誌訊息。它的 bean 名稱是處理器 bean 名稱加上.wrapper(當存在@EndpointId — 否則,它是正常的生成處理器名稱)。

類似地,可輪詢訊息源建立兩個 bean:一個SourcePollingChannelAdapter (SPCA) 和一個MessageSource

考慮以下 XML 配置

<int:inbound-channel-adapter id = "someAdapter" ... />

鑑於前面的 XML 配置,bean 名稱如下

  • SPCA:someAdapterid

  • 處理器:someAdapter.source

考慮以下帶有註解@EndpointId的 POJO 的 Java 配置

@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
    ...
}

鑑於前面的 Java 配置示例,bean 名稱如下

  • SPCA:someAdapter

  • 處理器:someAdapter.source

考慮以下帶有註解@EndpointID的 bean 的 Java 配置

@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
    return () -> {
        ...
    };
}

鑑於前面的示例,bean 名稱如下

  • SPCA:someAdapter

  • 處理器:someAdapter.source(只要您遵循將.source附加到@Bean名稱的約定)

配置與@EnableIntegration

在本文件中,您可以看到許多引用,關於用於在 Spring Integration 流中宣告元素的 XML 名稱空間支援。這種支援由一系列名稱空間解析器提供,它們生成適當的 bean 定義來實現特定的元件。例如,許多端點由一個MessageHandler bean 和一個ConsumerEndpointFactoryBean組成,處理器和輸入通道名稱被注入其中。

當第一次遇到 Spring Integration 名稱空間元素時,框架會自動宣告許多 bean(任務排程器、隱式通道建立器等),這些 bean 用於支援執行時環境。

版本 4.0 引入了@EnableIntegration註解,以允許註冊 Spring Integration 基礎設施 bean(參閱Javadoc)。當僅使用 Java 配置時,此註解是必需的——例如在使用 Spring Boot 或 Spring Integration 訊息註解支援,以及沒有 XML 整合配置的 Spring Integration Java DSL 時。

@EnableIntegration註解在您有一個沒有 Spring Integration 元件的父上下文以及兩個或更多使用 Spring Integration 的子上下文時也很有用。它允許這些通用元件只在父上下文中宣告一次。

@EnableIntegration註解會嚮應用上下文註冊許多基礎設施元件。特別是,它會

  • 註冊一些內建 bean,例如errorChannel及其LoggingHandler、用於輪詢器的taskSchedulerjsonPath SpEL 函式等等。

  • 新增多個BeanFactoryPostProcessor例項,以增強全域性和預設整合環境的BeanFactory

  • 新增多個BeanPostProcessor例項,以增強或轉換幷包裝特定 bean 用於整合目的。

  • 添加註解處理器來解析訊息註解,並嚮應用上下文註冊相應的元件。

@IntegrationComponentScan註解也允許類路徑掃描。此註解的作用類似於標準的 Spring Framework @ComponentScan註解,但它僅限於 Spring Integration 特有的元件和註解,而標準的 Spring Framework 元件掃描機制無法觸及這些元件和註解。例如,請參閱@MessagingGateway註解

@EnablePublisher註解註冊一個PublisherAnnotationBeanPostProcessor bean,併為那些沒有提供channel屬性的@Publisher註解配置default-publisher-channel。如果找到多個@EnablePublisher註解,它們必須都具有相同的預設通道值。有關更多資訊,請參閱使用@Publisher註解的註解驅動配置

引入了@GlobalChannelInterceptor註解,用於標記用於全域性通道攔截的ChannelInterceptor bean。此註解類似於<int:channel-interceptor> XML 元素(參閱全域性通道攔截器配置)。@GlobalChannelInterceptor註解可以放置在類級別(帶有@Component刻板印象註解)或@Configuration類中的@Bean方法上。無論哪種情況,bean 都必須實現ChannelInterceptor

從 5.1 版本開始,全域性通道攔截器應用於動態註冊的通道——例如使用beanFactory.initializeBean()或透過IntegrationFlowContext使用 Java DSL 初始化 bean。在此之前,當 bean 在應用上下文重新整理後建立時,攔截器不會被應用。

@IntegrationConverter註解將ConverterGenericConverterConverterFactory bean 標記為integrationConversionService的候選轉換器。此註解類似於<int:converter> XML 元素(參閱Payload 型別轉換)。您可以將@IntegrationConverter註解放置在類級別(帶有@Component刻板印象註解)或@Configuration類中的@Bean方法上。

有關訊息註解的更多資訊,請參閱註解支援

程式設計注意事項

Spring Integration 中的大多數類(除非另有說明)必須在應用上下文中宣告為 bean 並作為單例。這意味著這些類的例項是執行緒安全的,它們的生命週期以及與其他元件的連線由 Spring 依賴注入容器管理。實用工具類和構建器類(JacksonJsonUtils, `MessageBuilder, ExpressionEvalMap, IntegrationReactiveUtils 等)可以直接在 Java 程式碼中使用。然而,Java DSL 工廠和IntegrationComponentSpec實現結果仍然必須註冊為應用上下文中的 bean。許多模組中存在的Session抽象不是執行緒安全的,通常透過Factory模式實現建立,並在執行緒安全的Template模式中使用。例如,請參閱SftpRemoteFileTemplate及其與DefaultSftpSessionFactory的關係。

您應儘可能使用普通 Java 物件 (POJO)(用於目標邏輯中的訊息處理),並且僅在絕對必要時才在程式碼中暴露框架。有關更多資訊,請參閱POJO 方法呼叫

如果您確實在類中暴露了框架,則需要考慮一些事項,尤其是在應用啟動期間

  • 如果您的元件是ApplicationContextAware,通常不應在setApplicationContext()方法中使用ApplicationContext。相反,應儲存一個引用,並將此類使用推遲到上下文生命週期稍後進行。

  • 如果您的元件是InitializingBean或使用@PostConstruct方法,請不要從這些初始化方法中傳送任何訊息。呼叫這些方法時,應用上下文尚未初始化,傳送此類訊息可能會失敗。如果您需要在啟動期間傳送訊息,請實現ApplicationListener並等待ContextRefreshedEvent。或者,實現SmartLifecycle,將您的 bean 放在一個較晚的階段,然後從start()方法傳送訊息。

使用打包(例如 Shaded)Jar 時的注意事項

Spring Integration 使用 Spring Framework 的SpringFactories機制引導某些功能,以載入多個IntegrationConfigurationInitializer類。這包括-core jar 以及其他一些 jar,包括-http-jmx。此過程的資訊儲存在每個 jar 的META-INF/spring.factories檔案中。

一些開發者傾向於使用知名工具(例如Apache Maven Shade Plugin)將其應用和所有依賴項重新打包到一個單獨的 jar 中。

預設情況下,shade 外掛在生成 shaded jar 時不會合並spring.factories檔案。

除了spring.factories,還有其他META-INF檔案(spring.handlersspring.schemas)用於 XML 配置。這些檔案也需要合併。

Spring Boot 的可執行 jar 機制採用了不同的方法,它將 jar 檔案巢狀,從而保留類路徑上的每個spring.factories檔案。因此,對於 Spring Boot 應用,如果您使用其預設的可執行 jar 格式,則無需額外操作。

即使您不使用 Spring Boot,您仍然可以使用 Boot 提供的工具,透過為上述檔案新增 transformer 來增強 shade 外掛。以下示例展示瞭如何配置外掛

示例 1. pom.xml
...
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <dependencies>
                <dependency> (1)
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                </dependency>
            </dependencies>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers> (2)
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer
                                implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
...

具體來說,

1 新增spring-boot-maven-plugin作為依賴項。
2 配置 transformer。

您可以為${spring.boot.version}新增一個屬性,或使用明確的版本。

程式設計技巧與竅門

本節介紹了一些充分利用 Spring Integration 的方法。

XML Schema

使用 XML 配置時,為了避免出現錯誤的 schema 驗證錯誤,您應該使用“Spring 感知”的 IDE,例如 Spring Tool Suite (STS)、帶有 Spring IDE 外掛的 Eclipse 或 IntelliJ IDEA。這些 IDE 知道如何從類路徑解析正確的 XML schema(透過使用 jar 中的META-INF/spring.schemas檔案)。在使用 STS 或帶有外掛的 Eclipse 時,您必須在專案上啟用Spring Project Nature

某些遺留模組(版本 1.0 中存在的模組)在網際網路上託管的 schema 是 1.0 版本,這是出於相容性原因。如果您的 IDE 使用這些 schema,您很可能會看到錯誤提示。

每個線上 schema 都有一個類似於以下的警告

此 schema 用於 Spring Integration Core 的 1.0 版本。我們無法將其更新到當前 schema,因為這將破壞使用 1.0.3 或更低版本的任何應用。對於後續版本,將從類路徑解析“無版本”schema 並從 jar 中獲取。請參閱 GitHub

受影響的模組有

  • corespring-integration.xsd

  • file

  • http

  • jms

  • mail

  • security

  • stream

  • ws

  • xml

查詢 Java 和 DSL 配置的類名

透過 XML 配置和 Spring Integration 名稱空間支援,XML 解析器隱藏了目標 bean 如何宣告和連線在一起的細節。對於 Java 配置,瞭解面向終端使用者應用的框架 API 至關重要。

EIP 實現中的一等公民是MessageChannelEndpoint(參閱本章前面的主要元件)。它們的實現(契約)是

  • org.springframework.messaging.Message:參閱Message

  • org.springframework.messaging.MessageChannel:參閱訊息通道

  • org.springframework.integration.endpoint.AbstractEndpoint:參閱Poller

前兩個很容易理解如何實現、配置和使用。最後一個值得更多關注

AbstractEndpoint在整個 Spring Framework 中廣泛用於不同的元件實現。其主要實現是

  • EventDrivenConsumer,當我們訂閱SubscribableChannel以監聽訊息時使用。

  • PollingConsumer,當我們從PollableChannel輪詢訊息時使用。

使用訊息註解或 Java DSL 時,您無需擔心這些元件,因為框架會使用適當的註解和BeanPostProcessor實現自動生成它們。手動構建元件時,您應該使用ConsumerEndpointFactoryBean來幫助確定要建立的目標AbstractEndpoint消費者實現,這基於提供的inputChannel屬性。

另一方面,ConsumerEndpointFactoryBean將委託給框架中的另一個一等公民 - org.springframework.messaging.MessageHandler。此介面實現的目的是處理由端點從通道消費的訊息。Spring Integration 中的所有 EIP 元件都是MessageHandler實現(例如,AggregatingMessageHandlerMessageTransformingHandlerAbstractMessageSplitter等)。目標協議的出站介面卡(FileWritingMessageHandlerHttpRequestExecutingMessageHandlerAbstractMqttMessageHandler等)也是MessageHandler實現。使用 Java 配置開發 Spring Integration 應用時,您應該檢視 Spring Integration 模組,找到適合用於@ServiceActivator配置的MessageHandler實現。例如,要傳送 XMPP 訊息(參閱XMPP 支援),您應該配置類似以下內容

@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
    ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);

    DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
    xmppHeaderMapper.setRequestHeaderNames("*");
    handler.setHeaderMapper(xmppHeaderMapper);

    return handler;
}

MessageHandler實現表示訊息流的出站和處理部分。

入站訊息流側有其自身的元件,分為輪詢和監聽行為。監聽(訊息驅動)元件很簡單,通常只需要一個目標類實現就可以準備好生成訊息。監聽元件可以是一次性的MessageProducerSupport實現(例如AbstractMqttMessageDrivenChannelAdapterImapIdleChannelAdapter),也可以是請求-回覆的MessagingGatewaySupport實現(例如AmqpInboundGatewayAbstractWebServiceInboundGateway)。

輪詢入站端點適用於那些不提供監聽器 API 或不適合此類行為的協議,包括任何基於檔案的協議(例如 FTP)、任何資料庫(RDBMS 或 NoSQL)等。

這些入站端點由兩個元件組成:輪詢器配置,用於定期啟動輪詢任務;以及訊息源類,用於從目標協議讀取資料併為下游整合流生成訊息。輪詢器配置的第一個類是SourcePollingChannelAdapter。它是另一種AbstractEndpoint實現,但特別用於輪詢以啟動整合流。通常,使用訊息註解或 Java DSL 時,您無需擔心這個類。框架會根據@InboundChannelAdapter配置或 Java DSL 構建器規範為其生成一個 bean。

訊息源元件對於目標應用開發更為重要,它們都實現了MessageSource介面(例如,MongoDbMessageSourceAbstractTwitterMessageSource)。考慮到這一點,我們使用 JDBC 從 RDBMS 表讀取資料的配置可能類似於以下內容

@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
    return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}

您可以在特定的 Spring Integration 模組中找到目標協議所需的所有入站和出站類(在大多數情況下,在相應的包中)。例如,spring-integration-websocket介面卡包括

  • o.s.i.websocket.inbound.WebSocketInboundChannelAdapter:實現MessageProducerSupport,用於監聽 socket 上的幀並將訊息傳送到通道。

  • o.s.i.websocket.outbound.WebSocketOutboundMessageHandler:一次性的AbstractMessageHandler實現,用於將接收到的訊息轉換為適當的幀並透過 websocket 傳送。

如果您熟悉 Spring Integration XML 配置,從版本 4.3 開始,我們在 XSD 元素定義中提供了關於哪些目標類用於宣告介面卡或閘道器 bean 的資訊,如下例所示

<xsd:element name="outbound-async-gateway">
    <xsd:annotation>
		<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
       </xsd:documentation>
	</xsd:annotation>

POJO 方法呼叫

程式設計注意事項中所述,我們推薦使用 POJO 程式設計風格,如下例所示

@ServiceActivator
public String myService(String payload) { ... }

在這種情況下,框架會提取String payload,呼叫您的方法,並將結果包裝在一條訊息中傳送到流程中的下一個元件(原始訊息頭會複製到新訊息中)。事實上,如果您使用 XML 配置,您甚至不需要@ServiceActivator註解,如下面的成對示例所示

<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }

只要類中的 public 方法沒有歧義,您可以省略method屬性。

您也可以在 POJO 方法中獲取訊息頭資訊,如下例所示

@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }

您還可以解引用訊息上的屬性,如下例所示

@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }

由於有各種 POJO 方法呼叫方式,5.0 版本之前使用 SpEL(Spring Expression Language)來呼叫 POJO 方法。與方法中實際執行的工作相比,SpEL(即使是解釋執行)對於這些操作通常“足夠快”。然而,從 5.0 版本開始,只要可能,預設使用org.springframework.messaging.handler.invocation.InvocableHandlerMethod。這種技術通常比解釋執行的 SpEL 執行速度更快,並且與其他 Spring 訊息專案一致。InvocableHandlerMethod類似於在 Spring MVC 中用於呼叫控制器方法的技術。某些方法在使用 SpEL 時仍然總是被呼叫。例如,前面討論的帶有解引用屬性的註解引數。這是因為 SpEL 具有導航屬性路徑的能力。

可能還有一些我們沒有考慮到的邊緣情況,這些情況也無法與InvocableHandlerMethod例項一起工作。因此,在這些情況下,我們會自動回退到使用 SpEL。

如果您願意,您也可以設定您的 POJO 方法,使其始終使用 SpEL,並使用UseSpelInvoker註解,如下例所示

@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }

如果省略compilerMode屬性,則spring.expression.compiler.mode系統屬性決定編譯器模式。有關編譯 SpEL 的更多資訊,請參閱SpEL 編譯