Spring 的資料整合之旅簡史

Spring 的資料整合之旅始於 Spring Integration。憑藉其程式設計模型,它提供了一致的開發體驗,用於構建能夠採納 Enterprise Integration Patterns 以連線資料庫、訊息代理等外部系統的應用。

快進到雲時代,微服務已在企業環境中變得舉足輕重。Spring Boot 改變了開發者構建應用的方式。藉助 Spring 的程式設計模型和 Spring Boot 處理的執行時職責,開發獨立的、生產級 Spring 微服務變得無縫便捷。

為了將此擴充套件到資料整合工作負載,Spring Integration 和 Spring Boot 被整合到了一個新專案中。Spring Cloud Stream 應運而生。

使用 Spring Cloud Stream,開發者可以:

  • 獨立構建、測試和部署以資料為中心的應用。

  • 應用現代微服務架構模式,包括透過訊息進行組合。

  • 以事件為中心進行思考,解耦應用職責。事件可以代表時間中發生的某個事情,下游消費者應用可以對此作出反應,而無需知道事件的來源或生產者的身份。

  • 將業務邏輯移植到訊息代理上(如 RabbitMQ、Apache Kafka、Amazon Kinesis)。

  • 對於常見用例,可以依賴框架的自動內容型別支援。也可以擴充套件支援不同的資料轉換型別。

  • 以及更多功能...

快速入門

在深入瞭解任何細節之前,您可以透過遵循這個三步指南,在不到 5 分鐘內嘗試 Spring Cloud Stream。

我們將向您展示如何建立一個 Spring Cloud Stream 應用,該應用接收來自您選擇的訊息中介軟體(稍後會詳細介紹)的訊息,並將接收到的訊息記錄到控制檯。我們將其命名為 `LoggingConsumer`。雖然不是很實用,但它很好地介紹了 Spring Cloud Stream 的一些主要概念和抽象,使您更容易理解本使用者指南的其餘部分。

這三個步驟如下:

使用 Spring Initializr 建立示例應用

開始之前,請訪問 Spring Initializr。在那裡,您可以生成我們的 `LoggingConsumer` 應用。具體操作如下:

  1. Dependencies 部分,開始輸入 `stream`。當出現“Cloud Stream”選項時,選中它。

  2. 開始輸入 'kafka' 或 'rabbit'。

  3. 選擇“Kafka”或“RabbitMQ”。

    基本上,您選擇的是您的應用將繫結的訊息中介軟體。我們建議使用您已安裝或更熟悉安裝和執行的中介軟體。此外,正如您在 Initializr 螢幕上看到的,還有一些其他選項可供選擇。例如,您可以選擇 Gradle 作為構建工具而不是 Maven(預設)。

  4. Artifact 欄位中,輸入 'logging-consumer'。

    Artifact 欄位的值將成為應用名稱。如果您為中介軟體選擇了 RabbitMQ,您的 Spring Initializr 現在應該如下所示:

spring initializr
  1. 點選 Generate Project 按鈕。

    這樣做會將生成的專案的 zip 版本下載到您的硬碟驅動器上。

  2. 將檔案解壓到您希望用作專案目錄的資料夾中。

我們鼓勵您探索 Spring Initializr 中提供的許多可能性。它可以讓您建立各種不同的 Spring 應用。

將專案匯入 IDE

現在您可以將專案匯入到您的 IDE 中。請記住,根據不同的 IDE,您可能需要遵循特定的匯入步驟。例如,根據專案的生成方式(Maven 或 Gradle),您可能需要遵循特定的匯入步驟(例如,在 Eclipse 或 STS 中,您需要使用 File → Import → Maven → Existing Maven Project)。

匯入後,專案不應該有任何錯誤。此外,`src/main/java` 目錄應該包含 `com.example.loggingconsumer.LoggingConsumerApplication` 檔案。

理論上,此時您可以執行應用的主類。它已經是一個有效的 Spring Boot 應用。但是,它不會執行任何操作,所以我們需要新增一些程式碼。

新增訊息處理器,構建和執行

修改 `com.example.loggingconsumer.LoggingConsumerApplication` 類,使其如下所示:

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

從上面的列表可以看出:

  • 我們使用函數語言程式設計模型(參見 [Spring Cloud Function support])將一個訊息處理器定義為一個 `Consumer`。

  • 我們依賴框架約定,將此類處理器繫結到由 binder 公開的輸入目標繫結。

這樣做還可以讓您看到框架的一個核心功能:它會嘗試自動將傳入訊息的 payload 轉換為 `Person` 型別。

現在您擁有一個功能齊全的 Spring Cloud Stream 應用,它可以監聽訊息。在此基礎上,為了簡單起見,我們假設您在 第一個步驟 中選擇了 RabbitMQ。如果您已安裝並執行 RabbitMQ,您可以在 IDE 中執行其 `main` 方法來啟動應用。

您應該會看到以下輸出:

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

轉到 RabbitMQ 管理控制檯或任何其他 RabbitMQ 客戶端,向 `input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg` 傳送一條訊息。`anonymous.CbMIwdkJSBO1ZoPDOtHtCg` 部分表示組名,它是自動生成的,因此在您的環境中會有所不同。為了獲得更可預測的名稱,您可以透過設定 `spring.cloud.stream.bindings.input.group=hello`(或您喜歡的任何名稱)來使用一個顯式的組名。

訊息的內容應該是 `Person` 類的 JSON 表示形式,如下所示:

{"name":"Sam Spade"}

然後,在您的控制檯中,您應該會看到:

收到:Sam Spade

您還可以將應用構建並打包成一個 boot jar(使用 `./mvnw clean install` 命令),然後使用 `java -jar` 命令執行構建好的 JAR 檔案。

現在您擁有了一個可工作的(儘管非常基礎的)Spring Cloud Stream 應用。

流式資料上下文中的 Spring Expression Language (SpEL)

在本參考手冊中,您會遇到許多可以使用 Spring Expression Language (SpEL) 的特性和示例。理解使用它時的一些限制非常重要。

SpEL 讓您可以訪問當前的訊息 (Message) 以及您正在執行的應用上下文 (Application Context)。然而,理解 SpEL 能夠看到的資料型別非常重要,尤其是在傳入訊息的上下文中。從訊息代理接收的訊息是以 `byte[]` 陣列的形式到達的。然後,binder 將其轉換為 `Message`,您可以看到訊息的 payload 保持其原始形式。訊息的 headers 是 `` 型別,其中值通常是另一個原始型別或原始型別的集合/陣列,因此是 Object。這是因為 binder 不知道所需的輸入型別,因為它無法訪問使用者程式碼(函式)。所以,實際上 binder 傳遞了一個帶有 payload 和一些可讀元資料(以訊息 headers 的形式)的信封,就像郵件投遞的信件一樣。這意味著,雖然可以訪問訊息的 payload,但您只能以原始資料(即 `byte[]`)的形式訪問它。開發者可能經常希望 SpEL 能以具體型別(例如 Foo, Bar 等)訪問 payload 物件的欄位,但您可以看到這有多麼困難甚至不可能實現。這裡有一個例子來演示這個問題;假設您有一個路由表示式,根據 payload 型別路由到不同的函式。這個要求意味著需要將 payload 從 `byte[]` 轉換為特定型別,然後再應用 SpEL。然而,為了執行這種轉換,我們需要知道要傳遞給轉換器 (converter) 的實際型別,而這個型別來自函式的簽名 (function’s signature),我們不知道是哪個簽名。解決這個問題的更好方法是將型別資訊作為訊息 header 傳遞(例如,`application/json;type=foo.bar.Baz`)。您將獲得一個清晰可讀的 String 值,該值可以在易於讀取的 SpEL 表示式中進行訪問和評估。

此外,使用 payload 進行路由決策被認為是非常糟糕的做法,因為 payload 被認為是特權資料——只有最終接收者才能讀取的資料。再次以郵件投遞類比,您不會希望郵遞員開啟您的信封並閱讀信件內容來決定如何投遞。同樣的道理也適用於此處,特別是當生成訊息時包含此類資訊相對容易時。這有助於強制執行與網路傳輸資料設計相關的某些紀律,明確哪些資料部分可以被視為公共的,哪些是特權的。