4.0.5

前言

Spring 資料整合歷程簡史

Spring 的資料整合歷程始於 Spring Integration。憑藉其程式設計模型,它提供了穩定一致的開發體驗,用於構建可以採用 企業整合模式 來連線外部系統(例如資料庫、訊息代理等)的應用。

快進到雲時代,微服務在企業環境中變得越來越重要。Spring Boot 改變了開發者構建應用的方式。憑藉 Spring 的程式設計模型以及 Spring Boot 處理的執行時職責,開發基於 Spring 的獨立、生產級微服務變得無縫順暢。

為了將此擴充套件到資料整合工作負載,Spring Integration 和 Spring Boot 被整合到一個新專案中。Spring Cloud Stream 就此誕生。

藉助 Spring Cloud Stream,開發者可以:

  • 獨立構建、測試和部署以資料為中心的應用。

  • 應用現代微服務架構模式,包括透過訊息傳遞進行組合。

  • 透過以事件為中心的思維方式解耦應用職責。事件可以代表在某個時間點發生的事情,下游消費者應用可以對此作出反應,而無需知道事件的來源或生產者的身份。

  • 將業務邏輯遷移到訊息代理上(例如 RabbitMQ, Apache Kafka, Amazon Kinesis)。

  • 依賴框架對常見用例的自動內容型別支援。也可以擴充套件支援不同的資料轉換型別。

  • 還有更多功能。 . .

快速入門

您可以按照這個三步指南,在不到 5 分鐘的時間內嘗試 Spring Cloud Stream,甚至在深入瞭解任何細節之前。

我們將向您展示如何建立一個 Spring Cloud Stream 應用,該應用接收來自您選擇的訊息中介軟體的訊息(稍後會詳細介紹),並將收到的訊息記錄到控制檯。我們稱之為 LoggingConsumer。雖然不是很實用,但它很好地介紹了 Spring Cloud Stream 的一些主要概念和抽象,使您更容易理解本使用者指南的其餘部分。

這三個步驟如下:

使用 Spring Initializr 建立示例應用

要開始使用,請訪問 Spring Initializr。在那裡,您可以生成我們的 LoggingConsumer 應用。具體步驟如下:

  1. Dependencies 部分,開始輸入 stream。當出現“Cloud Stream”選項時,選擇它。

  2. 開始輸入 'kafka' 或 'rabbit'。

  3. 選擇“Kafka”或“RabbitMQ”。

    基本上,您選擇應用要繫結的訊息中介軟體。我們建議使用您已安裝或更熟悉安裝和執行的中介軟體。此外,正如您從 Initializr 螢幕上看到的,還有其他一些選項可供選擇。例如,您可以選擇 Gradle 作為構建工具,而不是 Maven(預設)。

  4. Artifact 欄位中,輸入 'logging-consumer'。

    Artifact 欄位的值將成為應用名稱。如果您選擇 RabbitMQ 作為中介軟體,您的 Spring Initializr 現在應該如下所示:

spring initializr
  1. 點選 Generate Project 按鈕。

    這樣做會將生成的專案的壓縮版本下載到您的硬碟。

  2. 將檔案解壓縮到您想要用作專案目錄的資料夾中。

我們鼓勵您探索 Spring Initializr 中的眾多可能性。它允許您建立多種不同型別的 Spring 應用。

將專案匯入您的 IDE

現在您可以將專案匯入您的 IDE。請記住,根據不同的 IDE,您可能需要遵循特定的匯入過程。例如,根據專案的生成方式(Maven 或 Gradle),您可能需要遵循特定的匯入過程(例如,在 Eclipse 或 STS 中,您需要使用 File → Import → Maven → Existing Maven Project)。

匯入後,專案不應有任何錯誤。此外,src/main/java 目錄應包含 com.example.loggingconsumer.LoggingConsumerApplication 檔案。

技術上講,此時您可以執行應用的主類。它已經是一個有效的 Spring Boot 應用。但是,它目前沒有任何功能,所以我們需要新增一些程式碼。

新增訊息處理器、構建和執行

修改 com.example.loggingconsumer.LoggingConsumerApplication 類,使其看起來如下所示:

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

正如您從上面的程式碼清單中看到的:

  • 我們正在使用函數語言程式設計模型(參見 Spring Cloud Function 支援)將單個訊息處理器定義為一個 Consumer

  • 我們依賴於框架約定,將該處理器繫結到 Binder 公開的輸入目標繫結。

這樣做還可以讓您看到框架的核心功能之一:它嘗試自動將傳入訊息的有效載荷轉換為 Person 型別。

現在您有了一個功能齊全的 Spring Cloud Stream 應用,它可以監聽訊息。為了簡單起見,我們假設您在第一步中選擇了 RabbitMQ。假設您已經安裝並運行了 RabbitMQ,您可以在 IDE 中執行其 main 方法來啟動應用。

您應該看到以下輸出:

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

轉到 RabbitMQ 管理控制檯或任何其他 RabbitMQ 客戶端,向 input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg 傳送訊息。anonymous.CbMIwdkJSBO1ZoPDOtHtCg 部分代表組名,它是生成的,因此在您的環境中肯定會不同。為了更具可預測性,您可以透過設定 spring.cloud.stream.bindings.input.group=hello(或您喜歡的任何名稱)來使用顯式的組名。

訊息內容應是 Person 類的 JSON 表示形式,如下所示:

{"name":"Sam Spade"}

然後,在您的控制檯中,您應該會看到:

Received: Sam Spade

您還可以將應用構建並打包成一個可執行 JAR(使用 ./mvnw clean install 命令),然後使用 java -jar 命令執行構建好的 JAR。

現在您有了一個可以工作的(儘管非常基礎的)Spring Cloud Stream 應用。

流式資料上下文中的 Spring 表示式語言 (SpEL)

在本參考手冊中,您將遇到許多可以利用 Spring 表示式語言 (SpEL) 的功能和示例。在使用它時,瞭解某些限制非常重要。

SpEL 允許您訪問當前 Message 以及您正在執行的 Application Context。但是,瞭解 SpEL 能夠看到的資料型別非常重要,特別是在接收到的 Message 的上下文中。從訊息代理傳來的訊息是以 byte[] 形式到達的。然後由 Binder 將其轉換為 Message<byte[]>,您可以看到訊息的有效載荷保持其原始形式。訊息的頭是 <String, Object> 型別,其中值通常是另一個基本型別或基本型別的集合/陣列,因此是 Object。這是因為 Binder 不知道所需的輸入型別,因為它無法訪問使用者程式碼(函式)。所以,實際上,Binder 傳遞了一個包含有效載荷和一些以訊息頭形式存在的、可讀的元資料的信封,就像郵件遞送的信件一樣。這意味著,雖然可以訪問訊息的有效載荷,但您只能以原始資料(即 byte[])的形式訪問它。雖然開發者經常要求 SpEL 能夠以具體型別(例如 Foo, Bar 等)訪問有效載荷物件的欄位,但您可以看到這有多麼困難甚至不可能實現。這裡有一個例子來說明這個問題;想象一下您有一個路由表示式,根據有效載荷型別將訊息路由到不同的函式。這個需求意味著需要將有效載荷從 byte[] 轉換為特定型別,然後應用 SpEL。然而,為了執行這種轉換,我們需要知道要傳遞給轉換器的實際型別,而這來源於函式的簽名,我們不知道是哪個簽名。解決這個需求的更好方法是將型別資訊作為訊息頭傳遞(例如,application/json;type=foo.bar.Baz)。您將獲得一個清晰可讀的字串值,可以在一個易於理解的 SpEL 表示式中訪問和評估。

此外,將有效載荷用於路由決策被認為是非常糟糕的做法,因為有效載荷被視為特權資料——只能由最終接收者讀取的資料。再次使用郵件遞送的類比,您不會希望郵遞員開啟您的信封並閱讀信件內容來做出一些遞送決策。同樣的概念也適用於此,特別是在生成 Message 時包含此類資訊相對容易的情況下。這強制要求在設計透過網路傳輸的資料時遵循一定程度的規範,明確哪些資料片段可以被視為公共的,哪些是特權的。

Spring Cloud Stream 介紹

Spring Cloud Stream 是一個用於構建訊息驅動微服務應用的框架。Spring Cloud Stream 構建於 Spring Boot 之上,用於建立獨立的、生產級的 Spring 應用,並使用 Spring Integration 提供與訊息代理的連線。它提供了對多個供應商中介軟體的規定性配置,引入了持久化釋出/訂閱語義、消費者組和分割槽等概念。

透過將 spring-cloud-stream 依賴新增到您的應用 classpath 中,您可以立即連線到由提供的 spring-cloud-stream Binder 公開的訊息代理(稍後將詳細介紹),並且您可以實現您的功能需求,該需求由 java.util.function.Function(根據傳入訊息)執行。

以下程式碼清單展示了一個快速示例:

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class, args);
	}

    @Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

以下程式碼清單展示了相應的測試:

@SpringBootTest(classes =  SampleApplication.class)
@Import({TestChannelBinderConfiguration.class})
class BootTestStreamApplicationTests {

	@Autowired
	private InputDestination input;

	@Autowired
	private OutputDestination output;

	@Test
	void contextLoads() {
		input.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}
}

主要概念

Spring Cloud Stream 提供了一些抽象和原語,簡化了訊息驅動微服務應用的編寫。本節概述了以下內容:

應用模型

Spring Cloud Stream 應用由一箇中間件無關的核心組成。應用透過在外部訊息代理暴露的目標與程式碼中的輸入/輸出引數之間建立 繫結 來與外部世界通訊。建立繫結所需的特定於訊息代理的詳細資訊由特定於中介軟體的 Binder 實現處理。

SCSt with binder
圖 1. Spring Cloud Stream 應用

Fat JAR(可執行 JAR 包)

Spring Cloud Stream 應用可以在您的 IDE 中以獨立模式執行以進行測試。要在生產環境中執行 Spring Cloud Stream 應用,您可以使用 Maven 或 Gradle 提供的標準 Spring Boot 工具建立一個可執行的(或稱為“胖”)JAR 包。有關更多詳細資訊,請參見Spring Boot 參考指南

Binder 抽象

Spring Cloud Stream 提供了針對 KafkaRabbit MQ 的 Binder 實現。框架還包含一個用於對您的應用進行整合測試的測試 Binder,作為 Spring Cloud Stream 應用。有關更多詳細資訊,請參見測試部分。

Binder 抽象也是框架的擴充套件點之一,這意味著您可以在 Spring Cloud Stream 之上實現自己的 Binder。在 如何從頭開始建立 Spring Cloud Stream Binder 這篇文章中,一位社群成員詳細記錄了實現自定義 Binder 所需的一系列步驟,並提供了示例。這些步驟也在 實現自定義 Binder 部分中有所強調。

Spring Cloud Stream 使用 Spring Boot 進行配置,Binder 抽象使得 Spring Cloud Stream 應用在連線到中介軟體方面具有靈活性。例如,部署者可以在執行時動態選擇外部目標(如 Kafka 主題或 RabbitMQ 交換機)與訊息處理器的輸入和輸出(如函式的輸入引數及其返回值)之間的對映關係。此類配置可以透過外部配置屬性提供,並且可以是 Spring Boot 支援的任何形式(包括應用引數、環境變數以及 application.ymlapplication.properties 檔案)。在Spring Cloud Stream 介紹部分中的 Sink 示例中,將 spring.cloud.stream.bindings.input.destination 應用屬性設定為 raw-sensor-data 將使其從 raw-sensor-data Kafka 主題或繫結到 raw-sensor-data RabbitMQ 交換機的佇列讀取。

Spring Cloud Stream 會自動檢測並使用 classpath 中找到的 Binder。您可以使用相同的程式碼與不同型別的中介軟體進行互動。為此,只需在構建時包含不同的 Binder 即可。對於更復雜的用例,您還可以將多個 Binder 打包到您的應用中,並在執行時選擇 Binder(甚至為不同的繫結使用不同的 Binder)。

持久化釋出/訂閱支援

應用之間的通訊遵循釋出/訂閱模型,其中資料透過共享主題進行廣播。這可以在下圖中看到,該圖顯示了一組互動式 Spring Cloud Stream 應用的典型部署。

SCSt sensors
圖 2. Spring Cloud Stream 釋出/訂閱

感測器報告到 HTTP 端點的資料被髮送到一個名為 raw-sensor-data 的公共目標。從該目標,它被一個計算時間視窗平均值的微服務應用以及另一個將原始資料匯入 HDFS (Hadoop 分散式檔案系統) 的微服務應用獨立處理。為了處理資料,這兩個應用都在執行時將該主題宣告為其輸入。

釋出/訂閱通訊模型降低了生產者和消費者的複雜性,並允許在不中斷現有流程的情況下向拓撲結構新增新應用。例如,在計算平均值的應用下游,您可以新增一個應用來計算最高溫度值以供顯示和監控。然後,您可以新增另一個應用來解釋相同的平均值流以進行故障檢測。透過共享主題而不是點對點佇列進行所有通訊,可以減少微服務之間的耦合。

雖然釋出/訂閱訊息傳遞的概念並不新穎,但 Spring Cloud Stream 更進一步,將其作為其應用模型的規定性選擇。透過使用原生中介軟體支援,Spring Cloud Stream 還簡化了在不同平臺中使用釋出/訂閱模型的方式。

消費者組

雖然釋出/訂閱模型使得透過共享主題連線應用變得容易,但透過建立給定應用的多個例項來實現橫向擴充套件同樣重要。在這種情況下,應用的各個例項之間處於競爭消費者關係,其中只有一個例項預期會處理給定的訊息。

Spring Cloud Stream 透過消費者組的概念來模擬這種行為。(Spring Cloud Stream 的消費者組類似於 Kafka 的消費者組,並受到其啟發。)每個消費者繫結都可以使用 spring.cloud.stream.bindings.<bindingName>.group 屬性來指定組名。對於下圖中顯示的消費者,該屬性將設定為 spring.cloud.stream.bindings.<bindingName>.group=hdfsWritespring.cloud.stream.bindings.<bindingName>.group=average

SCSt groups
圖 3. Spring Cloud Stream 消費者組

所有訂閱給定目標的組都會收到釋出資料的一個副本,但每個組中只有一名成員會收到來自該目標的給定訊息。預設情況下,當未指定組時,Spring Cloud Stream 會將應用分配給一個匿名且獨立的單成員消費者組,該組與所有其他消費者組處於釋出/訂閱關係中。

消費者型別

支援兩種型別的消費者:

  • 訊息驅動型(有時稱為非同步型)

  • 輪詢型(有時稱為同步型)

在版本 2.0 之前,僅支援非同步消費者。只要訊息可用且有執行緒可處理,訊息就會被立即傳遞。

當您希望控制訊息處理的速度時,您可能希望使用同步消費者。

永續性

與 Spring Cloud Stream 的規定性應用模型一致,消費者組訂閱是持久化的。也就是說,Binder 實現確保組訂閱是持久的,並且一旦為一個組建立了至少一個訂閱,該組就會收到訊息,即使在組中的所有應用都停止時傳送的訊息也能收到。

匿名訂閱本質上是非持久化的。對於某些 Binder 實現(例如 RabbitMQ),可以有非持久化的組訂閱。

通常,在將應用繫結到給定目標時,最好始終指定一個消費者組。橫向擴充套件 Spring Cloud Stream 應用時,必須為其每個輸入繫結指定一個消費者組。這樣做可以防止應用的例項接收重複的訊息(除非需要這種行為,但這並不常見)。

分割槽支援

Spring Cloud Stream 支援在給定應用的多個例項之間對資料進行分割槽。在分割槽場景中,物理通訊媒介(如訊息代理主題)被視為結構化為多個分割槽。一個或多個生產者應用例項將資料傳送到多個消費者應用例項,並確保具有共同特徵的資料由同一個消費者例項處理。

Spring Cloud Stream 提供了一個通用抽象,用於以統一的方式實現分割槽處理用例。因此,無論訊息代理本身是否天然支援分割槽(例如 Kafka)或不支援(例如 RabbitMQ),都可以使用分割槽功能。

SCSt partitioning
圖 4. Spring Cloud Stream 分割槽

分割槽是狀態處理中的一個關鍵概念,在狀態處理中,確保所有相關資料一起處理至關重要(無論是出於效能還是一致性原因)。例如,在時間視窗平均值計算示例中,來自任何給定感測器的所有測量資料都由同一個應用例項處理非常重要。

要設定分割槽處理場景,您必須同時配置資料生產端和資料消費端。

程式設計模型

要理解程式設計模型,您應該熟悉以下核心概念:

  • 目標 Binder: 負責提供與外部訊息系統整合的元件。

  • 繫結: 連線外部訊息系統與應用提供的訊息 生產者消費者(由目標 Binder 建立)之間的橋樑。

  • 訊息: 生產者和消費者用於與目標 Binder(以及透過外部訊息系統與其他應用)通訊的規範資料結構。

SCSt overview

目標 Binder

目標 Binder 是 Spring Cloud Stream 的擴充套件元件,負責提供必要的配置和實現,以促進與外部訊息系統的整合。這種整合負責連線、訊息到生產者和消費者的委託和路由、資料型別轉換、使用者程式碼呼叫等等。

Binder 處理了許多本應由您承擔的樣板職責。然而,為了實現這一點,Binder 仍然需要使用者提供一些幫助,以最小但必需的指令集形式,這通常以某種型別的 繫結 配置出現。

雖然本節的範圍不包括討論所有可用的 Binder 和繫結配置選項(手冊的其餘部分將廣泛介紹它們),但 繫結 作為概念確實需要特別關注。下一節將詳細討論它。

繫結

如前所述,繫結 提供了外部訊息系統(例如佇列、主題等)與應用提供的訊息 生產者消費者 之間的橋樑。

以下示例展示了一個完整配置並正常執行的 Spring Cloud Stream 應用,該應用接收訊息的有效載荷(payload)為 String 型別(參見 內容型別協商 部分),將其記錄到控制檯,並在將其轉換為大寫後傳送到下游。

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class, args);
	}

	@Bean
	public Function<String, String> uppercase() {
	    return value -> {
	        System.out.println("Received: " + value);
	        return value.toUpperCase();
	    };
	}
}

上述示例看起來與任何普通的 Spring Boot 應用沒有什麼不同。它定義了一個 Function 型別的 bean,僅此而已。那麼,它是如何成為一個 Spring Cloud Stream 應用的呢?它之所以成為 Spring Cloud Stream 應用,僅僅是因為 classpath 中存在 Spring Cloud Stream 和 Binder 的依賴及自動配置類,從而有效地將您的 Boot 應用上下文設定為一個 Spring Cloud Stream 應用。在這種上下文下,SupplierFunctionConsumer 型別的 bean 被視為實際的訊息處理器,根據特定的命名約定和規則觸發與提供的 Binder 公開的目標的繫結,從而避免額外的配置。

繫結和繫結名稱

繫結是一個抽象,它代表了 Binder 公開的源和目標與使用者程式碼之間的橋樑。這個抽象有一個名稱,雖然我們盡最大努力限制執行 Spring Cloud Stream 應用所需的配置,但在需要額外的按繫結配置的情況下,瞭解此類名稱是必要的。

在本手冊中,您將看到諸如 spring.cloud.stream.bindings.input.destination=myQueue 之類的配置屬性示例。此屬性名稱中的 input 部分就是我們所說的 繫結名稱,它可以透過多種機制派生而來。以下子節將描述 Spring Cloud Stream 用於控制繫結名稱的命名約定和配置元素。

如果您的繫結名稱包含特殊字元,例如 . 字元,您需要用方括號 ([]) 將繫結鍵括起來,然後再用引號包裹。例如:spring.cloud.stream.bindings."[my.output.binding.key]".destination
函式式繫結名稱

與 Spring Cloud Stream 早期版本中使用的基於註解的支援(遺留方式)所需的顯式命名不同,函數語言程式設計模型在繫結命名方面預設採用簡單的約定,從而極大地簡化了應用配置。讓我們看第一個例子:

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

在前面的例子中,我們有一個應用,其中包含一個充當訊息處理器的函式。作為一個 Function,它有一個輸入和一個輸出。用於命名輸入和輸出繫結的命名約定如下:

  • 輸入 - <函式名> + -in- + <索引>

  • 輸出 - <函式名> + -out- + <索引>

inout 對應於繫結的型別(例如 輸入輸出)。index 是輸入或輸出繫結的索引。對於典型的單輸入/輸出函式,它總是 0,因此它只與 具有多個輸入和輸出引數的函式 相關。

因此,如果例如您想將此函式的輸入對映到一個名為 "my-topic" 的遠端目標(例如主題、佇列等),您將使用以下屬性進行配置:

--spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic

注意 uppercase-in-0 如何用作屬性名稱的一部分。uppercase-out-0 也一樣。

描述性繫結名稱

有時為了提高可讀性,您可能希望給您的繫結一個更具描述性的名稱(例如 'account', 'orders' 等)。另一種看待它的方式是,您可以將一個 隱式繫結名稱 對映到一個 顯式繫結名稱。您可以使用 spring.cloud.stream.function.bindings.<繫結名稱> 屬性來實現這一點。此屬性也為依賴需要顯式名稱的自定義基於介面的繫結的現有應用提供了遷移路徑。

例如,

--spring.cloud.stream.function.bindings.uppercase-in-0=input

在前面的示例中,您對映並有效地將繫結名稱 uppercase-in-0 重新命名為 input。現在所有配置屬性都可以引用 input 繫結名稱(例如,--spring.cloud.stream.bindings.input.destination=my-topic)。

雖然描述性的繫結名稱可以增強配置的可讀性,但它們透過將隱式繫結名稱對映到顯式繫結名稱,也引入了另一層誤導。並且由於所有後續配置屬性都將使用顯式繫結名稱,您必須始終引用此 'bindings' 屬性來關聯它實際對應哪個函式。我們認為在大多數情況下(函式式組合除外),這可能是過度設計,因此,我們建議完全避免使用它,特別是因為不使用它可以在繫結器目標和繫結名稱之間提供清晰的路徑,例如 spring.cloud.stream.bindings.uppercase-in-0.destination=sample-topic,您可以透過它清楚地將 uppercase 函式的輸入與 sample-topic 目標關聯起來。

有關屬性和其他配置選項的更多資訊,請參閱配置選項部分。

顯式繫結建立

在上一節中,我們解釋瞭如何根據您的應用程式提供的 FunctionSupplierConsumer bean 的名稱隱式建立繫結。然而,有時您可能需要顯式建立繫結,這些繫結不與任何函式關聯。這通常是為了透過 StreamBridge 支援與其他框架的整合。

Spring Cloud Stream 允許您透過 spring.cloud.stream.input-bindingsspring.cloud.stream.output-bindings 屬性顯式定義輸入和輸出繫結。請注意屬性名稱中的複數形式,這允許您透過簡單地使用 ; 作為分隔符來定義多個繫結。請看下面的測試用例作為示例。

@Test
public void testExplicitBindings() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
		TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
				.web(WebApplicationType.NONE)
				.run("--spring.jmx.enabled=false",
					"--spring.cloud.stream.input-bindings=fooin;barin",
					"--spring.cloud.stream.output-bindings=fooout;barout")) {


	. . .
	}
}

@EnableAutoConfiguration
@Configuration
public static class EmptyConfiguration {
}

正如您所見,我們聲明瞭兩個輸入繫結和兩個輸出繫結,而我們的配置中沒有定義任何函式,但我們仍然能夠成功建立這些繫結並訪問它們相應的通道。

生產和消費訊息

您只需編寫函式並將其公開為 @Bean,即可編寫 Spring Cloud Stream 應用程式。您還可以使用基於 Spring Integration 註解的配置或基於 Spring Cloud Stream 註解的配置,儘管從 spring-cloud-stream 3.x 開始,我們推薦使用函式式實現。

Spring Cloud Function 支援

概述

自 Spring Cloud Stream v2.1 起,定義流處理器的另一種替代方法是使用對Spring Cloud Function的內建支援,它們可以表示為型別為 java.util.function.[Supplier/Function/Consumer] 的 bean。

要指定將哪個函式式 bean 繫結到繫結公開的外部目標,您必須提供 spring.cloud.function.definition 屬性。

如果您只有型別為 java.util.function.[Supplier/Function/Consumer] 的單個 bean,則可以跳過 spring.cloud.function.definition 屬性,因為此類函式式 bean 將被自動發現。然而,最佳實踐是使用此屬性以避免任何混淆。有時這種自動發現可能會帶來麻煩,因為型別為 java.util.function.[Supplier/Function/Consumer] 的單個 bean 可能用於訊息處理以外的目的,但由於是單個 bean,它會被自動發現並自動繫結。對於這些罕見情況,您可以透過提供 spring.cloud.stream.function.autodetect 屬性並將其值設定為 false 來停用自動發現。

以下是應用程式將訊息處理器公開為 java.util.function.Function 的示例,透過充當資料的消費者和生產者,有效支援*直通*語義。

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在前面的示例中,我們定義了一個名為 toUpperCasejava.util.function.Function 型別的 bean,它充當訊息處理器,其 'input' 和 'output' 必須繫結到提供的目標繫結器公開的外部目標。預設情況下,'input' 和 'output' 繫結名稱將是 toUpperCase-in-0toUpperCase-out-0。有關用於建立繫結名稱的命名約定的詳細資訊,請參閱函式式繫結名稱部分。

以下是支援其他語義的簡單函式式應用程式示例

以下是將*源*語義公開為 java.util.function.Supplier 的示例

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

以下是將*接收器語義*公開為 java.util.function.Consumer 的示例

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}
提供者 (源)

就觸發方式而言,FunctionConsumer 都相當直接。它們是根據傳送到它們繫結目標的資料(事件)觸發的。換句話說,它們是經典的事件驅動元件。

然而,Supplier 在觸發方面屬於其獨有的類別。因為它根據定義是資料的來源(起點),所以它不訂閱任何入站目標,因此必須由其他機制觸發。還有一個 Supplier 實現的問題,它可以是*命令式*或*響應式*的,這與此類提供者的觸發直接相關。

考慮以下示例

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

前面的 Supplier bean 在其 get() 方法被呼叫時生成一個字串。然而,誰呼叫這個方法以及呼叫頻率如何?框架提供了一個預設的輪詢機制(回答了“誰?”的問題),該機制將觸發提供者的呼叫,預設情況下它每秒都會執行一次(回答了“頻率如何?”的問題)。換句話說,以上配置每秒生成一條訊息,並且每條訊息都發送到繫結器公開的 output 目標。要了解如何自定義輪詢機制,請參閱輪詢配置屬性部分。

考慮另一個示例

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

前面的 Supplier bean 採用了響應式程式設計風格。通常,與命令式提供者不同,它應該只被觸發一次,因為其 get() 方法的呼叫會產生(提供)連續的訊息流而不是單個訊息。

框架識別出程式設計風格的差異,並保證此類提供者只被觸發一次。

然而,想象一下這樣的用例:您想輪詢某個資料來源並返回表示結果集的有限資料流。響應式程式設計風格是此類提供者的完美機制。然而,考慮到生成流的有限性,此類提供者仍然需要定期呼叫。

考慮以下示例,它透過生成有限資料流來模擬此類用例

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

該 bean 本身使用 PollableBean 註解進行標註(它是 @Bean 的一個子集),從而向框架表明,儘管此類提供者的實現是響應式的,但仍需要進行輪詢。

PollableBean 中定義了一個 splittable 屬性,它向此註解的後處理器發出訊號,表明被註解元件產生的結果必須被拆分,並且預設設定為 true。這意味著框架將拆分返回的結果,並將每個項作為單獨的訊息傳送出去。如果這不是期望的行為,您可以將其設定為 false,此時此類提供者將簡單地返回生成的 Flux 而不進行拆分。
提供者與執行緒
正如您現在瞭解到的,與由事件觸發(它們有輸入資料)的 FunctionConsumer 不同,Supplier 沒有任何輸入,因此由不同的機制 - *輪詢器* - 觸發,而輪詢器可能具有不可預測的執行緒機制。雖然執行緒機制的細節在大多數情況下與函式的下游執行無關,但在某些情況下可能會出現問題,特別是在與可能對執行緒親和性有特定期望的整合框架配合使用時。例如,依賴於儲存線上程本地中的跟蹤資料的Spring Cloud Sleuth。對於這些情況,我們提供了另一種透過 StreamBridge 的機制,使用者可以對執行緒機制有更多控制。您可以在傳送任意資料到輸出(例如,外部事件驅動源)部分獲取更多詳細資訊。
消費者 (響應式)

響應式 Consumer 有點特別,因為它返回型別是 void,使得框架無法獲得用於訂閱的引用。您很可能不需要編寫 Consumer<Flux<?>>,而是將其編寫為 Function<Flux<?>, Mono<Void>>,並在流上將 then 運算子作為最後一個運算子呼叫。

例如

public Function<Flux<?>, Mono<Void>> consumer() {
	return flux -> flux.map(..).filter(..).then();
}

但是如果您確實需要編寫顯式的 Consumer<Flux<?>>,請記住訂閱輸入的 Flux。

此外,請記住,在混合響應式和命令式函式時,同樣的規則也適用於函式組合。Spring Cloud Function 確實支援響應式函式與命令式函式組合,但您必須注意某些限制。例如,假設您將響應式函式與命令式消費者組合。這種組合的結果是響應式 Consumer。然而,正如本節前面討論的,無法訂閱此類消費者,因此此限制只能透過使您的消費者成為響應式的並手動訂閱(如前所述),或將您的函式更改為命令式來解決。

輪詢配置屬性

Spring Cloud Stream 公開了以下屬性,這些屬性以 spring.integration.poller. 為字首。

fixedDelay

預設輪詢器的固定延遲,單位為毫秒。

預設值:1000L。

maxMessagesPerPoll

預設輪詢器每次輪詢事件的最大訊息數。

預設值:1L。

cron

Cron Trigger 的 Cron 表示式值。

預設值:無。

initialDelay

週期性觸發器的初始延遲。

預設值:0。

timeUnit

應用於延遲值的 TimeUnit。

預設值:MILLISECONDS。

例如,--spring.integration.poller.fixed-delay=2000 將輪詢器間隔設定為每兩秒輪詢一次。

每繫結輪詢配置

上一節展示瞭如何配置一個應用於所有繫結的單個預設輪詢器。雖然這與 Spring Cloud Stream 設計的微服務模型非常契合(其中每個微服務代表一個元件,例如 Supplier,因此預設輪詢配置就足夠了),但在某些極端情況下,您可能有多個元件需要不同的輪詢配置。

對於此類情況,請使用每繫結配置輪詢器的方式。例如,假設您有一個輸出繫結 supply-out-0。在這種情況下,您可以使用 spring.cloud.stream.bindings.supply-out-0.producer.poller.. 字首為此類繫結配置輪詢器(例如,spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000)。

傳送任意資料到輸出(例如,外部事件驅動源)

在某些情況下,資料的實際來源可能來自非繫結器的外部系統。例如,資料的來源可能是一個經典的 REST 端點。我們如何將此類源與 spring-cloud-stream 使用的函式機制橋接起來?

Spring Cloud Stream 提供了兩種機制,讓我們詳細瞭解它們

在這裡,對於這兩個示例,我們將使用一個標準的 MVC 端點方法 delegateToSupplier,它繫結到根 Web 上下文,透過 StreamBridge 機制將傳入請求委託給流。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream", body);
	}
}

這裡我們自動注入了一個 StreamBridge bean,它允許我們將資料傳送到輸出繫結,有效地將非流應用程式與 spring-cloud-stream 連線起來。請注意,前面的示例沒有定義任何源函式(例如 Supplier bean),這使得框架沒有預先建立源繫結的觸發器,而這在配置包含函式 bean 的情況下是典型的。這沒有問題,因為 StreamBridge 會在首次呼叫其 send(..) 操作時啟動為不存在的繫結建立輸出繫結(如果需要,還會自動配置目標),並對其進行快取以供後續重用(有關更多詳細資訊,請參閱StreamBridge 和動態目標)。

然而,如果您想在初始化(啟動)時預先建立輸出繫結,您可以利用 spring.cloud.stream.output-bindings 屬性,在該屬性中宣告源的名稱。提供的名稱將用作建立源繫結的觸發器。您可以使用 ; 表示多個源(多個輸出繫結)(例如,--spring.cloud.stream.output-bindings=foo;bar)。

此外,請注意 streamBridge.send(..) 方法接受一個 Object 作為資料。這意味著您可以向其傳送 POJO 或 Message,它在傳送輸出時會經歷與來自任何 Function 或 Supplier 時相同的流程,從而提供與函式相同的級別的一致性。這意味著輸出型別轉換、分割槽等都將像函式產生的輸出一樣被尊重和處理。

StreamBridge 與動態目標

StreamBridge 也可以用於輸出目標事先未知的情況,這類似於從消費者路由部分描述的用例。

讓我們看一個示例

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, args);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("myDestination", body);
	}
}

正如您所見,前面的示例與前一個示例非常相似,不同之處在於沒有透過 spring.cloud.stream.output-bindings 屬性提供顯式繫結指令(此處未提供)。這裡我們將資料傳送到名稱為 myDestination 的目標,該名稱作為繫結不存在。因此,此類名稱將被視為動態目標,如從消費者路由部分所述。

在前面的示例中,我們使用 ApplicationRunner 作為*外部源*來為流提供資料。

一個更實際的示例,其中外部源是 REST 端點。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		streamBridge.send("myBinding", body);
	}
}

正如您所見,在 delegateToSupplier 方法內部,我們使用 StreamBridge 將資料傳送到 myBinding 繫結。在這裡,您也受益於 StreamBridge 的動態特性,如果 myBinding 不存在,它將自動建立並快取,否則將使用現有繫結。

快取動態目標(繫結)可能導致記憶體洩漏,如果存在許多動態目標。為了進行一定程度的控制,我們為輸出繫結提供了一個自淘汰快取機制,預設快取大小為 10。這意味著如果您的動態目標數量超過此數值,則有可能現有繫結會被淘汰,從而需要重新建立,這可能會導致輕微的效能下降。您可以透過 spring.cloud.stream.dynamic-destination-cache-size 屬性將快取大小設定為所需的值來增加快取大小。
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" https://:8080/

透過展示這兩個示例,我們想強調這種方法適用於任何型別的外部源。

如果您使用的是 Solace PubSub+ 繫結器,Spring Cloud Stream 保留了 scst_targetDestination 頭部(可透過 BinderHeaders.TARGET_DESTINATION 獲取),該頭部允許將訊息從其繫結配置的目標重定向到此頭部指定的目標。這使得繫結器能夠管理釋出到動態目標所需的資源,從而減輕了框架的負擔,並避免了之前註釋中提到的快取問題。更多資訊請見此處
使用 StreamBridge 的輸出內容型別

如果需要,您還可以使用以下方法簽名 public boolean send(String bindingName, Object data, MimeType outputContentType) 提供特定的內容型別。或者,如果您將資料作為 Message 傳送,其內容型別將得到尊重。

使用 StreamBridge 指定繫結器型別

Spring Cloud Stream 支援多種繫結器場景。例如,您可能從 Kafka 接收資料並將其傳送到 RabbitMQ。

有關多種繫結器場景的更多資訊,請參閱繫結器部分,特別是Classpath 上的多種繫結器

如果您計劃使用 StreamBridge 並在應用程式中配置了多個繫結器,則還必須告訴 StreamBridge 使用哪個繫結器。為此,send 方法還有兩個變體

public boolean send(String bindingName, @Nullable String binderType, Object data)

public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)

正如您所見,您可以提供一個附加引數 - binderType,用於告訴 BindingService 在建立動態繫結時使用哪個繫結器。

對於使用 spring.cloud.stream.output-bindings 屬性的情況,或者繫結已在不同的繫結器下建立的情況,binderType 引數將無效。
使用 StreamBridge 的通道攔截器

由於 StreamBridge 使用 MessageChannel 建立輸出繫結,因此您可以在透過 StreamBridge 傳送資料時啟用通道攔截器。由應用程式決定哪些通道攔截器應用於 StreamBridge。Spring Cloud Stream 不會將所有檢測到的通道攔截器注入到 StreamBridge 中,除非它們使用 @GlobalChannelInterceptor(patterns = "*") 進行標註。

假設您的應用程式中有以下兩個不同的 StreamBridge 繫結。

streamBridge.send("foo-out-0", message);

以及

streamBridge.send("bar-out-0", message);

現在,如果您想讓通道攔截器應用於這兩個 StreamBridge 繫結,那麼您可以宣告以下 GlobalChannelInterceptor bean。

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

但是,如果您不喜歡上面的全域性方法,並且想為每個繫結設定一個專用的攔截器,那麼您可以這樣做。

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

以及

@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

您可以靈活地使模式更嚴格或根據您的業務需求進行定製。

透過這種方法,應用程式可以決定將哪些攔截器注入到 StreamBridge 中,而不是應用所有可用的攔截器。

StreamBridge 透過 StreamOperations 介面提供了一個契約,該介面包含了 StreamBridge 的所有 send 方法。因此,應用程式可以選擇使用 StreamOperations 進行自動注入。這對於透過為 StreamOperations 介面提供模擬或類似機制來單元測試使用 StreamBridge 的程式碼非常方便。
響應式函式支援

由於 *Spring Cloud Function* 構建在 Project Reactor 之上,因此在實現 SupplierFunctionConsumer 時,您無需做太多即可受益於響應式程式設計模型。

例如

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
		return flux -> flux.map(val -> val.toUpperCase());
	}
}

在選擇響應式或指令式程式設計模型時,必須理解一些重要事項。

完全響應式還是僅 API?

使用響應式 API 不一定意味著您可以從該 API 的所有響應式特性中受益。換句話說,背壓等高階特性只有在與相容系統(例如 Reactive Kafka 繫結器)一起使用時才能發揮作用。如果您使用的是常規 Kafka、Rabbit 或任何其他非響應式繫結器,您只能從響應式 API 本身的便利性中受益,而不能從其高階特性中受益,因為流的實際源或目標不是響應式的。

錯誤處理和重試

在本手冊中,您將看到一些關於基於框架的錯誤處理、重試及其他特性以及與其相關的配置屬性的引用。重要的是要理解,它們僅影響命令式函式,對於響應式函式,您不應抱有同樣的期望。原因如下:響應式函式和命令式函式之間存在根本區別。命令式函式是一個由框架在接收到每條訊息時呼叫的*訊息處理器*。因此,對於 N 條訊息,此類函式將被呼叫 N 次,正因為如此,我們可以包裝此類函式並新增額外的功能,如錯誤處理、重試等。響應式函式是*初始化函式*。它只被呼叫一次,以獲取使用者提供的 Flux/Mono 的引用,該引用將與框架提供的引用連線起來。之後,我們(框架)對流完全沒有可見性或控制權。因此,對於響應式函式,在錯誤處理和重試方面(即 doOnError(), .onError*() 等),您必須依賴於響應式 API 的豐富性。

函式式組合

使用函數語言程式設計模型,您還可以受益於函式式組合,透過它您可以從一組簡單函式動態組合複雜的處理器。例如,讓我們將以下函式 bean 新增到上面定義的應用程式中

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

並修改 spring.cloud.function.definition 屬性以反映您從 ‘toUpperCase’ 和 ‘wrapInQuotes’ 組合一個新函式的意圖。為此,Spring Cloud Function 依賴於 |(管道)符號。因此,為了完成我們的示例,我們的屬性現在看起來像這樣

--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function 提供的函式式組合支援的一個巨大優勢是,您可以組合*響應式*和*命令式*函式。

組合的結果是一個單一函式,正如您可能猜到的那樣,它可能有一個非常長且相當難以理解的名稱(例如,foo|bar|baz|xyz. . .),這在涉及其他配置屬性時會帶來很大的不便。這就是函式式繫結名稱部分描述的*描述性繫結名稱*特性可以提供幫助的地方。

例如,如果我們想給我們的 toUpperCase|wrapInQuotes 一個更具描述性的名稱,我們可以使用以下屬性來實現:spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput,從而允許其他配置屬性引用該繫結名稱(例如,spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination)。

函式式組合與橫切關注點

函式組合有效地允許您透過將其分解為一組簡單且可單獨管理/測試的元件來解決複雜性,這些元件在執行時仍然可以表示為一個整體。但這並不是唯一的好處。

您還可以使用組合來處理某些橫切的非功能性關注點,例如內容豐富。例如,假設您收到一條傳入訊息,該訊息可能缺少某些頭部,或者某些頭部不處於您的業務函式所期望的精確狀態。現在,您可以實現一個單獨的函式來解決這些問題,然後將其與主要的業務函式組合起來。

讓我們看一個示例

@SpringBootApplication
public class DemoStreamApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamApplication.class,
				"--spring.cloud.function.definition=enrich|echo",
				"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
				"--spring.cloud.stream.bindings.input.destination=myDestination",
				"--spring.cloud.stream.bindings.input.group=myGroup");

	}

	@Bean
	public Function<Message<String>, Message<String>> enrich() {
		return message -> {
			Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
			return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
		};
	}

	@Bean
	public Function<Message<String>, Message<String>> echo() {
		return message -> {
			Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
			System.out.println("Incoming message " + message);
			return message;
		};
	}
}

雖然簡單,但此示例展示了一個函式如何用附加頭部(非功能性關注點)豐富傳入的訊息,以便另一個函式 - echo - 可以從中受益。echo 函式保持簡潔,只專注於業務邏輯。您還可以看到使用 spring.cloud.stream.function.bindings 屬性來簡化組合繫結名稱。

具有多個輸入和輸出引數的函式

從 3.0 版本開始,Spring Cloud Stream 支援具有多個輸入和/或多個輸出(返回值)的函式。這到底意味著什麼,以及它針對哪類用例?

  • 大資料:想象一下您正在處理的資料來源高度無組織,包含各種型別的資料元素(例如訂單、交易等),您需要有效地對其進行整理。

  • 資料聚合:另一個用例可能需要您合併來自 2 個或更多傳入_流的資料元素。.

上面描述的只是少數幾個用例,您可能需要在這些用例中使用單個函式來接收和/或產生多個數據*流*。這就是我們在這裡針對的用例型別。

此外,請注意此處對*流*概念略有不同的強調。假設此類函式只有在能夠訪問實際的資料流(而非單個元素)時才具有價值。因此,為此我們依賴於Project Reactor提供的抽象(即 FluxMono),這些抽象作為 spring-cloud-functions 引入的依賴項的一部分已在 classpath 中可用。

另一個重要方面是多個輸入和輸出的表示。雖然 Java 提供了各種不同的抽象來表示*多個事物*,但這些抽象是 *a) 無界的*、*b) 缺乏元數*,並且 *c) 缺乏型別資訊*,這些在此上下文中都很重要。例如,讓我們看看 Collection 或陣列,它們只允許我們描述單個型別的*多個*或將所有內容向上轉換為 Object,這會影響 Spring Cloud Stream 的透明型別轉換特性等等。

因此,為了滿足所有這些要求,初始支援依賴於使用 Project Reactor 提供的另一種抽象 - Tuples 的簽名。然而,我們正在努力允許更靈活的簽名。

請參閱繫結與繫結名稱部分,瞭解此類應用程式用於建立*繫結名稱*的命名約定。

讓我們看幾個示例

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
		return tuple -> {
			Flux<String> stringStream = tuple.getT1();
			Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
			return Flux.merge(stringStream, intStream);
		};
	}
}

上面的示例演示了一個函式,它接受兩個輸入(第一個型別為 String,第二個型別為 Integer)併產生一個型別為 String 的輸出。

因此,對於上面的示例,兩個輸入繫結將是 gather-in-0gather-in-1,為了保持一致性,輸出繫結也遵循相同的約定,名稱為 gather-out-0

瞭解這一點將允許您設定繫結特定的屬性。例如,以下配置將覆蓋 gather-in-0 繫結的內容型別

--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {

	@Bean
	public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
		return flux -> {
			Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
			UnicastProcessor even = UnicastProcessor.create();
			UnicastProcessor odd = UnicastProcessor.create();
			Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
			Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

			return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
		};
	}
}

上面的示例與前一個示例有些相反,它演示了一個函式,該函式接受一個型別為 Integer 的輸入併產生兩個輸出(都型別為 String)。

因此,對於上面的示例,輸入繫結是 scatter-in-0,輸出繫結是 scatter-out-0scatter-out-1

您可以使用以下程式碼進行測試

@Test
public void testSingleInputMultiOutput() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=scatter")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		for (int i = 0; i < 10; i++) {
			inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
		}

		int counter = 0;
		for (int i = 0; i < 5; i++) {
			Message<byte[]> even = outputDestination.receive(0, 0);
			assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
			Message<byte[]> odd = outputDestination.receive(0, 1);
			assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
		}
	}
}
單個應用程式中的多個函式

也可能需要在單個應用程式中將多個訊息處理器進行分組。您可以透過定義多個函式來實現這一點。

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

在上面的示例中,我們有一個配置,它定義了兩個函式 uppercasereverse。所以首先,如前所述,我們需要注意到存在衝突(函式多於一個),因此需要透過提供指向我們希望繫結的實際函式的 spring.cloud.function.definition 屬性來解決它。不同之處在於,在這裡我們將使用 ; 分隔符來指向這兩個函式(參見下面的測試用例)。

與具有多個輸入/輸出的函式一樣,請參閱繫結與繫結名稱部分,瞭解此類應用程式用於建立*繫結名稱*的命名約定。

您可以使用以下程式碼進行測試

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					ReactiveFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-1");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}
批次消費者

使用支援批處理監聽器的 MessageChannelBinder 時,如果消費者繫結啟用了該特性,可以將 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 設定為 true,以將整批訊息以 List 的形式傳遞給函式。

@Bean
public Function<List<Person>, Person> findFirstPerson() {
    return persons -> persons.get(0);
}
批處理生產者

您也可以在生產者端使用批處理的概念,透過返回一個訊息集合來實現。這實際上提供了一種相反的效果,即集合中的每條訊息都將由繫結器單獨傳送。

考慮以下函式

@Bean
public Function<String, List<Message<String>>> batch() {
	return p -> {
		List<Message<String>> list = new ArrayList<>();
		list.add(MessageBuilder.withPayload(p + ":1").build());
		list.add(MessageBuilder.withPayload(p + ":2").build());
		list.add(MessageBuilder.withPayload(p + ":3").build());
		list.add(MessageBuilder.withPayload(p + ":4").build());
		return list;
	};
}

返回列表中的每條訊息都將單獨傳送,最終傳送四條訊息到輸出目標。

將 Spring Integration 流作為函式

當您實現一個函式時,您可能有一些複雜的、符合企業整合模式 (EIP) 類別要求。使用 Spring Integration (SI) 這樣的框架最適合處理這些需求,SI 是 EIP 的參考實現。

值得慶幸的是,SI 已經提供了透過整合流作為閘道器來將整合流暴露為函式的功能。考慮以下示例

@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {

	public static void main(String[] args) {
		SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
	}

	@Bean
	public IntegrationFlow uppercaseFlow() {
		return IntegrationFlows.from(MessageFunction.class, "uppercase")
				.<String, String>transform(String::toUpperCase)
				.logAndReply(LoggingHandler.Level.WARN);
	}

	public interface MessageFunction extends Function<Message<String>, Message<String>> {

	}
}

對於熟悉 SI 的使用者,您可以看到我們定義了一個型別為 IntegrationFlow 的 bean,在此我們聲明瞭一個希望將其暴露為名為 uppercaseFunction<String, String>(使用 SI DSL)的整合流。MessageFunction 介面允許我們明確宣告輸入和輸出的型別以便進行正確的型別轉換。有關型別轉換的更多資訊,請參閱內容型別協商部分。

要接收原始輸入,您可以使用 from(Function.class, …​)

生成的函式繫結到目標繫結器所暴露的輸入和輸出目標。

請參閱繫結與繫結名稱部分,瞭解此類應用程式用於建立*繫結名稱*的命名約定。

有關 Spring Integration 和 Spring Cloud Stream 之間互操作性的更多詳細資訊,特別是圍繞函數語言程式設計模型,您可能會發現這篇博文非常有趣,因為它深入探討了透過融合 Spring Integration 和 Spring Cloud Stream/Functions 的最佳特性可以應用的各種模式。

使用輪詢消費者

概述

使用輪詢消費者時,您可以按需輪詢 PollableMessageSource。要定義輪詢消費者的繫結,您需要提供 spring.cloud.stream.pollable-source 屬性。

考慮以下輪詢消費者繫結的示例

--spring.cloud.stream.pollable-source=myDestination

前例中的輪詢源名稱 myDestination 將生成 myDestination-in-0 作為繫結名稱,以與函數語言程式設計模型保持一致。

鑑於前例中的輪詢消費者,您可以按如下方式使用它

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

一種不那麼手動化、更像 Spring 的替代方案是配置一個計劃任務 bean。例如,

@Scheduled(fixedDelay = 5_000)
public void poll() {
	System.out.println("Polling...");
	this.source.poll(m -> {
		System.out.println(m.getPayload());

	}, new ParameterizedTypeReference<Foo>() { });
}

PollableMessageSource.poll() 方法接受一個 MessageHandler 引數(通常是一個 lambda 表示式,如這裡所示)。如果訊息已被接收併成功處理,它將返回 true

與訊息驅動的消費者一樣,如果 MessageHandler 丟擲異常,訊息將被髮布到錯誤通道,如 錯誤處理 中所述。

通常,當 MessageHandler 退出時,poll() 方法會確認訊息。如果方法異常退出,訊息將被拒絕(不會重新入隊),但請參閱處理錯誤。您可以透過自己負責確認來覆蓋該行為,如下例所示

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}
您必須在某個時候 ack(或 nack)訊息,以避免資源洩漏。
一些訊息系統(如 Apache Kafka)在日誌中維護一個簡單的偏移量。如果投遞失敗並使用 StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); 重新入隊,任何稍後成功確認的訊息都會被重新投遞。

還有一個過載的 poll 方法,其定義如下

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type 是一個轉換提示,允許對傳入訊息的有效載荷進行轉換,如下例所示

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});
錯誤處理

預設情況下,為輪詢源配置了一個錯誤通道;如果回撥丟擲異常,一個 ErrorMessage 將被髮送到錯誤通道 (`<destination>.<group>.errors`);這個錯誤通道也被橋接到全域性的 Spring Integration errorChannel

您可以使用 `@ServiceActivator` 訂閱任何一個錯誤通道來處理錯誤;如果沒有訂閱,錯誤將簡單地被記錄日誌,並且訊息將被確認為成功。如果錯誤通道服務啟用器丟擲異常,訊息將被拒絕(預設情況下)並且不會被重新投遞。如果服務啟用器丟擲 RequeueCurrentMessageException,訊息將在訊息代理處重新入隊,並在後續輪詢時再次檢索。

如果監聽器直接丟擲 RequeueCurrentMessageException,訊息將按上述方式重新入隊,並且不會發送到錯誤通道。

事件路由

事件路由在 Spring Cloud Stream 的上下文中是指 a) 將事件路由到特定的事件訂閱者,或 b) 將事件訂閱者產生的事件路由到特定的目標。這裡我們將其稱為“路由到”和“路由自”。

路由到消費者

路由可以透過依賴 Spring Cloud Function 3.0 中提供的 RoutingFunction 來實現。您只需透過 --spring.cloud.stream.function.routing.enabled=true 應用屬性啟用它,或提供 spring.cloud.function.routing-expression 屬性。啟用後,RoutingFunction 將繫結到接收所有訊息的輸入目標,並根據提供的指令將它們路由到其他函式。

為了繫結的目的,路由目標的名稱是 functionRouter-in-0(請參閱 RoutingFunction.FUNCTION_NAME 和繫結命名約定 函式式繫結名稱)。

指令可以透過單個訊息或應用屬性提供。

以下是幾個示例

使用訊息頭
@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

透過將訊息傳送到繫結器(即 rabbit, kafka)暴露的 functionRouter-in-0 目標,該訊息將被路由到相應的(“even”或“odd”)消費者。

預設情況下,RoutingFunction 將查詢 spring.cloud.function.definitionspring.cloud.function.routing-expression(對於更動態的 SpEL 場景)頭。如果找到,其值將被視為路由指令。

例如,將 spring.cloud.function.routing-expression 頭的值設定為 T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd' 最終會將請求半隨機地路由到 oddeven 函式。此外,對於 SpEL,評估上下文的根物件是 Message,因此您也可以對單個頭(或訊息)進行評估,例如 …​.routing-expression=headers['type']

使用應用屬性

spring.cloud.function.routing-expression 和/或 spring.cloud.function.definition 可以作為應用屬性傳遞(例如,spring.cloud.function.routing-expression=headers['type'])。

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}
透過應用屬性傳遞指令對於響應式函式尤其重要,因為響應式函式只調用一次以傳遞 Publisher,因此對單個項的訪問是受限的。
路由函式和輸出繫結

RoutingFunction 是一個 Function,因此與其他任何函式沒有區別。嗯……差不多。

RoutingFunction 路由到另一個 Function 時,其輸出會發送到 RoutingFunction 的輸出繫結,該繫結按預期是 functionRouter-in-0。但是如果 RoutingFunction 路由到一個 Consumer 呢?換句話說,呼叫 RoutingFunction 的結果可能不會產生任何需要傳送到輸出繫結的內容,因此甚至有輸出繫結也不必要。因此,我們在建立繫結時對 RoutingFunction 進行了一些不同的處理。儘管這對使用者是透明的(您實際上無需做任何事情),但瞭解一些機制將有助於您理解其內部工作原理。

因此,規則是:我們從不為 RoutingFunction 建立輸出繫結,只建立輸入繫結。所以當您路由到 Consumer 時,RoutingFunction 透過沒有任何輸出繫結,實際上成為了一個 Consumer。然而,如果 RoutingFunction 恰好路由到另一個會產生輸出的 Function,那麼 RoutingFunction 的輸出繫結將動態建立,此時 RoutingFunction 將在繫結方面表現得像一個常規的 Function(同時擁有輸入和輸出繫結)。

路由自消費者

除了靜態目標外,Spring Cloud Stream 還允許應用程式將訊息傳送到動態繫結的目標。這非常有用,例如當目標需要在執行時確定時。應用程式可以透過以下兩種方式之一實現。

spring.cloud.stream.sendto.destination

您還可以委託框架透過指定 spring.cloud.stream.sendto.destination 頭來動態解析輸出目標,該頭的值設定為要解析的目標名稱。

考慮以下示例

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

儘管微不足道,但在此示例中您可以清楚地看到,我們的輸出是一個訊息,其中 spring.cloud.stream.sendto.destination 頭被設定為輸入引數的值。框架將查閱此頭,並嘗試建立或發現具有該名稱的目標,然後將輸出傳送到該目標。

如果目標名稱是預先知道的,您可以像配置其他任何目標一樣配置生產者屬性。或者,如果您註冊了一個 NewDestinationBindingCallback<> bean,它會在繫結建立之前被呼叫。該回調函式接收繫結器使用的擴充套件生產者屬性的泛型型別。它有一個方法

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例展示瞭如何使用 RabbitMQ 繫結器

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
如果您需要支援具有多種繫結器型別的動態目標,請使用 Object 作為泛型型別,並根據需要強制轉換 extended 引數。

此外,請參閱[使用 StreamBridge] 部分,瞭解如何利用另一種選項 (StreamBridge) 來處理類似情況。

後處理(傳送訊息後)

函式被呼叫後,其結果由框架傳送到目標目的地,這實際上完成了函式呼叫週期。

然而,從業務角度來看,只有在此週期完成後執行一些附加任務,該週期才算完全完成。雖然這可以透過此 Stack Overflow 文章中描述的 ConsumerStreamBridge 的簡單組合來實現,但自 4.0.3 版本以來,框架透過 Spring Cloud Function 專案提供的 PostProcessingFunction 提供了一種更規範的方法來解決此問題。PostProcessingFunction 是一個特殊的半標記函式,它包含一個額外的 postProcess(Message>) 方法,旨在為實現此類後處理任務提供一個位置。

package org.springframework.cloud.function.context
. . .
public interface PostProcessingFunction<I, O> extends Function<I, O> {
	default void postProcess(Message<O> result) {
	}
}

因此,您現在有兩個選擇。

選項 1:您可以將您的函式實現為 PostProcessingFunction,並透過實現其 postProcess(Message>) 方法來包含額外的後處理行為。

private static class Uppercase implements PostProcessingFunction<String, String> {

	@Override
	public String apply(String input) {
		return input.toUpperCase();
	}

	@Override
	public void postProcess(Message<String> result) {
		System.out.println("Function Uppercase has been successfully invoked and its result successfully sent to target destination");
	}
}
. . .
@Bean
public Function<String, String> uppercase() {
	return new Uppercase();
}

選項 2:如果您已經有一個現有函式,並且不想更改其實現或想將您的函式保留為 POJO,您可以只實現 postProcess(Message>) 方法,並將這個新的後處理函式與您的其他函式進行組合。

private static class Logger implements PostProcessingFunction<?, String> {

	@Override
	public void postProcess(Message<String> result) {
		System.out.println("Function has been successfully invoked and its result successfully sent to target destination");
	}
}
. . .
@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}
@Bean
public Function<String, String> logger() {
	return new Logger();
}
. . .
//  and then have your function definition as such `uppercase|logger`

注意:在函式組合的情況下,只有最後一個 PostProcessingFunction 例項(如果存在)會生效。例如,假設您有以下函式定義 - foo|bar|baz,並且 foobaz 都是 PostProcessingFunction 的例項。只有 baz.postProcess(Message>) 會被呼叫。如果 baz 不是 PostProcessingFunction 的例項,則不會執行任何後處理功能。

有人可能會爭辯說,您可以透過函式組合輕鬆實現這一點,只需將後處理器作為另一個 Function 進行組合即可。這確實是一種可能性,但在這種情況下,後處理功能將在前一個函式呼叫之後立即被呼叫,並在訊息傳送到目標目的地之前,而這發生在函式呼叫週期完成之前。

錯誤處理

在本節中,我們將解釋框架提供的錯誤處理機制背後的總體思路。我們將使用 Rabbit 繫結器作為示例,因為不同的繫結器為底層訊息代理的特定能力(例如 Kafka 繫結器)支援的某些機制定義了不同的屬性集。

錯誤總是會發生,Spring Cloud Stream 提供了幾種靈活的機制來處理它們。請注意,這些技術取決於繫結器的實現、底層訊息中介軟體的能力以及程式設計模型(稍後會詳細介紹)。

每當訊息處理程式(函式)丟擲異常時,異常會傳播回繫結器,此時繫結器將使用 Spring Retry 庫提供的 RetryTemplate 多次嘗試重試同一條訊息(預設重試 3 次)。如果重試失敗,則取決於錯誤處理機制來決定是丟棄訊息、將訊息重新入隊以便重新處理,還是將失敗的訊息傳送到 DLQ

Rabbit 和 Kafka 都支援這些概念(尤其是 DLQ)。但是,其他繫結器可能不支援,因此請參閱您所使用的繫結器的文件,以瞭解支援的錯誤處理選項的詳細資訊。

然而,請記住,響應式函式不屬於訊息處理程式,因為它不處理單個訊息,而是提供了一種將框架提供的流(即 Flux)與使用者提供的流連線起來的方式。這為什麼很重要?這是因為您在本節後面關於 Retry Template、丟棄失敗訊息、重試、DLQ 以及有助於實現所有這些的配置屬性的任何內容,適用於訊息處理程式(即命令式函式)。

響應式 API 提供了非常豐富的自身運算子和機制庫,可以幫助您處理各種響應式用例特有的錯誤,這些用例比簡單的訊息處理程式用例複雜得多。因此,請使用它們,例如您可以在 reactor.core.publisher.Flux 中找到的 public final Flux<T> retryWhen(Retry retrySpec);

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
	return flux -> flux
			.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
			.map(v -> v.toUpperCase());
}

丟棄失敗的訊息

預設情況下,系統提供了錯誤處理程式。第一個錯誤處理程式將簡單地記錄錯誤訊息。第二個錯誤處理程式是特定於繫結器的錯誤處理程式,負責在特定訊息系統(例如,傳送到 DLQ)的上下文中處理錯誤訊息。但由於未提供額外的錯誤處理配置(在此當前場景中),此處理程式將不會執行任何操作。因此,本質上在記錄日誌後,訊息將被丟棄。

儘管在某些情況下可以接受,但在大多數情況下,這是不可接受的,我們需要某種恢復機制來避免訊息丟失。

處理錯誤訊息

在上一節中,我們提到預設情況下,導致錯誤的訊息實際上會被記錄日誌並丟棄。框架還提供了一種機制,允許您提供自定義錯誤處理程式(例如,傳送通知或寫入資料庫等)。您可以透過新增一個專門設計用於接受 ErrorMessageConsumer 來實現,該 ErrorMessage 除了包含有關錯誤的所有資訊(例如堆疊跟蹤等)之外,還包含原始訊息(觸發錯誤的那條訊息)。注意:自定義錯誤處理程式與框架提供的錯誤處理程式(即日誌記錄和繫結器錯誤處理程式 - 請參閱上一節)是互斥的,以確保它們不會相互干擾。

@Bean
public Consumer<ErrorMessage> myErrorHandler() {
	return v -> {
		// send SMS notification code
	};
}

要將此類消費者標識為錯誤處理程式,您只需提供指向函式名稱的 error-handler-definition 屬性 - spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler

例如,對於繫結名稱 uppercase-in-0,屬性將如下所示

spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler

如果您使用特殊的對映指令將繫結對映到更易讀的名稱 - spring.cloud.stream.function.bindings.uppercase-in-0=upper,那麼該屬性將如下所示

spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果您不小心將此類處理程式宣告為 Function,它仍然會工作,但其輸出將不會做任何處理。然而,考慮到此類處理程式仍然依賴於 Spring Cloud Function 提供的功能,如果您的處理程式有一些您希望透過函式組合來解決的複雜性(儘管不太可能),您也可以從中受益於函式組合。

預設錯誤處理程式

如果您想為所有函式 bean 設定單個錯誤處理程式,可以使用標準的 spring-cloud-stream 機制來定義預設屬性 spring.cloud.stream.default.error-handler-definition=myErrorHandler

DLQ - 死信佇列

DLQ 可能是最常用的機制,它允許將失敗的訊息傳送到特殊目標:死信佇列。

配置後,失敗的訊息將被髮送到此目標,以便後續重新處理或審計和對賬。

考慮以下示例

@SpringBootApplication
public class SimpleStreamApplication {

	public static void main(String[] args) throws Exception {
		SpringApplication.run(SimpleStreamApplication.class,
		  "--spring.cloud.function.definition=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
		  "--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
		);
	}

	@Bean
	public Function<Person, Person> uppercase() {
		return personIn -> {
		   throw new RuntimeException("intentional");
	      });
		};
	}
}

提醒一下,在此示例中,屬性的 uppercase-in-0 部分對應於輸入目標繫結的名稱。consumer 部分表示這是一個消費者屬性。

使用 DLQ 時,至少必須提供 group 屬性,以便正確命名 DLQ 目標。然而,`group` 通常與 `destination` 屬性一起使用,如我們的示例所示。

除了一些標準屬性外,我們還設定了 auto-bind-dlq,指示繫結器為 uppercase-in-0 繫結建立並配置 DLQ 目標,該繫結對應於 uppercase 目標(請參閱相應屬性),這會導致一個名為 uppercase.myGroup.dlq 的額外 Rabbit 佇列(有關 Kafka 特定的 DLQ 屬性,請參閱 Kafka 文件)。

配置完成後,所有失敗的訊息都將被路由到此目標,並保留原始訊息以供進一步處理。

您可以看到錯誤訊息包含與原始錯誤相關的更多資訊,如下所示

. . . .
x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload: blah

您還可以透過將 max-attempts 設定為 '1' 來促進立即分派到 DLQ(無需重試)。例如,

--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1

重試模板

在本節中,我們介紹與配置重試能力相關的配置屬性。

RetryTemplateSpring Retry 庫的一部分。雖然本文件的範圍不包括 RetryTemplate 的所有功能,但我們將提及以下與 RetryTemplate 特別相關的消費者屬性

maxAttempts

處理訊息的嘗試次數。

預設值:3。

backOffInitialInterval

重試時的退避初始間隔。

預設值:1000 毫秒。

backOffMaxInterval

最大退避間隔。

預設值:10000 毫秒。

backOffMultiplier

退避乘數。

預設值:2.0。

defaultRetryable

監聽器丟擲的未列在 retryableExceptions 中的異常是否可重試。

預設值:true

retryableExceptions

一個以 Throwable 類名為鍵、布林值為值的對映。指定哪些異常(及其子類)將或不會被重試。另請參閱 defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

預設值:空。

雖然前面的設定對於大多數定製需求來說已經足夠,但它們可能無法滿足某些複雜的要求,此時您可能希望提供自己的 RetryTemplate 例項。為此,請在您的應用程式配置中將其配置為一個 bean。應用程式提供的例項將覆蓋框架提供的例項。此外,為避免衝突,您必須將您希望由繫結器使用的 RetryTemplate 例項限定為 `@StreamRetryTemplate`。例如,

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}

正如您從上述示例中看到的,您不需要使用 `@Bean` 註解它,因為 `@StreamRetryTemplate` 是一個限定的 `@Bean`。

如果您需要對您的 RetryTemplate 更精確地控制,可以在您的 ConsumerProperties 中透過名稱指定 bean,從而為每個繫結關聯特定的重試 bean。

spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>

繫結器

Spring Cloud Stream 提供了一個 Binder 抽象層,用於連線外部中介軟體上的物理目標。本節提供了關於 Binder SPI 的主要概念、其主要元件以及實現特定細節的資訊。

生產者和消費者

下圖顯示了生產者和消費者的一般關係

producers consumers
圖 5. 生產者和消費者

生產者是任何向繫結目標傳送訊息的元件。繫結目標可以透過針對該訊息代理的 Binder 實現繫結到外部訊息代理。呼叫 bindProducer() 方法時,第一個引數是訊息代理內部目標的名稱,第二個引數是生產者傳送訊息的本地目標例項,第三個引數包含用於為該繫結目標建立的介面卡的屬性(例如分割槽鍵表示式)。

消費者是任何從繫結目標接收訊息的元件。與生產者一樣,消費者可以繫結到外部訊息代理。呼叫 bindConsumer() 方法時,第一個引數是目標名稱,第二個引數提供了一組邏輯消費者的名稱。對於給定目標,由消費者繫結表示的每個組都會收到生產者傳送到該目標的每條訊息的副本(即,遵循正常的釋出-訂閱語義)。如果存在多個具有相同組名繫結的消費者例項,則訊息會在這些消費者例項之間進行負載均衡,以便生產者傳送的每條訊息僅由每個組內的一個消費者例項消費(即,遵循正常的佇列語義)。

Binder SPI

Binder SPI 由多個介面、開箱即用的實用程式類和發現策略組成,它們提供了一種可插拔的機制來連線外部中介軟體。

SPI 的關鍵在於 Binder 介面,它是連線輸入和輸出到外部中介軟體的策略。以下清單顯示了 Binder 介面的定義

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String bindingName, String group, T inboundBindTarget, C consumerProperties);

    Binding<T> bindProducer(String bindingName, T outboundBindTarget, P producerProperties);
}

該介面是引數化的,提供了許多擴充套件點

  • 輸入和輸出繫結目標。

  • 擴充套件的消費者和生產者屬性,允許特定的 Binder 實現新增可以以型別安全方式支援的補充屬性。

典型的繫結器實現包括以下內容

  • 一個實現 Binder 介面的類;

  • 一個 Spring `@Configuration` 類,它會建立 Binder 型別的 bean 以及中介軟體連線基礎架構。

  • 在 classpath 上找到一個 META-INF/spring.binders 檔案,其中包含一個或多個繫結器定義,如下例所示

    kafka:\
    org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
如前所述,Binder 抽象也是框架的擴充套件點之一。因此,如果您在前面的列表中找不到合適的繫結器,您可以在 Spring Cloud Stream 之上實現自己的繫結器。在如何從零開始建立 Spring Cloud Stream Binder 一文中,社群成員詳細記錄了實現自定義繫結器所需的一系列步驟,並提供了示例。這些步驟也在 實現自定義繫結器 部分中突出顯示。

繫結器檢測

Spring Cloud Stream 依靠 Binder SPI 的實現來執行連線(繫結)使用者程式碼到訊息代理的任務。每個 Binder 實現通常連線到一種型別的訊息系統。

Classpath 檢測

預設情況下,Spring Cloud Stream 依賴於 Spring Boot 的自動配置來配置繫結過程。如果在 classpath 上找到一個 Binder 實現,Spring Cloud Stream 會自動使用它。例如,一個只打算繫結到 RabbitMQ 的 Spring Cloud Stream 專案可以新增以下依賴項

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

有關其他繫結器依賴項的具體 Maven 座標,請參閱該繫結器實現的文件。

Classpath 上的多個繫結器

當 classpath 上存在多個繫結器時,應用程式必須指示每個目標繫結使用哪個繫結器。每個繫結器配置都包含一個 META-INF/spring.binders 檔案,這是一個簡單的屬性檔案,如下例所示

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

其他提供的繫結器實現(如 Kafka)也存在類似的檔案,自定義繫結器實現也應該提供它們。鍵表示繫結器實現的標識名稱,而值是逗號分隔的配置類列表,每個配置類僅包含一個型別為 org.springframework.cloud.stream.binder.Binder 的 bean 定義。

繫結器選擇可以全域性執行,使用 spring.cloud.stream.defaultBinder 屬性(例如,spring.cloud.stream.defaultBinder=rabbit),也可以單獨執行,透過在每個繫結上配置繫結器。例如,一個從 Kafka 讀取並寫入 RabbitMQ 的處理器應用程式(分別具有名為 inputoutput 的讀寫繫結)可以指定以下配置

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

連線到多個系統

預設情況下,繫結器共享應用程式的 Spring Boot 自動配置,以便建立 classpath 上找到的每個繫結器的一個例項。如果您的應用程式需要連線到多個相同型別的訊息代理,您可以指定多個繫結器配置,每個配置具有不同的環境設定。

啟用顯式繫結器配置會完全停用預設的繫結器配置過程。如果這樣做,所有正在使用的繫結器都必須包含在配置中。旨在透明使用 Spring Cloud Stream 的框架可以建立可透過名稱引用的繫結器配置,但它們不影響預設的繫結器配置。為此,繫結器配置可以將其 defaultCandidate 標誌設定為 false(例如,spring.cloud.stream.binders.<configurationName>.defaultCandidate=false)。這表示一個獨立於預設繫結器配置過程而存在的配置。

以下示例顯示了一個連線到兩個 RabbitMQ 訊息代理例項的處理器應用程式的典型配置

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: thing1
          binder: rabbit1
        output:
          destination: thing2
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>
特定繫結器的 environment 屬性也可以用於任何 Spring Boot 屬性,包括 spring.main.sources,這對於為特定繫結器新增額外配置非常有用,例如覆蓋自動配置的 bean。

例如;

environment:
    spring:
        main:
           sources: com.acme.config.MyCustomBinderConfiguration

要為特定繫結器環境啟用特定配置檔案,您應該使用 spring.profiles.active 屬性

environment:
    spring:
        profiles:
           active: myBinderProfile

在多繫結器應用程式中定製繫結器

當應用程式中包含多個繫結器並希望對繫結器進行定製時,可以透過提供 BinderCustomizer 實現來實現。在單繫結器應用程式的情況下,不需要此特殊的定製器,因為繫結器上下文可以直接訪問定製 bean。然而,在多繫結器場景中並非如此,因為各種繫結器位於不同的應用程式上下文中。透過提供 BinderCustomizer 介面的實現,繫結器儘管位於不同的應用程式上下文中,但將接收到定製。Spring Cloud Stream 確保在應用程式開始使用繫結器之前進行定製。使用者必須檢查繫結器型別,然後應用必要的定製。

以下是提供 BinderCustomizer bean 的示例。

@Bean
public BinderCustomizer binderCustomizer() {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setRebalanceListener(...);
        }
        else if (binder instanceof KStreamBinder) {
            ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            ...
        }
    };
}

請注意,當存在多個相同型別的繫結器例項時,可以使用繫結器名稱來過濾定製。

繫結視覺化和控制

Spring Cloud Stream 透過 Actuator 端點和程式設計式方式支援繫結的視覺化和控制。

程式設計式方式

自 3.1 版本起,我們暴露了 org.springframework.cloud.stream.binding.BindingsLifecycleController,它被註冊為一個 bean,一旦注入即可用於控制單個繫結的生命週期

例如,看看其中一個測試用例的片段。正如您所見,我們從 Spring 應用程式上下文中檢索 BindingsLifecycleController 並執行單個方法來控制 echo-in-0 繫結的生命週期。

BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
//Alternative way of changing state. For convenience we expose start/stop and pause/resume operations.
//bindingsController.stop("echo-in-0")
assertThat(binding.isRunning()).isFalse();

Actuator

由於 Actuator 和 Web 是可選的,您必須首先手動新增一個 Web 依賴項以及 Actuator 依賴項。以下示例展示瞭如何為 Web 框架新增依賴項

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

以下示例展示瞭如何為 WebFlux 框架新增依賴項

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

您可以按如下方式新增 Actuator 依賴項

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
要在 Cloud Foundry 中執行 Spring Cloud Stream 2.0 應用程式,您必須將 spring-boot-starter-webspring-boot-starter-actuator 新增到 classpath 中。否則,由於健康檢查失敗,應用程式將無法啟動。

您還必須透過設定以下屬性來啟用 bindings Actuator 端點:--management.endpoints.web.exposure.include=bindings

滿足這些先決條件後。應用程式啟動時,您應該在日誌中看到以下內容

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要視覺化當前的繫結,請訪問以下 URL:http://<host>:<port>/actuator/bindings

或者,要檢視單個繫結,請訪問類似於以下 URL 的一個:http://<host>:<port>/actuator/bindings/<bindingName>;

您還可以透過向同一 URL 傳送請求,並在 JSON 中提供一個 state 引數來停止、啟動、暫停和恢復單個繫結,如下例所示

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
PAUSEDRESUMED 僅在相應的繫結器及其底層技術支援時有效。否則,您將在日誌中看到警告訊息。目前,只有 Kafka 和 [Solace](https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) 繫結器支援 PAUSEDRESUMED 狀態。

繫結器配置屬性

自定義繫結器配置時,以下屬性可用。這些屬性透過 org.springframework.cloud.stream.config.BinderProperties 公開

它們必須以 spring.cloud.stream.binders.<configurationName> 為字首。

type

繫結器型別。它通常引用類路徑中找到的繫結器之一,特別是 META-INF/spring.binders 檔案中的一個鍵。

預設情況下,它與配置名稱相同。

inheritEnvironment

配置是否繼承應用程式本身的環境。

預設值:true

environment

一組屬性的根,可用於自定義繫結器的環境。設定此屬性後,建立繫結器的上下文不是應用程式上下文的子上下文。此設定允許繫結器元件與應用程式元件完全分離。

預設值:empty

defaultCandidate

繫結器配置是否可以被視為預設繫結器,或只能在明確引用時使用。此設定允許新增繫結器配置而不干擾預設處理。

預設值:true

實現自定義繫結器

為了實現自定義 Binder,您只需要

  • 新增所需依賴

  • 提供 ProvisioningProvider 實現

  • 提供 MessageProducer 實現

  • 提供 MessageHandler 實現

  • 提供 Binder 實現

  • 建立繫結器配置

  • 在 META-INF/spring.binders 中定義您的繫結器

新增所需依賴

spring-cloud-stream 依賴新增到您的專案 *(例如,對於 Maven)*

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring.cloud.stream.version}</version>
</dependency>

提供 ProvisioningProvider 實現

ProvisioningProvider 負責消費者和生產者目的地的配置,並需要將 application.yml 或 application.properties 檔案中包含的邏輯目的地轉換為物理目的地引用。

下面是一個 ProvisioningProvider 實現示例,它僅修剪透過輸入/輸出繫結配置提供的目的地

public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

    @Override
    public ProducerDestination provisionProducerDestination(
            final String name,
            final ProducerProperties properties) {

        return new FileMessageDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(
            final String name,
            final String group,
            final ConsumerProperties properties) {

        return new FileMessageDestination(name);
    }

    private class FileMessageDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        private FileMessageDestination(final String destination) {
            this.destination = destination;
        }

        @Override
        public String getName() {
            return destination.trim();
        }

        @Override
        public String getNameForPartition(int partition) {
            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
        }

    }

}

提供 MessageProducer 實現

MessageProducer 負責消費事件並將其作為訊息處理給配置為消費此類事件的客戶端應用程式。

這裡是一個 MessageProducer 實現示例,它擴充套件了 MessageProducerSupport 抽象,以便輪詢專案路徑中與修剪後的目的地名稱匹配的檔案,同時還會歸檔已讀訊息並丟棄後續的相同訊息。

public class FileMessageProducer extends MessageProducerSupport {

    public static final String ARCHIVE = "archive.txt";
    private final ConsumerDestination destination;
    private String previousPayload;

    public FileMessageProducer(ConsumerDestination destination) {
        this.destination = destination;
    }

    @Override
    public void doStart() {
        receive();
    }

    private void receive() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        executorService.scheduleWithFixedDelay(() -> {
            String payload = getPayload();

            if(payload != null) {
                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
                archiveMessage(payload);
                sendMessage(receivedMessage);
            }

        }, 0, 50, MILLISECONDS);
    }

    private String getPayload() {
        try {
            List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
            String currentPayload = allLines.get(allLines.size() - 1);

            if(!currentPayload.equals(previousPayload)) {
                previousPayload = currentPayload;
                return currentPayload;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return null;
    }

    private void archiveMessage(String payload) {
        try {
            Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
在實現自定義繫結器時,此步驟並非嚴格必需,因為您始終可以使用已有的 MessageProducer 實現!

提供 MessageHandler 實現

MessageHandler 提供生成事件所需的邏輯。

這裡是 MessageHandler 實現示例

public class FileMessageHandler implements MessageHandler{

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        //write message to file
    }

}
在實現自定義繫結器時,此步驟並非嚴格必需,因為您始終可以使用已有的 MessageHandler 實現!

提供 Binder 實現

您現在可以提供自己的 Binder 抽象實現。這可以透過以下方式輕鬆完成:

  • 擴充套件 AbstractMessageChannelBinder

  • 將您的 ProvisioningProvider 指定為 AbstractMessageChannelBinder 的泛型引數

  • 覆蓋 createProducerMessageHandlercreateConsumerEndpoint 方法

例如:

public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {

    public FileMessageBinder(
            String[] headersToEmbed,
            FileMessageBinderProvisioner provisioningProvider) {

        super(headersToEmbed, provisioningProvider);
    }

    @Override
    protected MessageHandler createProducerMessageHandler(
            final ProducerDestination destination,
            final ProducerProperties producerProperties,
            final MessageChannel errorChannel) throws Exception {

        return message -> {
            String fileName = destination.getName();
            String payload = new String((byte[])message.getPayload()) + "\n";

            try {
                Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    @Override
    protected MessageProducer createConsumerEndpoint(
            final ConsumerDestination destination,
            final String group,
            final ConsumerProperties properties) throws Exception {

        return new FileMessageProducer(destination);
    }

}

建立繫結器配置

嚴格要求您建立一個 Spring 配置來初始化您的繫結器實現所需的 bean *(以及您可能需要的其他所有 bean)*

@Configuration
public class FileMessageBinderConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
        return new FileMessageBinderProvisioner();
    }

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
        return new FileMessageBinder(null, fileMessageBinderProvisioner);
    }

}

在 META-INF/spring.binders 中定義您的繫結器

最後,您必須在類路徑上的 META-INF/spring.binders 檔案中定義您的繫結器,指定繫結器的名稱以及您的繫結器配置類的完全限定名。

myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration

配置選項

Spring Cloud Stream 支援通用配置選項以及繫結和繫結器的配置。某些繫結器允許額外的繫結屬性支援中介軟體特定功能。

可以透過 Spring Boot 支援的任何機制向 Spring Cloud Stream 應用程式提供配置選項。這包括應用程式引數、環境變數以及 YAML 或 .properties 檔案。

繫結服務屬性

這些屬性透過 org.springframework.cloud.stream.config.BindingServiceProperties 公開

spring.cloud.stream.instanceCount

應用程式的部署例項數。在生產者端進行分割槽時必須設定。使用 RabbitMQ 和 Kafka(如果 autoRebalanceEnabled=false)時,必須在消費者端設定。

預設值:1

spring.cloud.stream.instanceIndex

應用程式的例項索引:從 0instanceCount - 1 的數字。用於 RabbitMQ 和 Kafka(如果 autoRebalanceEnabled=false)的分割槽。在 Cloud Foundry 中自動設定以匹配應用程式的例項索引。

spring.cloud.stream.dynamicDestinations

可以動態繫結的目的地列表(例如,在動態路由場景中)。如果設定,則只能繫結列出的目的地。

預設值:空(允許繫結任何目的地)。

spring.cloud.stream.defaultBinder

如果配置了多個繫結器,要使用的預設繫結器。參見 類路徑上的多個繫結器

預設值:空。

spring.cloud.stream.overrideCloudConnectors

此屬性僅在 cloud profile 啟用且應用程式提供了 Spring Cloud Connectors 時適用。如果屬性為 false(預設值),繫結器會檢測合適的已繫結服務(例如,在 Cloud Foundry 中為 RabbitMQ 繫結器繫結的 RabbitMQ 服務)並使用它來建立連線(通常透過 Spring Cloud Connectors)。當設定為 true 時,此屬性指示繫結器完全忽略已繫結服務並依賴 Spring Boot 屬性(例如,對於 RabbitMQ 繫結器,依賴環境中提供的 spring.rabbitmq.* 屬性)。此屬性的典型用法是在自定義環境中 連線到多個系統時進行巢狀。

預設值:false

spring.cloud.stream.bindingRetryInterval

重試繫結建立的間隔(秒),例如當繫結器不支援後期繫結且 broker(例如 Apache Kafka)宕機時。將其設定為零會將此類情況視為致命錯誤,阻止應用程式啟動。

預設值:30

繫結屬性

繫結屬性透過使用 spring.cloud.stream.bindings.<bindingName>.<property>=<value> 的格式提供。<bindingName> 表示正在配置的繫結的名稱。

例如,對於以下函式

@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}

對於輸入,有兩個繫結,名為 uppercase-in-0;對於輸出,有一個繫結,名為 uppercase-out-0。更多詳情請參見 繫結和繫結名稱

為避免重複,Spring Cloud Stream 支援為所有繫結設定值,通用繫結屬性使用 spring.cloud.stream.default.<property>=<value>spring.cloud.stream.default.<producer|consumer>.<property>=<value> 的格式。

對於擴充套件繫結屬性,為避免重複,應使用以下格式 - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>

通用繫結屬性

這些屬性透過 org.springframework.cloud.stream.config.BindingProperties 公開

以下繫結屬性適用於輸入和輸出繫結,並且必須以 spring.cloud.stream.bindings.<bindingName>. 為字首(例如,spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock)。

可以透過使用 spring.cloud.stream.default 字首設定預設值(例如 spring.cloud.stream.default.contentType=application/json)。

destination

繫結在已繫結中介軟體上的目標目的地(例如,RabbitMQ exchange 或 Kafka topic)。如果繫結代表消費者繫結(輸入),它可以繫結到多個目的地,目的地名稱可以指定為逗號分隔的 String 值。如果未指定,則使用實際的繫結名稱。此屬性的預設值無法覆蓋。

group

繫結的消費者組。僅適用於入站繫結。參見 消費者組

預設值:null(表示匿名消費者)。

contentType

此繫結的內容型別。參見 內容型別協商

預設值:application/json

binder

此繫結使用的繫結器。更多詳情請參見 類路徑上的多個繫結器

預設值:null(如果存在,則使用預設繫結器)。

消費者屬性

這些屬性透過 org.springframework.cloud.stream.binder.ConsumerProperties 公開

以下繫結屬性僅適用於輸入繫結,並且必須以 spring.cloud.stream.bindings.<bindingName>.consumer. 為字首(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3)。

可以透過使用 spring.cloud.stream.default.consumer 字首設定預設值(例如,spring.cloud.stream.default.consumer.headerMode=none)。

autoStartup

指示此消費者是否需要自動啟動

預設值:true

concurrency

入站消費者的併發度。

預設值:1

partitioned

消費者是否接收來自已分割槽生產者的資料。

預設值:false

headerMode

當設定為 none 時,停用輸入的 header 解析。僅對不原生支援訊息 header 且需要嵌入 header 的訊息中介軟體有效。此選項在消費來自非 Spring Cloud Stream 應用程式的資料(當不支援原生 header 時)時很有用。當設定為 headers 時,它使用中介軟體的原生 header 機制。當設定為 embeddedHeaders 時,它將 header 嵌入到訊息負載中。

預設值:取決於繫結器實現。

maxAttempts

如果處理失敗,處理訊息的嘗試次數(包括第一次)。設定為 1 以停用重試。

預設值:3

backOffInitialInterval

重試時的退避初始間隔。

預設值:1000

backOffMaxInterval

最大退避間隔。

預設值:10000

backOffMultiplier

退避乘數。

預設值:2.0

defaultRetryable

監聽器丟擲的未列在 retryableExceptions 中的異常是否可重試。

預設值:true

instanceCount

當設定為大於等於零的值時,允許自定義此消費者的例項數(如果與 spring.cloud.stream.instanceCount 不同)。當設定為負值時,預設使用 spring.cloud.stream.instanceCount。更多資訊請參見 例項索引和例項數

預設值:-1

instanceIndex

當設定為大於等於零的值時,允許自定義此消費者的例項索引(如果與 spring.cloud.stream.instanceIndex 不同)。當設定為負值時,預設使用 spring.cloud.stream.instanceIndex。如果提供了 instanceIndexList,則忽略。更多資訊請參見 例項索引和例項數

預設值:-1

instanceIndexList

與不支援原生分割槽(例如 RabbitMQ)的繫結器一起使用;允許應用程式例項從多個分割槽消費。

預設值:空。

retryableExceptions

一個以 Throwable 類名為鍵、布林值為值的對映。指定哪些異常(及其子類)將或不會被重試。另請參閱 defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

預設值:空。

useNativeDecoding

當設定為 true 時,入站訊息由客戶端庫直接反序列化,客戶端庫必須進行相應的配置(例如,設定適當的 Kafka 生產者值反序列化器)。使用此配置時,入站訊息的解組不基於繫結的 contentType。使用原生解碼時,生產者有責任使用適當的編碼器(例如,Kafka 生產者值序列化器)來序列化出站訊息。此外,使用原生編碼和解碼時,headerMode=embeddedHeaders 屬性將被忽略,header 不會嵌入到訊息中。參見生產者屬性 useNativeEncoding

預設值:false

multiplex

當設定為 true 時,底層繫結器將在同一輸入繫結上原生複用目的地。

預設值:false

高階消費者配置

對於訊息驅動消費者底層訊息監聽容器的高階配置,請向應用程式上下文新增一個 ListenerContainerCustomizer bean。它將在應用上述屬性後呼叫,可用於設定附加屬性。類似地,對於輪詢消費者,請新增一個 MessageSourceCustomizer bean。

以下是 RabbitMQ 繫結器示例

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
    return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}

@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
    return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}

生產者屬性

這些屬性透過 org.springframework.cloud.stream.binder.ProducerProperties 公開

以下繫結屬性僅適用於輸出繫結,並且必須以 spring.cloud.stream.bindings.<bindingName>.producer. 為字首(例如,spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id)。

可以透過使用字首 spring.cloud.stream.default.producer 設定預設值(例如,spring.cloud.stream.default.producer.partitionKeyExpression=headers.id)。

autoStartup

指示此消費者是否需要自動啟動

預設值:true

partitionKeyExpression

一個 SpEL 表示式,用於確定如何對出站資料進行分割槽。如果設定,此繫結的出站資料將進行分割槽。partitionCount 必須設定為大於 1 的值才能生效。參見 分割槽支援

預設值:null。

partitionKeyExtractorName

實現 PartitionKeyExtractorStrategy 的 bean 名稱。用於提取一個鍵,該鍵用於計算分割槽 ID(參見 'partitionSelector*')。與 'partitionKeyExpression' 互斥。

預設值:null。

partitionSelectorName

實現 PartitionSelectorStrategy 的 bean 名稱。用於根據分割槽鍵確定分割槽 ID(參見 'partitionKeyExtractor*')。與 'partitionSelectorExpression' 互斥。

預設值:null。

partitionSelectorExpression

一個 SpEL 表示式,用於自定義分割槽選擇。如果兩者都未設定,則分割槽選擇方式為 hashCode(key) % partitionCount,其中 key 透過 partitionKeyExpression 計算得出。

預設值:null

partitionCount

資料的目標分割槽數,如果啟用了分割槽。如果生產者已分割槽,則必須設定為大於 1 的值。在 Kafka 上,它被解釋為一個提示。將使用此值和目標 topic 分割槽數的較大者。

預設值:1

requiredGroups

一個逗號分隔的組列表,生產者必須確保訊息傳遞給這些組,即使它們在生產者建立後才啟動(例如,透過在 RabbitMQ 中預先建立持久佇列)。

headerMode

當設定為 none 時,停用輸出的 header 嵌入。僅對不原生支援訊息 header 且需要嵌入 header 的訊息中介軟體有效。此選項在為非 Spring Cloud Stream 應用程式生成資料(當不支援原生 header 時)時很有用。當設定為 headers 時,它使用中介軟體的原生 header 機制。當設定為 embeddedHeaders 時,它將 header 嵌入到訊息負載中。

預設值:取決於繫結器實現。

useNativeEncoding

當設定為 true 時,出站訊息由客戶端庫直接序列化,客戶端庫必須進行相應的配置(例如,設定適當的 Kafka 生產者值序列化器)。使用此配置時,出站訊息的編組不基於繫結的 contentType。使用原生編碼時,消費者有責任使用適當的解碼器(例如,Kafka 消費者值反序列化器)來反序列化入站訊息。此外,使用原生編碼和解碼時,headerMode=embeddedHeaders 屬性將被忽略,header 不會嵌入到訊息中。參見消費者屬性 useNativeDecoding

預設值:false

errorChannelEnabled

當設定為 true 時,如果繫結器支援非同步傳送結果,傳送失敗會發送到目標錯誤通道。更多資訊請參見錯誤處理。

預設值:false。

高階生產者配置

在某些情況下,生產者屬性不足以正確配置繫結器中的生產 MessageHandler,或者您可能更喜歡以程式設計方式配置此類生產 MessageHandler。無論原因如何,Spring Cloud Stream 都提供了 ProducerMessageHandlerCustomizer 來實現此目的。

@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {

	/**
	 * Configure a {@link MessageHandler} that is being created by the binder for the
	 * provided destination name.
	 * @param handler the {@link MessageHandler} from the binder.
	 * @param destinationName the bound destination name.
	 */
	void configure(H handler, String destinationName);

}

如您所見,它使您可以訪問實際的生產 MessageHandler 例項,您可以根據需要對其進行配置。您只需提供此策略的實現並將其配置為 `@Bean` 即可。

內容型別協商

資料轉換是任何訊息驅動微服務架構的核心功能之一。鑑於在 Spring Cloud Stream 中,此類資料表示為 Spring 的 Message,訊息在到達其目的地之前可能需要轉換為期望的形狀或大小。這出於兩個原因:

  1. 將入站訊息的內容轉換為匹配應用程式提供的處理器的簽名。

  2. 將出站訊息的內容轉換為線格式。

線格式通常是 byte[](對於 Kafka 和 Rabbit 繫結器確實如此),但它由繫結器實現決定。

在 Spring Cloud Stream 中,訊息轉換是透過 org.springframework.messaging.converter.MessageConverter 完成的。

作為後續詳情的補充,您可能還想閱讀以下 部落格文章

機制

為了更好地理解內容型別協商的機制和必要性,我們以以下訊息處理器為例,來看一個非常簡單的用例:

public Function<Person, String> personFunction {..}
為簡單起見,我們假設這是應用程式中唯一的處理器函式(我們假設沒有內部管道)。

前例中所示的處理器期望以 Person 物件作為引數,並生成 String 型別作為輸出。為了使框架成功地將入站 Message 作為引數傳遞給此處理器,它必須以某種方式將 Message 型別的 payload 從線格式轉換為 Person 型別。換句話說,框架必須找到並應用適當的 MessageConverter。為了實現這一點,框架需要使用者的一些指示。其中一個指示已經由處理器方法本身的簽名(Person 型別)提供。因此,理論上,這應該(並且在某些情況下)足夠了。然而,對於大多數用例,為了選擇適當的 MessageConverter,框架需要額外的資訊。缺失的部分就是 contentType

Spring Cloud Stream 提供了三種機制來定義 contentType(按優先順序排序):

  1. HEADER:內容型別可以透過 Message 本身傳遞。透過提供 contentType header,您可以宣告要用於查詢和應用適當 MessageConverter 的內容型別。

  2. BINDING:內容型別可以按目的地繫結設定,透過設定 spring.cloud.stream.bindings.input.content-type 屬性。

    屬性名稱中的 input 段對應於實際目的地的名稱(在本例中為“input”)。這種方法允許您按繫結宣告要用於查詢和應用適當 MessageConverter 的內容型別。
  3. DEFAULT:如果 contentType 不存在於 Message header 或繫結中,則使用預設的 application/json 內容型別來查詢和應用適當 MessageConverter

如前所述,前述列表還展示了在衝突情況下的優先順序順序。例如,由 header 提供的內容型別優先於任何其他內容型別。按繫結設定的內容型別也同樣適用,它本質上允許您覆蓋預設內容型別。然而,它也提供了一個合理的預設值(這是根據社群反饋確定的)。

application/json 設定為預設值的另一個原因源於分散式微服務架構帶來的互操作性要求,在這種架構中,生產者和消費者不僅可以在不同的 JVM 中執行,還可以在不同的非 JVM 平臺上執行。

當非 void 處理器方法返回時,如果返回值已經是 Message,則該 Message 將成為 payload。然而,當返回值不是 Message 時,將使用返回值作為 payload 構建新的 Message,同時繼承來自入站 Message 的 header,但排除由 SpringIntegrationProperties.messageHandlerNotPropagatedHeaders 定義或過濾的 header。預設情況下,那裡只設置了一個 header:contentType。這意味著新的 Message 沒有設定 contentType header,從而確保 contentType 可以演進。您始終可以選擇不從處理器方法返回 Message,在那裡您可以注入任何您想要的 header。

如果存在內部管道,則 Message 將透過相同的轉換過程傳送到下一個處理器。然而,如果不存在內部管道或您已到達其末尾,則 Message 將傳送回輸出目的地。

內容型別與引數型別

如前所述,為了使框架選擇適當的 MessageConverter,它需要引數型別資訊,並且可選地需要內容型別資訊。選擇適當 MessageConverter 的邏輯位於引數解析器(HandlerMethodArgumentResolvers)中,它們在呼叫使用者定義的處理器方法之前觸發(此時框架才知道實際的引數型別)。如果引數型別與當前 payload 的型別不匹配,框架會委託給預配置 MessageConverter 棧,看是否有任何一個可以轉換 payload。如您所見,MessageConverter 的 Object fromMessage(Message<?> message, Class<?> targetClass); 操作將 targetClass 作為其引數之一。框架還確保提供的 Message 始終包含 contentType header。當不存在 contentType header 時,它會注入按繫結設定的 contentType header 或預設的 contentType header。contentType 和引數型別的組合是框架確定訊息是否可以轉換為目標型別的機制。如果找不到適當的 MessageConverter,則會丟擲異常,您可以透過新增自定義 MessageConverter 來處理(參見 使用者自定義訊息轉換器)。

但是,如果 payload 型別與處理器方法宣告的目標型別匹配怎麼辦?在這種情況下,無需轉換,payload 會原封不動地傳遞。儘管這聽起來相當直接且合乎邏輯,但請記住將 Message<?>Object 作為引數的處理器方法。透過將目標型別宣告為 Object(它是 Java 中所有內容的 instanceof),您實際上放棄了轉換過程。

不要期望僅基於 contentType 將 Message 轉換為其他型別。請記住,contentType 是對目標型別的補充。如果您願意,可以提供一個提示,MessageConverter 可能會考慮,也可能不考慮。

訊息轉換器

MessageConverters 定義了兩個方法

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

理解這些方法的契約及其用法非常重要,尤其是在 Spring Cloud Stream 的上下文中。

fromMessage 方法將入站 Message 轉換為引數型別。Message 的 payload 可以是任何型別,並且 MessageConverter 的實際實現決定了是否支援多種型別。例如,某些 JSON 轉換器可能支援 byte[]String 等 payload 型別。這在應用程式包含內部管道(即 input → handler1 → handler2 →. . . → output)且上游處理器的輸出結果為可能不是初始線格式的 Message 時非常重要。

然而,toMessage 方法具有更嚴格的契約,並且必須始終將 Message 轉換為線格式:byte[]

因此,為了所有目的(特別是在實現您自己的轉換器時),您可以將這兩個方法視為具有以下簽名:

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);

提供的訊息轉換器

如前所述,框架已經提供了一系列 MessageConverters 來處理最常見的用例。以下列表描述了提供的 MessageConverters,按優先順序排序(第一個起作用的 MessageConverter 將被使用):

  1. JsonMessageConverter:顧名思義,當 contentTypeapplication/json(預設)時,它支援將 Message 的 payload 轉換為 POJO 或從 POJO 轉換。

  2. ByteArrayMessageConverter:當 contentTypeapplication/octet-stream 時,它支援將 Message 的 payload 從 byte[] 轉換為 byte[]。它本質上是直通的,主要為了向後相容而存在。

  3. ObjectStringMessageConverter:當 contentTypetext/plain 時,它支援將任何型別轉換為 String。它會呼叫 Object 的 toString() 方法,或者如果 payload 是 byte[],則呼叫 new String(byte[])

如果找不到適當的轉換器,框架會丟擲異常。發生這種情況時,您應該檢查程式碼和配置,確保沒有遺漏任何內容(即,確保透過使用繫結或 header 提供了 contentType)。然而,最有可能的情況是,您發現了一些不常見的用例(例如,自定義 contentType),並且當前提供的 MessageConverters 棧不知道如何轉換。如果是這種情況,您可以新增自定義 MessageConverter。參見 使用者自定義訊息轉換器

使用者自定義訊息轉換器

Spring Cloud Stream 提供了一種機制來定義和註冊額外的 MessageConverter。要使用它,請實現 org.springframework.messaging.converter.MessageConverter,並將其配置為 `@Bean`。然後,它會被新增到現有的 MessageConverter 棧中。

重要的是要理解,自定義 MessageConverter 實現會新增到現有棧的頭部。因此,自定義 MessageConverter 實現優先於現有實現,這使您可以覆蓋現有轉換器並新增新的轉換器。

以下示例展示瞭如何建立一個訊息轉換器 bean 以支援名為 application/bar 的新內容型別:

@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

應用間通訊

Spring Cloud Stream 實現了應用程式之間的通訊。應用間通訊是一個複雜的問題,涵蓋了以下主題所述的多個方面:

連線多個應用程式例項

雖然 Spring Cloud Stream 使單個 Spring Boot 應用程式易於連線到訊息系統,但 Spring Cloud Stream 的典型場景是建立多應用程式管道,其中微服務應用程式相互發送資料。您可以透過關聯“相鄰”應用程式的輸入和輸出目的地來實現此場景。

假設設計要求 Time Source 應用程式將資料傳送到 Log Sink 應用程式。您可以使用名為 ticktock 的通用目的地用於兩個應用程式內的繫結。

Time Source(具有名為 output 的繫結)將設定以下屬性

spring.cloud.stream.bindings.output.destination=ticktock

Log Sink(具有名為 input 的繫結)將設定以下屬性

spring.cloud.stream.bindings.input.destination=ticktock

例項索引和例項數

在擴充套件 Spring Cloud Stream 應用程式時,每個例項都可以接收關於同應用程式存在多少其他例項以及其自身的例項索引是什麼的資訊。Spring Cloud Stream 透過 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 屬性來實現這一點。例如,如果一個 HDFS sink 應用程式有三個例項,所有三個例項的 spring.cloud.stream.instanceCount 都設定為 3,而各個應用程式的 spring.cloud.stream.instanceIndex 分別設定為 012

當透過 Spring Cloud Data Flow 部署 Spring Cloud Stream 應用程式時,這些屬性會自動配置;當 Spring Cloud Stream 應用程式獨立啟動時,這些屬性必須正確設定。預設情況下,spring.cloud.stream.instanceCount1spring.cloud.stream.instanceIndex0

在擴充套件場景中,正確配置這兩個屬性對於總體而言處理分割槽行為(參見下文)非常重要,並且某些繫結器(例如 Kafka 繫結器)總是需要這兩個屬性,以確保資料在多個消費者例項之間正確拆分。

分割槽

Spring Cloud Stream 中的分割槽包括兩個任務:

配置輸出繫結以進行分割槽

您可以透過設定 partitionKeyExpressionpartitionKeyExtractorName 屬性中的一個且僅一個,以及 partitionCount 屬性,來配置輸出繫結以傳送已分割槽資料。

例如,以下是一個有效且典型的配置:

spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5

根據該示例配置,資料透過以下邏輯傳送到目標分割槽。

對於傳送到已分割槽輸出繫結的每條訊息,根據 partitionKeyExpression 計算分割槽鍵的值。partitionKeyExpression 是一個 SpEL 表示式,它對出站訊息進行評估(在前例中是訊息 header 中 id 的值),以提取分割槽鍵。

如果 SpEL 表示式不足以滿足您的需求,您可以提供 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的實現,並將其配置為 Bean (使用 @Bean 註解),以此來計算分割槽鍵值。如果 Application Context 中存在多個型別為 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的 Bean,您可以透過指定其名稱並使用 partitionKeyExtractorName 屬性進一步篩選,如下例所示:

--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
    return new CustomPartitionKeyExtractorClass();
}
在 Spring Cloud Stream 的早期版本中,您可以透過設定 spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass 屬性來指定 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的實現。自版本 3.0 起,此屬性已移除。

計算出訊息鍵後,分割槽選擇過程會確定目標分割槽,其值為 0partitionCount - 1 之間。大多數場景下適用的預設計算基於以下公式:key.hashCode() % partitionCount。這可以在繫結上進行自定義,方法是設定一個針對“key”進行評估的 SpEL 表示式(透過 partitionSelectorExpression 屬性),或者將 org.springframework.cloud.stream.binder.PartitionSelectorStrategy 的實現配置為 Bean(使用 @Bean 註解)。與 PartitionKeyExtractorStrategy 類似,當 Application Context 中存在多個此型別的 Bean 時,您可以使用 spring.cloud.stream.bindings.output.producer.partitionSelectorName 屬性進一步篩選,如下例所示:

--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
    return new CustomPartitionSelectorClass();
}
在 Spring Cloud Stream 的早期版本中,您可以透過設定 spring.cloud.stream.bindings.output.producer.partitionSelectorClass 屬性來指定 org.springframework.cloud.stream.binder.PartitionSelectorStrategy 的實現。自版本 3.0 起,此屬性已移除。

配置用於分割槽的輸入繫結

透過設定其 partitioned 屬性以及應用程式自身的 instanceIndexinstanceCount 屬性,可以將輸入繫結(繫結名稱為 uppercase-in-0)配置為接收分割槽資料,如下例所示:

spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCount 值表示資料應在其中分割槽的應用程式例項總數。instanceIndex 在多個例項中必須是唯一值,其值在 0instanceCount - 1 之間。例項索引幫助每個應用程式例項識別其接收資料的唯一分割槽。需要支援本地不支援分割槽技術的 Binder。例如,對於 RabbitMQ,每個分割槽都有一個佇列,佇列名稱包含例項索引。對於 Kafka,如果 autoRebalanceEnabledtrue(預設),Kafka 會負責在例項之間分發分割槽,此時無需這些屬性。如果 autoRebalanceEnabled 設定為 false,Binder 將使用 instanceCountinstanceIndex 來確定例項訂閱哪些分割槽(您必須至少擁有與例項數量一樣多的分割槽)。Binder 代替 Kafka 分配分割槽。如果您希望特定分割槽的訊息總是傳送到同一例項,這可能會很有用。當 Binder 配置需要它們時,正確設定這兩個值非常重要,以確保所有資料都被消費,並且應用程式例項接收到互斥的資料集。

儘管在使用多個例項進行分割槽資料處理的獨立場景中設定可能很複雜,但 Spring Cloud Dataflow 可以透過正確填充輸入和輸出值,並讓您依賴執行時基礎設施提供有關例項索引和例項計數的資訊,從而顯著簡化這一過程。

測試

Spring Cloud Stream 提供了對微服務應用程式進行測試的支援,無需連線到訊息系統。

Spring Integration 測試 Binder

Spring Cloud Stream 提供了一個測試 Binder,您可以使用它來測試各種應用程式元件,而無需實際的真實世界 Binder 實現或訊息代理。

這個測試 Binder 充當**單元測試**和**整合測試**之間的橋樑,它基於 Spring Integration 框架,本質上是一個 JVM 內的訊息代理,為您提供了兩全其美的優勢——一個真實的 Binder,但無需網路。

測試 Binder 配置

要啟用 Spring Integration 測試 Binder,您只需將其新增為依賴項即可。

新增所需的依賴項

以下是所需的 Maven POM 條目示例。

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream-test-binder</artifactId>
	<scope>test</scope>
</dependency>

或者用於 build.gradle.kts

testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder")

測試 Binder 用法

現在您可以將微服務作為一個簡單的單元測試進行測試

@SpringBootTest
public class SampleStreamTests {

	@Autowired
	private InputDestination input;

	@Autowired
	private OutputDestination output;

	@Test
	public void testEmptyConfiguration() {
		this.input.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}

	@SpringBootApplication
	@Import(TestChannelBinderConfiguration.class)
	public static class SampleConfiguration {
		@Bean
		public Function<String, String> uppercase() {
			return v -> v.toUpperCase();
		}
	}
}

如果您需要更多控制或想在同一個測試套件中測試多種配置,您也可以這樣做:

@EnableAutoConfiguration
public static class MyTestConfiguration {
	@Bean
	public Function<String, String> uppercase() {
			return v -> v.toUpperCase();
	}
}

. . .

@Test
public void sampleTest() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration.getCompleteConfiguration(
						MyTestConfiguration.class))
				.run("--spring.cloud.function.definition=uppercase")) {
		InputDestination source = context.getBean(InputDestination.class);
		OutputDestination target = context.getBean(OutputDestination.class);
		source.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(target.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}
}

對於具有多個繫結和/或多個輸入和輸出,或者只是想明確指定傳送或接收的目標名稱的場景,InputDestinationOutputDestinationsend()receive() 方法被過載,允許您提供輸入和輸出目標的名稱。

考慮以下示例

@EnableAutoConfiguration
public static class SampleFunctionConfiguration {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

以及實際的測試

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

對於具有額外對映屬性(如 destination)的場景,您應該使用這些名稱。例如,考慮前面測試的一個不同版本,我們在其中將 uppercase 函式的輸入和輸出顯式地對映到 myInputmyOutput 繫結名稱:

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleFunctionConfiguration.class))
							.run(
							"--spring.cloud.function.definition=uppercase;reverse",
							"--spring.cloud.stream.bindings.uppercase-in-0.destination=myInput",
							"--spring.cloud.stream.bindings.uppercase-out-0.destination=myOutput"
							)) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "myInput");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "myOutput");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

測試 Binder 和 PollableMessageSource

Spring Integration 測試 Binder 也允許您在使用 PollableMessageSource 時編寫測試(更多詳細資訊請參閱 使用輪詢消費者)。

然而,需要理解的重要一點是,輪詢不是事件驅動的,並且 PollableMessageSource 是一種策略,它暴露了用於生產(輪詢)**一個**訊息(Message,單數)的操作。您多久輪詢一次、使用多少執行緒或者從哪裡輪詢(訊息佇列還是檔案系統)完全取決於您;換句話說,配置輪詢器 (Poller)、執行緒或訊息 (Message) 的實際源是您的責任。幸運的是,Spring 有許多抽象來精確地配置這些。

讓我們看一個示例

@Test
public void samplePollingTest() {
	ApplicationContext context = new SpringApplicationBuilder(SamplePolledConfiguration.class)
				.web(WebApplicationType.NONE)
				.run("--spring.jmx.enabled=false", "--spring.cloud.stream.pollable-source=myDestination");
	OutputDestination destination = context.getBean(OutputDestination.class);
	System.out.println("Message 1: " + new String(destination.receive().getPayload()));
	System.out.println("Message 2: " + new String(destination.receive().getPayload()));
	System.out.println("Message 3: " + new String(destination.receive().getPayload()));
}

@Import(TestChannelBinderConfiguration.class)
@EnableAutoConfiguration
public static class SamplePolledConfiguration {
	@Bean
	public ApplicationRunner poller(PollableMessageSource polledMessageSource, StreamBridge output, TaskExecutor taskScheduler) {
		return args -> {
			taskScheduler.execute(() -> {
				for (int i = 0; i < 3; i++) {
					try {
						if (!polledMessageSource.poll(m -> {
							String newPayload = ((String) m.getPayload()).toUpperCase();
							output.send("myOutput", newPayload);
						})) {
							Thread.sleep(2000);
						}
					}
					catch (Exception e) {
						// handle failure
					}
				}
			});
		};
	}
}

上面的(非常基本的)示例將以 2 秒的間隔產生 3 條訊息,並將它們傳送到 Source 的輸出目標,Binder 將這些訊息傳送到 OutputDestination,我們在那裡檢索它們(進行斷言)。目前,它會列印以下內容:

Message 1: POLLED DATA
Message 2: POLLED DATA
Message 3: POLLED DATA

如您所見,資料是相同的。這是因為該 Binder 定義了實際的 MessageSource 的預設實現——訊息透過 poll() 操作從中輪詢的源。雖然對於大多數測試場景來說足夠了,但在某些情況下您可能想定義自己的 MessageSource。為此,只需在測試配置中配置一個型別為 MessageSource 的 Bean,提供您自己的訊息源實現。

示例如下:

@Bean
public MessageSource<?> source() {
	return () -> new GenericMessage<>("My Own Data " + UUID.randomUUID());
}

生成以下輸出:

Message 1: MY OWN DATA 1C180A91-E79F-494F-ABF4-BA3F993710DA
Message 2: MY OWN DATA D8F3A477-5547-41B4-9434-E69DA7616FEE
Message 3: MY OWN DATA 20BF2E64-7FF4-4CB6-A823-4053D30B5C74
**請勿**將此 Bean 命名為 messageSource,因為它會與 Spring Boot 為無關原因提供的同名(不同型別)的 Bean 衝突。

關於在測試中混合使用測試 Binder 和常規中介軟體 Binder 的特別說明

基於 Spring Integration 的測試 Binder 用於在不涉及實際中介軟體 Binder(如 Kafka 或 RabbitMQ Binder)的情況下測試應用程式。如上文所述,測試 Binder 透過依賴記憶體中的 Spring Integration 通道幫助您快速驗證應用程式的行為。當測試類路徑中存在測試 Binder 時,Spring Cloud Stream 將嘗試使用此 Binder 進行所有需要 Binder 進行通訊的測試目的。換句話說,您不能在同一模組中混合使用測試 Binder 和常規中介軟體 Binder 進行測試。在使用測試 Binder 測試應用程式後,如果您想繼續使用實際中介軟體 Binder 進行進一步的整合測試,建議將使用實際 Binder 的測試放在單獨的模組中,以便這些測試能夠與實際中介軟體建立正確的連線,而不是依賴測試 Binder 提供的記憶體通道。

健康指示器

Spring Cloud Stream 為 Binder 提供了健康指示器。它註冊在名稱 binders 下,可以透過設定 management.health.binders.enabled 屬性來啟用或停用。

要啟用健康檢查,首先需要透過包含其依賴項來同時啟用“web”和“actuator”(參閱 繫結視覺化和控制)。

如果應用程式未顯式設定 management.health.binders.enabled,則 management.health.defaults.enabled 將被匹配為 true,Binder 健康指示器將啟用。如果您想完全停用健康指示器,則必須將 management.health.binders.enabled 設定為 false

您可以使用 Spring Boot actuator 健康端點訪問健康指示器 - /actuator/health。預設情況下,當您訪問上述端點時,只會收到頂層應用程式狀態。為了接收 Binder 特定健康指示器的完整詳細資訊,您需要在應用程式中包含屬性 management.endpoint.health.show-details,並將其值設定為 ALWAYS

健康指示器是 Binder 特定的,某些 Binder 實現可能不一定提供健康指示器。

如果您想完全停用所有開箱即用的健康指示器並提供自己的健康指示器,可以透過將屬性 management.health.binders.enabled 設定為 false,然後在應用程式中提供自己的 HealthIndicator Bean 來實現。在這種情況下,Spring Boot 的健康指示器基礎設施仍然會拾取這些自定義 Bean。即使您沒有停用 Binder 健康指示器,您仍然可以透過在開箱即用健康檢查之外提供自己的 HealthIndicator Bean 來增強健康檢查。

當同一個應用程式中有多個 Binder 時,除非應用程式透過將 management.health.binders.enabled 設定為 false 來關閉它們,否則預設會啟用健康指示器。在這種情況下,如果使用者想停用部分 Binder 的健康檢查,則應該在多 Binder 配置的環境中將 management.health.binders.enabled 設定為 false。有關如何提供特定於環境的屬性的詳細資訊,請參閱 連線到多個系統

如果在類路徑中存在多個 Binder,但並非所有 Binder 都被應用程式使用,這可能會在健康指示器方面引起一些問題。具體的實現細節可能因 Binder 如何執行健康檢查而異。例如,Kafka Binder 可能在沒有 Binder 註冊任何目標 (destinations) 的情況下判定狀態為 DOWN

舉一個具體的例子。假設您的類路徑中同時存在 Kafka 和 Kafka Streams Binder,但在應用程式程式碼中只使用了 Kafka Streams Binder,即只提供了使用 Kafka Streams Binder 的繫結。由於 Kafka Binder 未被使用,並且它有特定的檢查來檢視是否註冊了任何目標,因此該 Binder 的健康檢查將失敗。頂層應用程式健康檢查狀態將報告為 DOWN。在這種情況下,由於您沒有使用 Kafka Binder,可以直接從應用程式中移除對它的依賴。

示例

有關 Spring Cloud Stream 示例,請參閱 GitHub 上的 spring-cloud-stream-samples 倉庫。

在 CloudFoundry 上部署流應用程式

在 CloudFoundry 上,服務通常透過一個名為 VCAP_SERVICES 的特殊環境變數暴露。

配置 Binder 連線時,您可以按照 dataflow Cloud Foundry Server 文件中的說明,使用環境變數中的值。

Binder 實現

以下是可用的 Binder 實現列表:

如前所述,Binder 抽象也是框架的擴充套件點之一。因此,如果您在前面的列表中找不到合適的繫結器,您可以在 Spring Cloud Stream 之上實現自己的繫結器。在如何從零開始建立 Spring Cloud Stream Binder 一文中,社群成員詳細記錄了實現自定義繫結器所需的一系列步驟,並提供了示例。這些步驟也在 實現自定義繫結器 部分中突出顯示。