4.0.5
前言
Spring 資料整合之旅的簡史
Spring 的資料整合之旅始於 Spring Integration。憑藉其程式設計模型,它提供了始終如一的開發人員體驗,以構建可以採用 企業整合模式 來連線外部系統(例如資料庫、訊息代理等)的應用程式。
快進到雲時代,微服務在企業環境中變得突出。Spring Boot 改變了開發人員構建應用程式的方式。憑藉 Spring 的程式設計模型和 Spring Boot 處理的執行時職責,開發獨立的、生產級的基於 Spring 的微服務變得無縫。
為了將此擴充套件到資料整合工作負載,Spring Integration 和 Spring Boot 被組合成一個新專案。Spring Cloud Stream 應運而生。
使用 Spring Cloud Stream,開發人員可以
-
獨立構建、測試和部署以資料為中心的應用程式。
-
應用現代微服務架構模式,包括透過訊息傳遞進行組合。
-
透過以事件為中心的思維解耦應用程式職責。事件可以表示在特定時間發生的事情,下游消費者應用程式可以對其做出反應,而無需知道其來源或生產者的身份。
-
將業務邏輯移植到訊息代理(如 RabbitMQ、Apache Kafka、Amazon Kinesis)。
-
依賴框架對常見用例的自動內容型別支援。可以擴充套件到不同的資料轉換型別。
-
以及更多。。。
快速入門
您可以透過遵循此三步指南,在不到 5 分鐘的時間內嘗試 Spring Cloud Stream,甚至在深入瞭解任何細節之前。
我們向您展示如何建立一個 Spring Cloud Stream 應用程式,該應用程式接收來自您選擇的訊息中介軟體(稍後會詳細介紹)的訊息,並將接收到的訊息記錄到控制檯。我們稱之為 LoggingConsumer。雖然不實用,但它很好地介紹了它的一些主要概念和抽象,使消化本使用者指南的其餘部分變得更容易。
這三個步驟如下:
使用 Spring Initializr 建立示例應用程式
要開始,請訪問 Spring Initializr。從那裡,您可以生成我們的 LoggingConsumer 應用程式。為此
-
在Dependencies 部分,開始輸入
stream。當出現“Cloud Stream”選項時,選擇它。 -
開始輸入“kafka”或“rabbit”。
-
選擇“Kafka”或“RabbitMQ”。
基本上,您選擇應用程式繫結的訊息中介軟體。我們建議使用您已安裝或更喜歡安裝和執行的那個。此外,正如您從 Initilaizer 螢幕中看到的,還有其他一些選項可供選擇。例如,您可以選擇 Gradle 作為構建工具而不是 Maven(預設)。
-
在Artifact 欄位中,輸入 'logging-consumer'。
Artifact 欄位的值成為應用程式名稱。如果您選擇 RabbitMQ 作為中介軟體,您的 Spring Initializr 現在應該如下所示
-
單擊Generate Project 按鈕。
這樣做會將生成的專案的壓縮版本下載到您的硬碟。
-
將檔案解壓到您要用作專案目錄的資料夾中。
| 我們鼓勵您探索 Spring Initializr 中提供的許多可能性。它允許您建立許多不同型別的 Spring 應用程式。 |
將專案匯入到 IDE
現在您可以將專案匯入到 IDE 中。請記住,根據 IDE 的不同,您可能需要遵循特定的匯入過程。例如,根據專案生成方式(Maven 或 Gradle),您可能需要遵循特定的匯入過程(例如,在 Eclipse 或 STS 中,您需要使用 File → Import → Maven → Existing Maven Project)。
匯入後,專案不得有任何錯誤。此外,src/main/java 應該包含 com.example.loggingconsumer.LoggingConsumerApplication。
從技術上講,此時您可以執行應用程式的主類。它已經是有效的 Spring Boot 應用程式。但是,它什麼都不做,所以我們想新增一些程式碼。
新增訊息處理程式、構建和執行
修改 com.example.loggingconsumer.LoggingConsumerApplication 類,使其看起來如下所示
@SpringBootApplication
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}
從前面的列表中可以看到
-
我們使用函數語言程式設計模型(參見 Spring Cloud Function 支援)將單個訊息處理程式定義為
Consumer。 -
我們依賴框架約定將此類處理程式繫結到繫結器公開的輸入目標繫結。
這樣做還讓您可以看到框架的核心功能之一:它嘗試自動將傳入訊息負載轉換為 Person 型別。
您現在有一個功能齊全的 Spring Cloud Stream 應用程式,它偵聽訊息。為簡單起見,我們假設您在 第一步 中選擇了 RabbitMQ。假設您已安裝並執行 RabbitMQ,您可以透過在 IDE 中執行其 main 方法來啟動應用程式。
您應該看到以下輸出
--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
--- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
--- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
. . .
--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
. . .
--- [ main] c.e.l.LoggingConsumerApplication : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)
轉到 RabbitMQ 管理控制檯或任何其他 RabbitMQ 客戶端,並向 input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg 傳送一條訊息。anonymous.CbMIwdkJSBO1ZoPDOtHtCg 部分表示組名並是生成的,因此在您的環境中它肯定會有所不同。為了更具可預測性,您可以透過設定 spring.cloud.stream.bindings.input.group=hello(或您喜歡的任何名稱)來使用顯式組名。
訊息的內容應該是 Person 類的 JSON 表示,如下所示
{"name":"Sam Spade"}
然後,在您的控制檯中,您應該會看到
收到:山姆·斯佩德
您還可以將應用程式構建並打包成一個引導 jar(使用 ./mvnw clean install),然後使用 java -jar 命令執行構建好的 JAR。
現在您有了一個正在執行的(儘管非常基本)Spring Cloud Stream 應用程式。
流資料上下文中的 Spring Expression Language (SpEL)
在整個參考手冊中,您將遇到許多可以使用 Spring Expression Language (SpEL) 的功能和示例。理解使用它時的某些限制很重要。
SpEL 允許您訪問當前訊息以及您正在執行的應用程式上下文。但是,瞭解 SpEL 可以看到的資料型別很重要,尤其是在傳入訊息的上下文中。從代理,訊息以位元組陣列的形式到達。然後由繫結器將其轉換為 Message<byte[]>,您可以看到訊息的有效負載保持其原始形式。訊息的頭是 <String, Object>,其中值通常是另一個原始型別或原始型別的集合/陣列,因此是 Object。這是因為繫結器不知道所需的輸入型別,因為它無法訪問使用者程式碼(函式)。因此,繫結器有效地傳遞了一個帶有有效負載和一些可讀元資料(以訊息頭的形式)的信封,就像透過郵件傳送的信件一樣。這意味著雖然可以訪問訊息的有效負載,但您只能將其作為原始資料(即位元組陣列)訪問。雖然開發人員要求 SpEL 訪問有效負載物件的欄位作為具體型別(例如 Foo、Bar 等)可能很常見,但您可以看到實現起來有多麼困難甚至不可能。這裡有一個示例來演示這個問題;想象一下您有一個路由表示式,根據有效負載型別路由到不同的函式。這個要求意味著有效負載從位元組陣列轉換為特定型別,然後應用 SpEL。但是,為了執行這種轉換,我們需要知道要傳遞給轉換器的實際型別,而這來自函式簽名,我們不知道是哪個。解決此要求的更好方法是將型別資訊作為訊息頭傳遞(例如,application/json;type=foo.bar.Baz)。您將獲得一個清晰可讀的字串值,可以在一年內訪問和評估,並且易於閱讀的 SpEL 表示式。
此外,將負載用於路由決策被認為是非常糟糕的做法,因為負載被認為是特權資料——只有其最終接收者才能讀取的資料。同樣,使用郵件投遞類比,您不會希望郵遞員開啟您的信封並閱讀信件內容以做出某些投遞決策。同樣的原則也適用於此處,尤其是在生成訊息時包含此類資訊相對容易的情況下。它強制執行與透過網路傳輸資料設計相關的某種程度的紀律,以及哪些資料部分可以被視為公開的,哪些是特權資料。
Spring Cloud Stream 簡介
Spring Cloud Stream 是一個用於構建訊息驅動微服務應用程式的框架。Spring Cloud Stream 構建於 Spring Boot 之上,用於建立獨立的、生產級的 Spring 應用程式,並使用 Spring Integration 提供與訊息代理的連線。它提供了多種供應商中介軟體的規範配置,引入了持久釋出-訂閱語義、消費者組和分割槽等概念。
透過將 spring-cloud-stream 依賴項新增到應用程式的類路徑中,您可以立即連線到所提供的 spring-cloud-stream 繫結器公開的訊息代理(稍後會詳細介紹),並且您可以實現您的功能需求,該需求由 java.util.function.Function(根據傳入訊息)執行。
以下列表顯示了一個快速示例
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class, args);
}
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
以下列表顯示了相應的測試
@SpringBootTest(classes = SampleApplication.class)
@Import({TestChannelBinderConfiguration.class})
class BootTestStreamApplicationTests {
@Autowired
private InputDestination input;
@Autowired
private OutputDestination output;
@Test
void contextLoads() {
input.send(new GenericMessage<byte[]>("hello".getBytes()));
assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
}
}
主要概念
Spring Cloud Stream 提供了許多抽象和原語,可以簡化訊息驅動微服務應用程式的編寫。本節概述以下內容
應用程式模型
一個 Spring Cloud Stream 應用程式由一個與中介軟體無關的核心組成。應用程式透過在外部代理公開的目標和程式碼中的輸入/輸出引數之間建立繫結來與外部世界通訊。建立繫結所需的代理特定細節由中介軟體特定的繫結器實現處理。
Fat JAR
Spring Cloud Stream 應用程式可以從您的 IDE 以獨立模式執行進行測試。要在生產環境中執行 Spring Cloud Stream 應用程式,您可以使用 Maven 或 Gradle 提供的標準 Spring Boot 工具建立可執行(或“fat”)JAR。有關詳細資訊,請參閱 Spring Boot 參考指南。
繫結器抽象
Spring Cloud Stream 為 Kafka 和 Rabbit MQ 提供了 Binder 實現。該框架還包括一個測試繫結器,用於將您的應用程式作為 spring-cloud-stream 應用程式進行整合測試。有關詳細資訊,請參閱 測試 部分。
Binder 抽象也是框架的擴充套件點之一,這意味著您可以在 Spring Cloud Stream 之上實現自己的 Binder。在 How to create a Spring Cloud Stream Binder from scratch 一文中,社群成員詳細記錄了實現自定義 Binder 所需的步驟,並附帶了一個示例。這些步驟也在 實現自定義繫結器 部分中突出顯示。
Spring Cloud Stream 使用 Spring Boot 進行配置,Binder 抽象使 Spring Cloud Stream 應用程式能夠靈活地連線到中介軟體。例如,部署人員可以在執行時動態選擇外部目標(如 Kafka 主題或 RabbitMQ 交換機)與訊息處理程式(如函式的輸入引數及其返回引數)的輸入和輸出之間的對映。此類配置可以透過外部配置屬性以及 Spring Boot 支援的任何形式(包括應用程式引數、環境變數和 application.yml 或 application.properties 檔案)提供。在 Spring Cloud Stream 簡介 部分的 sink 示例中,將 spring.cloud.stream.bindings.input.destination 應用程式屬性設定為 raw-sensor-data 會使其從 raw-sensor-data Kafka 主題或繫結到 raw-sensor-data RabbitMQ 交換機的佇列中讀取。
Spring Cloud Stream 自動檢測並使用類路徑上找到的繫結器。您可以使用相同的程式碼處理不同型別的中介軟體。為此,請在構建時包含不同的繫結器。對於更復雜的用例,您還可以將多個繫結器與應用程式一起打包,並在執行時讓它選擇繫結器(甚至為不同的繫結使用不同的繫結器)。
持久釋出-訂閱支援
應用程式之間的通訊遵循釋出-訂閱模型,資料透過共享主題廣播。這可以在下圖中看到,它顯示了一組互動式 Spring Cloud Stream 應用程式的典型部署。
感測器向 HTTP 端點報告的資料被髮送到一個名為 raw-sensor-data 的公共目的地。從該目的地,它由一個計算時間窗平均值的微服務應用程式和一個將原始資料攝取到 HDFS(Hadoop 分散式檔案系統)的另一個微服務應用程式獨立處理。為了處理資料,這兩個應用程式在執行時都將該主題宣告為它們的輸入。
釋出-訂閱通訊模型降低了生產者和消費者的複雜性,並允許將新應用程式新增到拓撲中而不會中斷現有流。例如,在平均值計算應用程式的下游,您可以新增一個應用程式來計算要顯示和監控的最高溫度值。然後,您可以新增另一個應用程式來解釋相同的平均值流以進行故障檢測。透過共享主題而不是點對點佇列進行所有通訊減少了微服務之間的耦合。
雖然釋出-訂閱訊息傳遞的概念並不新鮮,但 Spring Cloud Stream 進一步將其作為其應用程式模型的首選。透過使用原生中介軟體支援,Spring Cloud Stream 還簡化了跨不同平臺使用釋出-訂閱模型。
消費者組
雖然釋出-訂閱模型使透過共享主題連線應用程式變得容易,但透過建立給定應用程式的多個例項來擴充套件同樣重要。這樣做時,應用程式的不同例項被放置在競爭消費者關係中,其中只有一個例項有望處理給定訊息。
Spring Cloud Stream 透過消費者組的概念來建模這種行為。(Spring Cloud Stream 消費者組與 Kafka 消費者組相似並受其啟發。)每個消費者繫結都可以使用 spring.cloud.stream.bindings.<bindingName>.group 屬性指定一個組名。對於下圖中所示的消費者,此屬性將設定為 spring.cloud.stream.bindings.<bindingName>.group=hdfsWrite 或 spring.cloud.stream.bindings.<bindingName>.group=average。
所有訂閱給定目標的組都會收到釋出資料的一份副本,但每個組中只有一個成員從該目標接收給定訊息。預設情況下,當未指定組時,Spring Cloud Stream 會將應用程式分配給一個匿名且獨立的單成員消費者組,該組與所有其他消費者組處於釋出-訂閱關係。
消費者型別
支援兩種型別的消費者
-
訊息驅動(有時稱為非同步)
-
輪詢(有時稱為同步)
在 2.0 版本之前,只支援非同步消費者。訊息一可用,且執行緒可用,就會立即傳遞。
當您希望控制訊息處理速率時,您可能需要使用同步消費者。
永續性
與 Spring Cloud Stream 的規範應用程式模型一致,消費者組訂閱是持久的。也就是說,繫結器實現確保組訂閱是持久的,並且一旦為一個組建立了至少一個訂閱,即使所有組中的應用程式都已停止,該組也會接收訊息。
|
匿名訂閱本質上是非持久的。對於某些繫結器實現(如 RabbitMQ),可以擁有非持久組訂閱。 |
通常,在將應用程式繫結到給定目標時,最好始終指定一個消費者組。在擴充套件 Spring Cloud Stream 應用程式時,您必須為其每個輸入繫結指定一個消費者組。這樣做可以防止應用程式例項接收重複訊息(除非需要這種行為,這很不尋常)。
分割槽支援
Spring Cloud Stream 支援在給定應用程式的多個例項之間進行資料分割槽。在分割槽場景中,物理通訊介質(例如代理主題)被視為被構造成多個分割槽。一個或多個生產者應用程式例項向多個消費者應用程式例項傳送資料,並確保由共同特徵標識的資料由相同的消費者例項處理。
Spring Cloud Stream 提供了一個通用抽象,以統一的方式實現分割槽處理用例。因此,無論代理本身是否自然分割槽(例如 Kafka)或不分割槽(例如 RabbitMQ),都可以使用分割槽。
分割槽是有狀態處理中的一個關鍵概念,在有狀態處理中,確保所有相關資料一起處理至關重要(出於效能或一致性原因)。例如,在時間窗平均值計算示例中,任何給定感測器的所有測量值都必須由相同的應用程式例項處理。
| 要設定分割槽處理場景,您必須同時配置資料生產端和資料消費端。 |
程式設計模型
要理解程式設計模型,您應該熟悉以下核心概念
-
目標繫結器:負責提供與外部訊息系統整合的元件。
-
繫結:外部訊息系統與應用程式提供的訊息生產者和消費者之間的橋樑(由目標繫結器建立)。
-
訊息:生產者和消費者用於與目標繫結器(並透過外部訊息系統與其他應用程式)通訊的規範資料結構。
目標繫結器
目標繫結器是 Spring Cloud Stream 的擴充套件元件,負責提供必要的配置和實現,以促進與外部訊息系統的整合。這種整合負責連線、訊息到生產者和消費者之間的委託和路由、資料型別轉換、使用者程式碼呼叫等。
繫結器處理了許多原本會落在您肩上的樣板職責。但是,為了實現這一點,繫結器仍然需要使用者提供一些幫助,以最少但必需的指令集的形式,這些指令通常以某種繫結配置的形式出現。
雖然本節超出範圍,無法討論所有可用的繫結器和繫結配置選項(手冊的其餘部分廣泛涵蓋了它們),但繫結作為一個概念確實需要特別注意。下一節將詳細討論它。
繫結
如前所述,繫結提供了外部訊息系統(例如,佇列、主題等)與應用程式提供的生產者和消費者之間的橋樑。
以下示例顯示了一個完全配置和執行的 Spring Cloud Stream 應用程式,該應用程式以 String 型別接收訊息負載(參見 內容型別協商 部分),將其記錄到控制檯並在將其轉換為大寫後將其傳送到下游。
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class, args);
}
@Bean
public Function<String, String> uppercase() {
return value -> {
System.out.println("Received: " + value);
return value.toUpperCase();
};
}
}
上述示例與任何普通的 Spring Boot 應用程式沒有什麼不同。它定義了一個 Function 型別的單個 bean,僅此而已。那麼,它是如何成為 Spring Cloud Stream 應用程式的呢?它成為 Spring Cloud Stream 應用程式僅僅是因為類路徑上存在 spring-cloud-stream 和繫結器依賴項以及自動配置類,有效地將您的 boot 應用程式的上下文設定為 Spring Cloud Stream 應用程式。在這種上下文中,Supplier、Function 或 Consumer 型別的 bean 被視為實際的訊息處理程式,觸發繫結到透過提供的繫結器公開的目標,遵循某些命名約定和規則,以避免額外的配置。
繫結和繫結名稱
繫結是一種抽象,它表示繫結器和使用者程式碼公開的源和目標之間的橋樑。此抽象有一個名稱,雖然我們盡力限制執行 spring-cloud-stream 應用程式所需的配置,但在需要額外每繫結配置的情況下,瞭解此類名稱是必要的。
在本手冊中,您將看到配置屬性的示例,例如 spring.cloud.stream.bindings.input.destination=myQueue。此屬性名稱中的 input 部分就是我們所說的繫結名稱,它可以透過多種機制派生。以下小節將描述 spring-cloud-stream 用於控制繫結名稱的命名約定和配置元素。
如果您的繫結名稱包含特殊字元,例如 . 字元,則需要用括號 ([]) 將繫結鍵括起來,然後用引號括起來。例如 spring.cloud.stream.bindings."[my.output.binding.key]".destination。 |
函式式繫結名稱
與 Spring Cloud Stream 早期版本中使用的基於註解(傳統)的顯式命名不同,函數語言程式設計模型在繫結名稱方面預設採用簡單的約定,從而大大簡化了應用程式配置。讓我們看第一個示例
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
在前面的示例中,我們有一個應用程式,其中包含一個作為訊息處理程式的單個函式。作為一個 Function,它有一個輸入和輸出。用於命名輸入和輸出繫結的命名約定如下
-
輸入 -
<functionName> + -in- + <index> -
輸出 -
<functionName> + -out- + <index>
in 和 out 對應於繫結的型別(例如輸入或輸出)。index 是輸入或輸出繫結的索引。對於典型的單輸入/輸出函式,它始終為 0,因此僅與具有多個輸入和輸出引數的函式相關。
因此,例如,如果您想將此函式的輸入對映到名為“my-topic”的遠端目標(例如主題、佇列等),您可以使用以下屬性完成此操作
--spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic
請注意 uppercase-in-0 如何用作屬性名中的一個部分。uppercase-out-0 也是如此。
描述性繫結名稱
有時為了提高可讀性,您可能希望為繫結提供更具描述性的名稱(例如“account”、“orders”等)。另一種看待它的方式是,您可以將隱式繫結名稱對映到顯式繫結名稱。您可以使用 spring.cloud.stream.function.bindings.<binding-name> 屬性來完成此操作。此屬性還為依賴自定義基於介面的繫結(需要顯式名稱)的現有應用程式提供了遷移路徑。
例如,
--spring.cloud.stream.function.bindings.uppercase-in-0=input
在前面的示例中,您對映並有效地將 uppercase-in-0 繫結名稱重新命名為 input。現在所有配置屬性都可以引用 input 繫結名稱(例如,--spring.cloud.stream.bindings.input.destination=my-topic)。
儘管描述性繫結名稱可以增強配置的可讀性方面,但它們也透過將隱式繫結名稱對映到顯式繫結名稱而建立了另一層誤導。並且由於所有後續配置屬性都將使用顯式繫結名稱,因此您必須始終引用此“bindings”屬性以關聯它實際對應哪個函式。我們認為對於大多數情況(函式組合除外),這可能有些過度,因此,我們建議完全避免使用它,特別是因為不使用它可以在繫結器目標和繫結名稱之間提供清晰的路徑,例如 spring.cloud.stream.bindings.uppercase-in-0.destination=sample-topic,在這裡您清楚地將 uppercase 函式的輸入與 sample-topic 目標關聯起來。 |
有關屬性和其他配置選項的更多資訊,請參閱 配置選項 部分。
顯式繫結建立
在上一節中,我們解釋瞭如何根據應用程式提供的 Function、Supplier 或 Consumer bean 的名稱隱式建立繫結。但是,有時您可能需要顯式建立繫結,其中繫結不與任何函式相關聯。這通常是為了支援透過 StreamBridge 與其他框架整合而完成的。
Spring Cloud Stream 允許您透過 spring.cloud.stream.input-bindings 和 spring.cloud.stream.output-bindings 屬性顯式定義輸入和輸出繫結。請注意屬性名稱中的複數形式,允許您透過簡單地使用 ; 作為分隔符來定義多個繫結。只需將以下測試用例作為示例
@Test
public void testExplicitBindings() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
.web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false",
"--spring.cloud.stream.input-bindings=fooin;barin",
"--spring.cloud.stream.output-bindings=fooout;barout")) {
. . .
}
}
@EnableAutoConfiguration
@Configuration
public static class EmptyConfiguration {
}
如您所見,我們聲明瞭兩個輸入繫結和兩個輸出繫結,而我們的配置中沒有定義任何函式,但我們仍然能夠成功建立這些繫結並訪問其相應的通道。
生產和消費訊息
您可以透過簡單地編寫函式並將它們作為 @Bean 公開來編寫 Spring Cloud Stream 應用程式。您還可以使用基於 Spring Integration 註解的配置或基於 Spring Cloud Stream 註解的配置,儘管從 spring-cloud-stream 3.x 開始,我們建議使用函式式實現。
Spring Cloud Function 支援
概述
自 Spring Cloud Stream v2.1 以來,定義流處理程式和源的另一種替代方法是使用對 Spring Cloud Function 的內建支援,其中它們可以表示為 java.util.function.[Supplier/Function/Consumer] 型別的 bean。
要指定哪個函式式 bean 繫結到由繫結公開的外部目標,您必須提供 spring.cloud.function.definition 屬性。
如果您只有一個 java.util.function.[Supplier/Function/Consumer] 型別的 bean,則可以跳過 spring.cloud.function.definition 屬性,因為此類函式式 bean 將被自動發現。但是,使用此類屬性以避免任何混淆被認為是最佳實踐。有時,這種自動發現可能會妨礙,因為單個 java.util.function.[Supplier/Function/Consumer] 型別的 bean 可能存在用於處理訊息以外的目的,但由於是單個,它會被自動發現和自動繫結。對於這些罕見的場景,您可以透過提供 spring.cloud.stream.function.autodetect 屬性並將其值設定為 false 來停用自動發現。 |
這是應用程式公開訊息處理程式作為 java.util.function.Function 的示例,透過充當資料的消費者和生產者,有效地支援直通語義。
@SpringBootApplication
public class MyFunctionBootApp {
public static void main(String[] args) {
SpringApplication.run(MyFunctionBootApp.class);
}
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}
在前面的示例中,我們定義了一個名為 toUpperCase 的 java.util.function.Function 型別的 bean,作為訊息處理程式,其“input”和“output”必須繫結到由提供的目標繫結器公開的外部目標。預設情況下,“input”和“output”繫結名稱將為 toUpperCase-in-0 和 toUpperCase-out-0。有關用於建立繫結名稱的命名約定的詳細資訊,請參閱 函式式繫結名稱 部分。
以下是支援其他語義的簡單函式式應用程式示例
以下是公開為 java.util.function.Supplier 的源語義的示例
@SpringBootApplication
public static class SourceFromSupplier {
@Bean
public Supplier<Date> date() {
return () -> new Date(12345L);
}
}
以下是公開為 java.util.function.Consumer 的接收器語義的示例
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}
供應器(Sources)
Function 和 Consumer 在其呼叫觸發方式上非常簡單。它們根據傳送到它們繫結的目標的資料(事件)觸發。換句話說,它們是經典的事件驅動元件。
然而,Supplier 在觸發方面屬於其自己的類別。由於它本質上是資料的來源(起點),因此它不訂閱任何入站目標,因此必須由其他機制觸發。此外,還有一個關於 Supplier 實現的問題,它可能是命令式或響應式,這直接關係到此類供應器的觸發。
考慮以下示例
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
}
每當呼叫其 get() 方法時,前面的 Supplier bean 都會生成一個字串。但是,誰呼叫此方法,以及多久呼叫一次?該框架提供了一個預設的輪詢機制(回答“誰?”的問題),它將觸發供應商的呼叫,並且預設情況下每秒執行一次(回答“多久?”的問題)。換句話說,上述配置每秒生成一條訊息,每條訊息都會發送到由繫結器公開的 output 目標。要了解如何自定義輪詢機制,請參閱 輪詢配置屬性 部分。
考慮一個不同的例子
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
}
前面的 Supplier bean 採用了響應式程式設計風格。通常,與命令式供應商不同,它應該只觸發一次,因為呼叫其 get() 方法會產生(供應)連續的訊息流,而不是單個訊息。
該框架識別程式設計風格的差異,並保證此類供應商只觸發一次。
然而,想象一下這樣的用例:您希望輪詢某個資料來源並返回一個表示結果集的有限資料流。響應式程式設計風格是此類 Supplier 的完美機制。然而,鑑於生成流的有限性質,此類 Supplier 仍然需要定期呼叫。
考慮以下示例,它透過生成有限資料流來模擬此類用例
@SpringBootApplication
public static class SupplierConfiguration {
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.just("hello", "bye");
}
}
bean 本身用 PollableBean 註解(@Bean 的子集)進行註解,從而向框架發出訊號,表示儘管此類供應商的實現是響應式的,但它仍然需要輪詢。
PollableBean 中定義了一個 splittable 屬性,它向此註解的後處理器發出訊號,表示由註解元件產生的結果必須拆分,並且預設設定為 true。這意味著框架將拆分返回的,將每個項作為單獨的訊息傳送出去。如果這不是期望的行為,您可以將其設定為 false,此時此類供應商將簡單地返回生成的 Flux 而不進行拆分。 |
供應器和執行緒
正如您現在所瞭解的,與由事件觸發(它們有輸入資料)的 Function 和 Consumer 不同,Supplier 沒有任何輸入,因此由不同的機制——輪詢器觸發,它可能具有不可預測的執行緒機制。雖然執行緒機制的細節在大多數情況下與函式的下游執行無關,但在某些情況下可能會出現問題,特別是與可能對執行緒親和性有特定期望的整合框架。例如,依賴於儲存線上程區域性中的跟蹤資料的 Spring Cloud Sleuth。對於這些情況,我們透過 StreamBridge 提供了另一種機制,使用者可以更好地控制執行緒機制。您可以在 向輸出傳送任意資料(例如,外部事件驅動源) 部分獲取更多詳細資訊。 |
消費者(Reactive)
響應式 Consumer 有點特殊,因為它具有 void 返回型別,導致框架沒有可訂閱的引用。您很可能不需要編寫 Consumer<Flux<?>>,而是將其編寫為 Function<Flux<?>, Mono<Void>>,將 then 運算子作為流上的最後一個運算子呼叫。
例如:
public Function<Flux<?>, Mono<Void>> consumer() {
return flux -> flux.map(..).filter(..).then();
}
但是,如果您確實需要編寫顯式的 Consumer<Flux<?>>,請記住訂閱傳入的 Flux。
此外,請記住,在混合響應式和命令式函式時,函式組合也適用相同的規則。Spring Cloud Function 確實支援將響應式函式與命令式函式組合,但是您必須注意某些限制。例如,假設您已經將響應式函式與命令式消費者組合。這種組合的結果是一個響應式 Consumer。然而,如本節前面所討論的,沒有辦法訂閱此類消費者,因此此限制只能透過使您的消費者響應式並手動訂閱(如前所述)或將您的函式更改為命令式來解決。
輪詢配置屬性
Spring Cloud Stream 公開了以下屬性,並以 spring.integration.poller. 為字首
- fixedDelay
-
預設輪詢器的固定延遲,單位為毫秒。
預設值:1000L。
- maxMessagesPerPoll
-
預設輪詢器每次輪詢事件的最大訊息數。
預設值:1L。
- cron
-
Cron Trigger 的 Cron 表示式值。
預設值:無。
- initialDelay
-
週期性觸發器的初始延遲。
預設值:0。
- timeUnit
-
應用於延遲值的時間單位。
預設值:MILLISECONDS。
例如 --spring.integration.poller.fixed-delay=2000 將輪詢器間隔設定為每兩秒輪詢一次。
每繫結輪詢配置
上一節展示瞭如何配置一個將應用於所有繫結的單個預設輪詢器。雖然它非常適合微服務的模型,即每個微服務代表一個元件(例如 Supplier),因此預設輪詢器配置就足夠了,但存在一些特殊情況,您可能需要幾個需要不同輪詢配置的元件
對於這種情況,請使用每繫結方式配置輪詢器。例如,假設您有一個輸出繫結 supply-out-0。在這種情況下,您可以使用 spring.cloud.stream.bindings.supply-out-0.producer.poller.. 字首來配置此類繫結的輪詢器(例如,spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000)。
向輸出傳送任意資料(例如,外部事件驅動源)
在某些情況下,資料的實際來源可能來自非繫結器的外部(外部)系統。例如,資料來源可能是一個經典的 REST 端點。我們如何將此類來源與 spring-cloud-stream 使用的功能機制連線起來?
Spring Cloud Stream 提供了兩種機制,所以讓我們更詳細地瞭解它們
在這裡,對於這兩個示例,我們將使用一個名為 delegateToSupplier 的標準 MVC 端點方法,繫結到根 Web 上下文,透過 StreamBridge 機制將傳入請求委託給流。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream", body);
}
}
這裡我們自動裝配一個 StreamBridge bean,它允許我們將資料傳送到輸出繫結,有效地將非流應用程式與 Spring Cloud Stream 連線起來。請注意,前面的示例沒有定義任何源函式(例如,Supplier bean),使得框架無法提前建立源繫結,這在配置包含函式 bean 的情況下很常見。這沒關係,因為 StreamBridge 將在第一次呼叫其 send(..) 操作時為不存在的繫結啟動輸出繫結的建立(以及必要的目的地自動配置),並將其快取以供後續重用(有關詳細資訊,請參閱 StreamBridge 和動態目的地)。
但是,如果您想在初始化(啟動)時預先建立輸出繫結,您可以利用 spring.cloud.stream.output-bindings 屬性來宣告您的源名稱。提供的名稱將用作觸發器來建立源繫結。您可以使用 ; 表示多個源(多個輸出繫結)(例如,--spring.cloud.stream.output-bindings=foo;bar)
此外,請注意 streamBridge.send(..) 方法接受 Object 作為資料。這意味著您可以向其傳送 POJO 或 Message,並且它在傳送輸出時將經歷與任何 Function 或 Supplier 相同的例程,提供與函式相同的一致性級別。這意味著輸出型別轉換、分割槽等將像函式產生的輸出一樣得到遵守。
StreamBridge 和動態目的地
StreamBridge 也可以用於提前不知道輸出目的地的情況,類似於 從消費者路由 部分中描述的用例。
讓我們來看一個例子
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, args);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("myDestination", body);
}
}
如您所見,前面的示例與前一個非常相似,只是沒有透過 spring.cloud.stream.output-bindings 屬性提供顯式繫結指令。在這裡,我們將資料傳送到 myDestination 名稱,該名稱不存在作為繫結。因此,此類名稱將被視為動態目的地,如 從消費者路由 部分所述。
在前面的示例中,我們使用 ApplicationRunner 作為外部源來為流提供資料。
一個更實際的例子,其中外部源是 REST 端點。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
streamBridge.send("myBinding", body);
}
}
如您所見,在 delegateToSupplier 方法內部,我們使用 StreamBridge 將資料傳送到 myBinding 繫結。在這裡,您也受益於 StreamBridge 的動態功能,如果 myBinding 不存在,它將自動建立並快取,否則將使用現有繫結。
快取動態目的地(繫結)可能會導致記憶體洩漏,如果動態目的地很多。為了在一定程度上進行控制,我們為輸出繫結提供了自清除快取機制,預設快取大小為 10。這意味著如果您的動態目的地大小超過該數字,則可能會逐出現有繫結,從而需要重新建立,這可能會導致輕微的效能下降。您可以透過 spring.cloud.stream.dynamic-destination-cache-size 屬性將其設定為所需值來增加快取大小。 |
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" https://:8080/
透過展示兩個示例,我們強調該方法適用於任何型別的外部源。
如果您正在使用 Solace PubSub+ 繫結器,Spring Cloud Stream 已保留了 scst_targetDestination 標頭(可透過 BinderHeaders.TARGET_DESTINATION 獲取),該標頭允許訊息從其繫結配置的目的地重定向到此標頭指定的目標目的地。這允許繫結器管理釋出到動態目的地所需的資源,從而減輕了框架的負擔,並避免了前面注意事項中提到的快取問題。更多資訊請參見此處。 |
StreamBridge 的輸出內容型別
您還可以透過以下方法簽名 public boolean send(String bindingName, Object data, MimeType outputContentType) 提供特定的內容型別。或者,如果您將資料作為 Message 傳送,則其內容型別將得到遵守。
使用 StreamBridge 的特定繫結器型別
Spring Cloud Stream 支援多種繫結器場景。例如,您可能正在從 Kafka 接收資料並將其傳送到 RabbitMQ。
如果您打算使用 StreamBridge 並且您的應用程式中配置了多個繫結器,您還必須告訴 StreamBridge 使用哪個繫結器。為此,send 方法還有另外兩種變體
public boolean send(String bindingName, @Nullable String binderType, Object data)
public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)
如您所見,您可以提供一個額外的引數 - binderType,告訴 BindingService 在建立動態繫結時使用哪個繫結器。
對於使用 spring.cloud.stream.output-bindings 屬性或繫結已在不同繫結器下建立的情況,binderType 引數將不起作用。 |
使用 StreamBridge 的通道攔截器
由於 StreamBridge 使用 MessageChannel 建立輸出繫結,因此在透過 StreamBridge 傳送資料時可以啟用通道攔截器。由應用程式決定將哪些通道攔截器應用於 StreamBridge。Spring Cloud Stream 不會將檢測到的所有通道攔截器注入到 StreamBridge 中,除非它們用 @GlobalChannelInterceptor(patterns = "*") 進行註解。
讓我們假設您的應用程式中有以下兩個不同的 StreamBridge 繫結。
streamBridge.send("foo-out-0", message);
和
streamBridge.send("bar-out-0", message);
現在,如果您想在兩個 StreamBridge 繫結上都應用通道攔截器,那麼您可以宣告以下 GlobalChannelInterceptor bean。
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
然而,如果您不喜歡上述全域性方法,並且希望為每個繫結提供專用攔截器,那麼您可以執行以下操作。
@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
和
@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
您可以靈活地使模式更嚴格或根據您的業務需求進行定製。
透過這種方法,應用程式能夠決定將哪些攔截器注入到 StreamBridge 中,而不是應用所有可用的攔截器。
StreamBridge 透過 StreamOperations 介面提供了一個契約,該介面包含 StreamBridge 的所有 send 方法。因此,應用程式可以選擇使用 StreamOperations 進行自動裝配。這在透過為 StreamOperations 介面提供模擬或類似機制來對使用 StreamBridge 的程式碼進行單元測試時非常方便。 |
響應式函式支援
由於 Spring Cloud Function 是構建在 Project Reactor 之上的,因此在實現 Supplier、Function 或 Consumer 時,您無需做太多即可受益於響應式程式設計模型。
例如:
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
return flux -> flux.map(val -> val.toUpperCase());
}
}
|
在選擇響應式或指令式程式設計模型時,必須理解幾個重要事項。 完全響應式還是僅 API? 使用響應式 API 不一定意味著您可以受益於此類 API 的所有響應式功能。換句話說,像背壓和其他高階功能只有在與相容系統(例如 Reactive Kafka 繫結器)一起使用時才能工作。如果您使用常規 Kafka 或 Rabbit 或任何其他非響應式繫結器,您只能從響應式 API 本身的便利性中受益,而不是其高階功能,因為流的實際源或目標不是響應式的。 錯誤處理和重試 在本手冊中,您將看到一些關於基於框架的錯誤處理、重試和其他功能以及與其相關的配置屬性的引用。重要的是要理解它們隻影響命令式函式,並且您不應期望響應式函式具有相同的行為。原因如下:響應式函式與命令式函式之間存在根本區別。命令式函式是一個訊息處理程式,它由框架在接收到每條訊息時呼叫。因此,對於 N 條訊息,將有 N 次此類函式的呼叫,正因為如此,我們可以封裝此類函式並新增額外的功能,例如錯誤處理、重試等。響應式函式是初始化函式。它只調用一次,以獲取使用者提供的 Flux/Mono 的引用,以便與框架提供的 Flux/Mono 連線。之後,我們(框架)對流完全沒有可見性或控制。因此,對於響應式函式,您必須依靠響應式 API 的豐富性來進行錯誤處理和重試(即 |
函式組合
使用函數語言程式設計模型,您還可以受益於函式式組合,透過它您可以從一組簡單函式動態組合複雜的處理程式。舉例來說,讓我們將以下函式 bean 新增到上面定義的應用程式中
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}
並修改 spring.cloud.function.definition 屬性以反映您組合新函式(由“toUpperCase”和“wrapInQuotes”組成)的意圖。為此,Spring Cloud Function 依賴於 |(管道)符號。因此,為了完成我們的示例,我們的屬性現在將如下所示
--spring.cloud.function.definition=toUpperCase|wrapInQuotes
| Spring Cloud Function 提供的函式組合支援的最大好處之一是您可以組合響應式和命令式函式。 |
組合的結果是一個單一函式,正如你可能猜到的那樣,它可能有一個很長且相當神秘的名稱(例如,foo|bar|baz|xyz. . .),這在處理其他配置屬性時會帶來很大的不便。這就是 函式式繫結名稱 部分中描述的描述性繫結名稱功能可以提供幫助的地方。
例如,如果我們想給 toUpperCase|wrapInQuotes 一個更具描述性的名稱,我們可以使用以下屬性 spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput 來實現,允許其他配置屬性引用該繫結名稱(例如,spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination)。
函式組合與橫切關注點
函式組合有效地允許您透過將其分解為一組簡單且可單獨管理/測試的元件來解決複雜性,這些元件在執行時仍然可以表示為一個。但這並非唯一的好處。
您還可以使用組合來處理某些橫切非功能性關注點,例如內容豐富。例如,假設您有一個傳入訊息可能缺少某些標頭,或者某些標頭不符合您的業務函式期望的精確狀態。您現在可以實現一個單獨的函式來解決這些關注點,然後將其與主要業務函式組合起來。
讓我們來看一個例子
@SpringBootApplication
public class DemoStreamApplication {
public static void main(String[] args) {
SpringApplication.run(DemoStreamApplication.class,
"--spring.cloud.function.definition=enrich|echo",
"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
"--spring.cloud.stream.bindings.input.destination=myDestination",
"--spring.cloud.stream.bindings.input.group=myGroup");
}
@Bean
public Function<Message<String>, Message<String>> enrich() {
return message -> {
Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
};
}
@Bean
public Function<Message<String>, Message<String>> echo() {
return message -> {
Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
System.out.println("Incoming message " + message);
return message;
};
}
}
雖然微不足道,但此示例演示了一個函式如何透過附加標頭(非功能性關注點)豐富傳入訊息,以便另一個函式(echo)可以從中受益。echo 函式保持簡潔,只關注業務邏輯。您還可以看到 spring.cloud.stream.function.bindings 屬性的使用,以簡化組合繫結名稱。
具有多個輸入和輸出引數的函式
從 3.0 版本開始,spring-cloud-stream 支援具有多個輸入和/或多個輸出(返回值)的函式。這到底意味著什麼,它針對哪些用例?
-
大資料:想象您正在處理的資料來源是高度無組織的,包含各種型別的資料元素(例如訂單、事務等),並且您需要有效地對其進行排序。
-
資料聚合:另一個用例可能需要您合併來自 2 個或更多傳入_流_的資料元素.
上述只是描述了您可能需要使用單個函式來接受和/或生成多個資料流的一些用例。這正是我們在此處關注的用例型別。
此外,請注意此處對流概念的略微不同強調。假設這些函式只有在它們被賦予訪問實際資料流(而不是單個元素)的許可權時才有價值。為此,我們依賴於 Project Reactor 提供的抽象(即 Flux 和 Mono),這些抽象作為 spring-cloud-functions 引入的依賴項的一部分已在類路徑上可用。
另一個重要的方面是多個輸入和輸出的表示。雖然 Java 提供了各種不同的抽象來表示多個事物,但這些抽象是 a) 無界的、b) 缺乏元數 和 c) 缺乏型別資訊,這些在上下文中都很重要。舉例來說,讓我們看看 Collection 或陣列,它們只允許我們描述單個型別的多個,或者將所有內容向上轉換為 Object,從而影響 spring-cloud-stream 的透明型別轉換功能等等。
因此,為了滿足所有這些要求,最初的支援依賴於使用 Project Reactor 提供的另一個抽象——元組的簽名。但是,我們正在努力允許更靈活的簽名。
| 請參閱 繫結和繫結名稱 部分,以瞭解此類應用程式使用的建立繫結名稱的命名約定。 |
讓我們看幾個示例
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
return tuple -> {
Flux<String> stringStream = tuple.getT1();
Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
return Flux.merge(stringStream, intStream);
};
}
}
上述示例演示了一個函式,它接受兩個輸入(第一個是 String 型別,第二個是 Integer 型別),並生成一個 String 型別的輸出。
因此,對於上述示例,兩個輸入繫結將是 gather-in-0 和 gather-in-1,為了保持一致性,輸出繫結也遵循相同的約定,並命名為 gather-out-0。
瞭解這些將允許您設定繫結特定的屬性。例如,以下內容將覆蓋gather-in-0繫結的內容型別
--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {
@Bean
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
};
}
}
上述示例與前一個示例有些相反,它演示了一個函式,該函式接收一個Integer型別的輸入,併產生兩個輸出(都為String型別)。
因此,對於上述示例,輸入繫結是scatter-in-0,輸出繫結是scatter-out-0和scatter-out-1。
您可以使用以下程式碼進行測試
@Test
public void testSingleInputMultiOutput() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleApplication.class))
.run("--spring.cloud.function.definition=scatter")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
for (int i = 0; i < 10; i++) {
inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
}
int counter = 0;
for (int i = 0; i < 5; i++) {
Message<byte[]> even = outputDestination.receive(0, 0);
assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
Message<byte[]> odd = outputDestination.receive(0, 1);
assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
}
}
}
單個應用程式中的多個函式
可能還需要在單個應用程式中對多個訊息處理程式進行分組。您可以透過定義多個函式來實現。
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return value -> new StringBuilder(value).reverse().toString();
}
}
在上述示例中,我們有一個定義了兩個函式uppercase和reverse的配置。因此,首先,如前所述,我們需要注意到存在衝突(多於一個函式),因此我們需要透過提供指向我們想要繫結的實際函式的spring.cloud.function.definition屬性來解決它。但是在這裡,我們將使用;分隔符來指向這兩個函式(參見下面的測試用例)。
| 與具有多個輸入/輸出的函式一樣,請參閱繫結和繫結名稱部分,以瞭解此類應用程式使用的繫結名稱的命名約定。 |
您可以使用以下程式碼進行測試
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
ReactiveFunctionConfiguration.class))
.run("--spring.cloud.function.definition=uppercase;reverse")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "uppercase-in-0");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-1");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
批次消費者
當使用支援批次偵聽器且為消費者繫結啟用了該功能的MessageChannelBinder時,您可以將spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode設定為true,以將整個訊息批次以List形式傳遞給函式。
@Bean
public Function<List<Person>, Person> findFirstPerson() {
return persons -> persons.get(0);
}
批次生產者
您也可以在生產者端使用批處理的概念,透過返回一個訊息集合,這實際上提供了相反的效果,即集合中的每條訊息都將由繫結器單獨傳送。
考慮以下函式
@Bean
public Function<String, List<Message<String>>> batch() {
return p -> {
List<Message<String>> list = new ArrayList<>();
list.add(MessageBuilder.withPayload(p + ":1").build());
list.add(MessageBuilder.withPayload(p + ":2").build());
list.add(MessageBuilder.withPayload(p + ":3").build());
list.add(MessageBuilder.withPayload(p + ":4").build());
return list;
};
}
返回列表中的每條訊息都將單獨傳送,導致四條訊息傳送到輸出目的地。
Spring Integration 流作為函式
當您實現一個函式時,您可能有一些複雜的請求,這些請求符合企業整合模式(EIP)的類別。這些請求最好透過使用像Spring Integration(SI)這樣的框架來處理,它是EIP的參考實現。
幸運的是,SI 已經透過 整合流作為閘道器 提供了將整合流作為函式公開的支援。考慮以下示例
@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {
public static void main(String[] args) {
SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
}
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows.from(MessageFunction.class, "uppercase")
.<String, String>transform(String::toUpperCase)
.logAndReply(LoggingHandler.Level.WARN);
}
public interface MessageFunction extends Function<Message<String>, Message<String>> {
}
}
對於熟悉 SI 的人來說,您可以看到我們定義了一個 IntegrationFlow 型別的 bean,其中我們聲明瞭一個要公開為 Function<String, String>(使用 SI DSL)的整合流,名為 uppercase。MessageFunction 介面允許我們明確宣告輸入和輸出的型別,以進行正確的型別轉換。有關型別轉換的更多資訊,請參閱內容型別協商部分。
要接收原始輸入,您可以使用from(Function.class, …)。
生成的函式繫結到目標繫結器公開的輸入和輸出目的地。
| 請參閱 繫結和繫結名稱 部分,以瞭解此類應用程式使用的建立繫結名稱的命名約定。 |
有關 Spring Integration 和 Spring Cloud Stream 之間互操作性的更多詳細資訊,特別是關於函數語言程式設計模型,您可能會發現這篇文章非常有趣,因為它更深入地探討了透過融合 Spring Integration 和 Spring Cloud Stream/Functions 的優點可以應用的各種模式。
使用輪詢消費者
概述
當使用輪詢消費者時,您按需輪詢PollableMessageSource。要為輪詢消費者定義繫結,您需要提供spring.cloud.stream.pollable-source屬性。
考慮以下輪詢消費者繫結的示例
--spring.cloud.stream.pollable-source=myDestination
在前面的例子中,輪詢源名稱myDestination將導致myDestination-in-0繫結名稱,以與函數語言程式設計模型保持一致。
鑑於前面示例中的輪詢消費者,您可能會按如下方式使用它
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition()) {
try {
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
};
}
一種更少手動且更像 Spring 的替代方案是配置一個排程任務 bean。例如,
@Scheduled(fixedDelay = 5_000)
public void poll() {
System.out.println("Polling...");
this.source.poll(m -> {
System.out.println(m.getPayload());
}, new ParameterizedTypeReference<Foo>() { });
}
PollableMessageSource.poll()方法接受一個MessageHandler引數(通常是lambda表示式,如這裡所示)。如果訊息被接收併成功處理,它返回true。
與訊息驅動的消費者一樣,如果MessageHandler丟擲異常,訊息將釋出到錯誤通道,如錯誤處理中所述。
通常,poll()方法在MessageHandler退出時確認訊息。如果該方法異常退出,訊息將被拒絕(不重新排隊),但請參閱處理錯誤。您可以透過負責確認來覆蓋該行為,如以下示例所示
@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
return args -> {
while (someCondition()) {
if (!dest1In.poll(m -> {
StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
// e.g. hand off to another thread which can perform the ack
// or acknowledge(Status.REQUEUE)
})) {
Thread.sleep(1000);
}
}
};
}
您必須在某個時刻ack(或nack)訊息,以避免資源洩漏。 |
一些訊息系統(例如 Apache Kafka)維護一個簡單的日誌偏移量。如果交付失敗並使用StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);重新排隊,任何後續成功確認的訊息都將被重新交付。 |
還有一個過載的poll方法,其定義如下
poll(MessageHandler handler, ParameterizedTypeReference<?> type)
type是一個轉換提示,允許對傳入訊息的有效負載進行轉換,如下例所示
boolean result = pollableSource.poll(received -> {
Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
...
}, new ParameterizedTypeReference<Map<String, Foo>>() {});
處理錯誤
預設情況下,輪詢源配置了一個錯誤通道;如果回撥丟擲異常,一個ErrorMessage將被髮送到錯誤通道(<destination>.<group>.errors);此錯誤通道也橋接到全域性 Spring Integration errorChannel。
您可以使用@ServiceActivator訂閱任何一個錯誤通道來處理錯誤;如果沒有訂閱,錯誤將簡單地記錄下來,訊息將被確認為成功。如果錯誤通道服務啟用器丟擲異常,訊息將被拒絕(預設情況下)並且不會重新發送。如果服務啟用器丟擲RequeueCurrentMessageException,訊息將在代理處重新排隊,並在後續的輪詢中再次檢索。
如果監聽器直接丟擲RequeueCurrentMessageException,訊息將被重新排隊,如上所述,並且不會發送到錯誤通道。
事件路由
在 Spring Cloud Stream 的上下文中,事件路由是指a) 將事件路由到特定的事件訂閱者或b) 將事件訂閱者生成的事件路由到特定的目的地的能力。這裡我們將其稱為“路由到”和“路由來自”。
路由到消費者
透過依賴 Spring Cloud Function 3.0 中提供的 RoutingFunction 可以實現路由。您所需要做的就是透過 --spring.cloud.stream.function.routing.enabled=true 應用程式屬性啟用它,或者提供 spring.cloud.function.routing-expression 屬性。一旦啟用,RoutingFunction 將繫結到輸入目的地,接收所有訊息並根據提供的指令將它們路由到其他函式。
為了繫結的目的,路由目的地的名稱是functionRouter-in-0(參見RoutingFunction.FUNCTION_NAME和繫結命名約定函式繫結名稱)。 |
指令可以透過單獨的訊息和應用程式屬性提供。
這裡有幾個例子
使用訊息頭
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class,
"--spring.cloud.stream.function.routing.enabled=true");
}
@Bean
public Consumer<String> even() {
return value -> {
System.out.println("EVEN: " + value);
};
}
@Bean
public Consumer<String> odd() {
return value -> {
System.out.println("ODD: " + value);
};
}
}
透過向繫結器公開的functionRouter-in-0目的地(即 rabbit, kafka)傳送訊息,該訊息將被路由到適當的(“even”或“odd”)消費者。
預設情況下,RoutingFunction將查詢spring.cloud.function.definition或spring.cloud.function.routing-expression(對於具有 SpEL 的更動態場景)頭部,如果找到,其值將被視為路由指令。
例如,將spring.cloud.function.routing-expression頭設定為值T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd'將最終半隨機地將請求路由到odd或even函式。此外,對於 SpEL,評估上下文的根物件是Message,因此您也可以對單個頭(或訊息)進行評估,例如….routing-expression=headers['type']
使用應用程式屬性
spring.cloud.function.routing-expression和/或spring.cloud.function.definition可以作為應用程式屬性傳遞(例如,spring.cloud.function.routing-expression=headers['type'])。
@SpringBootApplication
public class RoutingStreamApplication {
public static void main(String[] args) {
SpringApplication.run(RoutingStreamApplication.class,
"--spring.cloud.function.routing-expression="
+ "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
}
@Bean
public Consumer<Integer> even() {
return value -> System.out.println("EVEN: " + value);
}
@Bean
public Consumer<Integer> odd() {
return value -> System.out.println("ODD: " + value);
}
}
| 透過應用程式屬性傳遞指令對於響應式函式尤其重要,因為響應式函式只調用一次以傳遞Publisher,因此對單個專案的訪問受到限制。 |
路由函式和輸出繫結
RoutingFunction是一個Function,因此與其他任何函式都沒有什麼不同。嗯…幾乎沒有。
當RoutingFunction路由到另一個Function時,其輸出將按預期傳送到RoutingFunction的輸出繫結,即functionRouter-in-0。但是如果RoutingFunction路由到Consumer呢?換句話說,呼叫RoutingFunction的結果可能不會產生任何內容傳送到輸出繫結,因此甚至可能不需要輸出繫結。因此,我們在建立繫結時對待RoutingFunction有點不同。儘管作為使用者您是透明的(您實際上什麼都不需要做),但瞭解一些機制將有助於您理解其內部工作原理。
所以,規則是:我們從不為RoutingFunction建立輸出繫結,只建立輸入繫結。因此,當您路由到Consumer時,RoutingFunction透過沒有輸出繫結而有效地成為一個Consumer。但是,如果RoutingFunction碰巧路由到另一個生成輸出的Function,則會動態建立RoutingFunction的輸出繫結,此時RoutingFunction將像常規Function一樣,擁有輸入和輸出繫結。
路由來自消費者
除了靜態目的地,Spring Cloud Stream 還允許應用程式將訊息傳送到動態繫結的目的地。這在例如需要在執行時確定目標目的地時非常有用。應用程式可以透過兩種方式實現這一點。
spring.cloud.stream.sendto.destination
您還可以透過指定spring.cloud.stream.sendto.destination頭並將其設定為要解析的目標名稱,將解析輸出目標的任務委託給框架。
考慮以下示例
@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {
@Bean
public Function<String, Message<String>> destinationAsPayload() {
return value -> {
return MessageBuilder.withPayload(value)
.setHeader("spring.cloud.stream.sendto.destination", value).build();};
}
}
儘管微不足道,您在此示例中可以清楚地看到,我們的輸出是一個訊息,其spring.cloud.stream.sendto.destination頭設定為輸入引數的值。框架將查詢此頭,並嘗試建立或發現具有該名稱的目標並向其傳送輸出。
如果目標名稱是事先已知的,您可以像配置任何其他目標一樣配置生產者屬性。或者,如果您註冊一個NewDestinationBindingCallback<> bean,它會在繫結建立之前被呼叫。回撥接受繫結器使用的擴充套件生產者屬性的通用型別。它有一個方法
void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
T extendedProducerProperties);
以下示例展示瞭如何使用 RabbitMQ 繫結器
@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
return (name, channel, props, extended) -> {
props.setRequiredGroups("bindThisQueue");
extended.setQueueNameGroupOnly(true);
extended.setAutoBindDlq(true);
extended.setDeadLetterQueueName("myDLQ");
};
}
如果需要支援多種繫結器型別的動態目標,請使用Object作為泛型型別,並根據需要轉換extended引數。 |
另外,請參閱[使用 StreamBridge]部分,瞭解StreamBridge如何用於類似情況的另一個選項。
訊息傳送後的後處理
函式被呼叫後,其結果由框架傳送到目標目的地,這有效地完成了函式呼叫週期。
然而,從業務角度來看,在完成此週期**之後**執行一些額外任務之前,此類週期可能不完全完成。雖然這可以透過簡單的Consumer和StreamBridge組合來實現,如Stack Overflow 帖子中所述,但自 4.0.3 版本以來,該框架提供了一種更慣用的方法來解決此問題,即透過 Spring Cloud Function 專案提供的PostProcessingFunction。PostProcessingFunction是一個特殊的半標記函式,它包含一個附加方法postProcess(Message>),旨在為實現此類後處理任務提供一個位置。
package org.springframework.cloud.function.context
. . .
public interface PostProcessingFunction<I, O> extends Function<I, O> {
default void postProcess(Message<O> result) {
}
}
所以,現在你有兩個選擇。
選項 1:您可以將您的函式實現為 PostProcessingFunction,並透過實現其 postProcess(Message>) 方法來包含額外的後處理行為。
private static class Uppercase implements PostProcessingFunction<String, String> {
@Override
public String apply(String input) {
return input.toUpperCase();
}
@Override
public void postProcess(Message<String> result) {
System.out.println("Function Uppercase has been successfully invoked and its result successfully sent to target destination");
}
}
. . .
@Bean
public Function<String, String> uppercase() {
return new Uppercase();
}
選項 2:如果您已經有一個現有函式,並且不想更改其實現,或者希望將您的函式保留為 POJO,您只需實現 postProcess(Message>) 方法,並將此新的後處理函式與您的其他函式組合。
private static class Logger implements PostProcessingFunction<?, String> {
@Override
public void postProcess(Message<String> result) {
System.out.println("Function has been successfully invoked and its result successfully sent to target destination");
}
}
. . .
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
@Bean
public Function<String, String> logger() {
return new Logger();
}
. . .
// and then have your function definition as such `uppercase|logger`
注意:在函式組合的情況下,只有最後一個PostProcessingFunction例項(如果存在)會生效。例如,假設您有以下函式定義 - foo|bar|baz,並且foo和baz都是PostProcessingFunction的例項。只有baz.postProcess(Message>)會被呼叫。如果baz不是PostProcessingFunction的例項,則不會執行任何後處理功能。
有人可能會爭辯說,您可以透過函式組合輕鬆實現這一點,只需將後處理器組合成另一個Function即可。這確實是一種可能性,但在這種情況下,後處理功能將在上一個函式呼叫之後立即呼叫,並且在訊息傳送到目標目的地之前,即在函式呼叫週期完成之前。
錯誤處理
在本節中,我們將解釋框架提供的錯誤處理機制背後的總體思想。我們將以 Rabbit 繫結器為例,因為各個繫結器為某些支援的機制定義了不同的屬性集,這些機制特定於底層代理功能(例如 Kafka 繫結器)。
錯誤總會發生,Spring Cloud Stream 提供了幾種靈活的機制來處理它們。請注意,這些技術取決於繫結器實現、底層訊息中介軟體的能力以及程式設計模型(稍後會詳細介紹)。
當訊息處理程式(函式)丟擲異常時,它會傳播回繫結器,此時繫結器會使用Spring Retry庫提供的RetryTemplate多次嘗試重試相同的訊息(預設3次)。如果重試不成功,則由錯誤處理機制決定是丟棄訊息、重新排隊訊息進行重新處理還是將失敗訊息傳送到DLQ。
Rabbit 和 Kafka 都支援這些概念(尤其是 DLQ)。但是,其他繫結器可能不支援,因此請參閱您的單個繫結器的文件,瞭解受支援的錯誤處理選項的詳細資訊。
但請記住,響應式函式不符合訊息處理程式的條件,因為它不處理單個訊息,而是提供一種將框架提供的流(即 Flux)與使用者提供的流連線起來的方式。為什麼這很重要?那是因為您在本節後面閱讀的關於重試模板、丟棄失敗訊息、重試、DLQ 以及協助所有這些的配置屬性**只**適用於訊息處理程式(即命令式函式)。
響應式 API 提供了非常豐富的自身運算子和機制庫,可幫助您處理特定於各種響應式用例的錯誤,這些用例比簡單的訊息處理程式用例複雜得多。因此,請使用它們,例如您可以在 reactor.core.publisher.Flux 中找到的 public final Flux<T> retryWhen(Retry retrySpec);。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux
.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
.map(v -> v.toUpperCase());
}
丟棄失敗訊息
預設情況下,系統提供了錯誤處理程式。第一個錯誤處理程式將簡單地記錄錯誤訊息。第二個錯誤處理程式是繫結器特定的錯誤處理程式,它負責在特定訊息系統(例如,傳送到 DLQ)的上下文中處理錯誤訊息。但是由於未提供額外的錯誤處理配置(在此當前場景中),此處理程式將不執行任何操作。因此,在記錄之後,訊息將被丟棄。
雖然在某些情況下可以接受,但對於大多數情況來說,這是不可接受的,我們需要某種恢復機制來避免訊息丟失。
處理錯誤訊息
在上一節中,我們提到預設情況下,導致錯誤的訊息會有效地記錄並丟棄。框架還為您提供了提供自定義錯誤處理程式的機制(即傳送通知或寫入資料庫等)。您可以透過新增專門設計用於接受ErrorMessage的Consumer來實現,該ErrorMessage除了包含有關錯誤的所有資訊(例如,堆疊跟蹤等)之外,還包含原始訊息(觸發錯誤的訊息)。注意:自定義錯誤處理程式與框架提供的錯誤處理程式(即日誌記錄和繫結器錯誤處理程式 - 請參閱上一節)互斥,以確保它們不會相互干擾。
@Bean
public Consumer<ErrorMessage> myErrorHandler() {
return v -> {
// send SMS notification code
};
}
要將此類消費者標識為錯誤處理程式,您只需提供error-handler-definition屬性,指向函式名稱 - spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler。
例如,對於繫結名稱uppercase-in-0,該屬性將如下所示
spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler
如果您使用特殊對映指令將繫結對映到更具可讀性的名稱 - spring.cloud.stream.function.bindings.uppercase-in-0=upper,那麼此屬性將如下所示
spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果您不小心將此類處理程式宣告為Function,它仍然會工作,只是其輸出不會做任何事情。但是,鑑於此類處理程式仍然依賴於 Spring Cloud Function 提供的功能,您也可以在處理程式具有一些複雜性時從函式組合中受益,您希望透過函式組合來解決這些複雜性(儘管不太可能)。 |
預設錯誤處理程式
如果您希望為所有函式 bean 使用單個錯誤處理程式,可以使用 Spring Cloud Stream 的標準機制來定義預設屬性 spring.cloud.stream.default.error-handler-definition=myErrorHandler
DLQ - 死信佇列
DLQ 可能是最常見的機制,它允許失敗的訊息傳送到一個特殊目的地:死信佇列。
配置後,失敗的訊息將被髮送到此目的地,以進行後續的重新處理、審計和對賬。
考慮以下示例
@SpringBootApplication
public class SimpleStreamApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleStreamApplication.class,
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
"--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
);
}
@Bean
public Function<Person, Person> uppercase() {
return personIn -> {
throw new RuntimeException("intentional");
});
};
}
}
提醒一下,在此示例中,屬性的uppercase-in-0段對應於輸入目標繫結的名稱。consumer段表示它是一個消費者屬性。
當使用 DLQ 時,至少必須提供 group 屬性才能正確命名 DLQ 目的地。但是,group 通常與 destination 屬性一起使用,如我們的示例所示。 |
除了一些標準屬性,我們還將auto-bind-dlq設定為指示繫結器為uppercase-in-0繫結(對應於uppercase目的地,參見相應屬性)建立和配置DLQ目的地,這將導致一個名為uppercase.myGroup.dlq的附加Rabbit佇列(有關Kafka特定的DLQ屬性,請參閱Kafka文件)。
配置完成後,所有失敗的訊息都將路由到此目的地,並保留原始訊息以供後續操作。
您可以看到錯誤訊息包含更多與原始錯誤相關的資訊,如下所示
. . . .
x-exception-stacktrace: org.springframework.messaging.MessageHandlingException: nested exception is
org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
at. . . . .
Payload: blah
您還可以透過將max-attempts設定為'1'來促進立即排程到DLQ(無需重試)。例如,
--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1
重試模板
在本節中,我們將介紹與重試功能配置相關的配置屬性。
RetryTemplate是Spring Retry庫的一部分。雖然本文件超出範圍,無法涵蓋RetryTemplate的所有功能,但我們將提及以下與RetryTemplate專門相關的消費者屬性
- maxAttempts
-
處理訊息的嘗試次數。
預設值:3。
- backOffInitialInterval
-
重試時的回退初始間隔。
預設 1000 毫秒。
- backOffMaxInterval
-
最大回退間隔。
預設 10000 毫秒。
- backOffMultiplier
-
回退乘數。
預設 2.0。
- defaultRetryable
-
偵聽器丟擲的未列在
retryableExceptions中的異常是否可重試。預設值:
true。 - retryableExceptions
-
一個對映,鍵是 Throwable 類名,值是布林值。指定那些將或不會重試的異常(及其子類)。另請參見
defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false。預設值:空。
儘管上述設定足以滿足大多數定製需求,但它們可能無法滿足某些複雜需求,此時您可能希望提供自己的RetryTemplate例項。為此,請在您的應用程式配置中將其配置為bean。應用程式提供的例項將覆蓋框架提供的例項。此外,為了避免衝突,您必須將要由繫結器使用的RetryTemplate例項限定為@StreamRetryTemplate。例如,
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
return new RetryTemplate();
}
如上例所示,您無需使用@Bean對其進行註解,因為@StreamRetryTemplate是一個合格的@Bean。
如果您的RetryTemplate需要更精確,您可以在ConsumerProperties中按名稱指定bean,以將特定的重試bean與每個繫結關聯起來。
spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>
繫結器
Spring Cloud Stream 提供了一個 Binder 抽象,用於連線到外部中介軟體的物理目的地。本節提供了有關 Binder SPI 背後的主要概念、其主要元件和特定於實現細節的資訊。
生產者和消費者
下圖顯示了生產者和消費者的一般關係
生產者是任何向繫結目標傳送訊息的元件。繫結目標可以透過該代理的Binder實現繫結到外部訊息代理。呼叫bindProducer()方法時,第一個引數是代理中的目標名稱,第二個引數是生產者傳送訊息的本地目標例項,第三個引數包含用於為該繫結目標建立的介面卡中的屬性(例如分割槽鍵表示式)。
消費者是任何從繫結目的地接收訊息的元件。與生產者一樣,消費者可以繫結到外部訊息代理。呼叫bindConsumer()方法時,第一個引數是目的地名稱,第二個引數提供了一個邏輯消費者組的名稱。對於給定目的地的消費者繫結所代表的每個組,都會收到生產者傳送到該目的地的每條訊息的副本(即,它遵循正常的釋出-訂閱語義)。如果存在多個以相同組名繫結的消費者例項,則訊息會在這些消費者例項之間進行負載均衡,以便生產者傳送的每條訊息都只由每個組中的單個消費者例項消費(即,它遵循正常的佇列語義)。
繫結器 SPI
Binder SPI 由許多介面、開箱即用的實用程式類和發現策略組成,它們提供了一種可插拔的機制,用於連線到外部中介軟體。
SPI 的關鍵是Binder介面,它是一種將輸入和輸出連線到外部中介軟體的策略。以下列表顯示了Binder介面的定義
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
Binding<T> bindConsumer(String bindingName, String group, T inboundBindTarget, C consumerProperties);
Binding<T> bindProducer(String bindingName, T outboundBindTarget, P producerProperties);
}
介面已引數化,提供了許多擴充套件點
-
輸入和輸出繫結目標。
-
擴充套件消費者和生產者屬性,允許特定的繫結器實現新增補充屬性,這些屬性可以以型別安全的方式支援。
典型的繫結器實現包括以下內容
-
一個實現
Binder介面的類; -
一個 Spring
@Configuration類,它建立了一個Binder型別的 bean 以及中介軟體連線基礎設施。 -
在類路徑中找到一個
META-INF/spring.binders檔案,其中包含一個或多個繫結器定義,如以下示例所示kafka:\ org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
如前所述,繫結器抽象也是框架的擴充套件點之一。因此,如果您在上述列表中找不到合適的繫結器,您可以在 Spring Cloud Stream 之上實現自己的繫結器。在《如何從零開始建立 Spring Cloud Stream Binder》一文中,一位社群成員詳細記錄了實現自定義繫結器所需的步驟,並提供了示例。這些步驟也在實現自定義繫結器部分中突出顯示。 |
繫結器檢測
Spring Cloud Stream 依賴於 Binder SPI 的實現來完成將使用者程式碼連線(繫結)到訊息代理的任務。每個 Binder 實現通常連線到一種型別的訊息系統。
類路徑檢測
預設情況下,Spring Cloud Stream 依賴於 Spring Boot 的自動配置來配置繫結過程。如果在類路徑中找到單個 Binder 實現,Spring Cloud Stream 會自動使用它。例如,旨在僅繫結到 RabbitMQ 的 Spring Cloud Stream 專案可以新增以下依賴項
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
有關其他繫結器依賴項的特定 Maven 座標,請參閱該繫結器實現的文件。
類路徑上的多個繫結器
當類路徑上存在多個繫結器時,應用程式必須指明每個目的地繫結使用哪個繫結器。每個繫結器配置都包含一個META-INF/spring.binders檔案,這是一個簡單的屬性檔案,如以下示例所示
rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration
其他提供的繫結器實現(如 Kafka)也有類似的檔案,並且自定義繫結器實現也應提供它們。鍵表示繫結器實現的標識名稱,而值是配置類的逗號分隔列表,每個配置類都包含一個且只有一個org.springframework.cloud.stream.binder.Binder型別的 bean 定義。
繫結器選擇可以全域性執行,使用spring.cloud.stream.defaultBinder屬性(例如,spring.cloud.stream.defaultBinder=rabbit),也可以單獨執行,透過在每個繫結上配置繫結器。例如,一個處理器應用程式(分別具有名為input和output的讀寫繫結)從 Kafka 讀取並寫入 RabbitMQ,可以指定以下配置
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit
連線到多個系統
預設情況下,繫結器共享應用程式的 Spring Boot 自動配置,因此會建立類路徑上找到的每個繫結器的一個例項。如果您的應用程式應連線到多個相同型別的代理,您可以指定多個繫結器配置,每個配置都有不同的環境設定。
開啟顯式繫結器配置會完全停用預設繫結器配置過程。如果您這樣做,所有使用的繫結器都必須包含在配置中。旨在透明使用 Spring Cloud Stream 的框架可能會建立可以透過名稱引用的繫結器配置,但它們不會影響預設繫結器配置。為了做到這一點,繫結器配置可以將其defaultCandidate標誌設定為 false(例如,spring.cloud.stream.binders.<configurationName>.defaultCandidate=false)。這表示一個獨立於預設繫結器配置過程而存在的配置。 |
以下示例顯示了一個處理器應用程式的典型配置,該應用程式連線到兩個 RabbitMQ 代理例項
spring:
cloud:
stream:
bindings:
input:
destination: thing1
binder: rabbit1
output:
destination: thing2
binder: rabbit2
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: <host1>
rabbit2:
type: rabbit
environment:
spring:
rabbitmq:
host: <host2>
特定繫結器的environment屬性也可以用於任何 Spring Boot 屬性,包括spring.main.sources,這對於為特定繫結器新增額外配置很有用,例如覆蓋自動配置的 bean。 |
例如;
environment:
spring:
main:
sources: com.acme.config.MyCustomBinderConfiguration
要為特定繫結器環境啟用特定配置檔案,您應該使用spring.profiles.active屬性
environment:
spring:
profiles:
active: myBinderProfile
在多繫結器應用程式中自定義繫結器
當應用程式中有多個繫結器並希望自定義繫結器時,可以透過提供BinderCustomizer實現來實現。在單個繫結器的應用程式中,不需要此特殊自定義器,因為繫結器上下文可以直接訪問自定義 bean。但是,在多繫結器場景中,情況並非如此,因為不同的繫結器存在於不同的應用程式上下文中。透過提供BinderCustomizer介面的實現,繫結器,儘管位於不同的應用程式上下文中,將接收到自定義。Spring Cloud Stream 確保在應用程式開始使用繫結器之前進行自定義。使用者必須檢查繫結器型別,然後應用必要的自定義。
這是一個提供BinderCustomizer bean 的示例。
@Bean
public BinderCustomizer binderCustomizer() {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setRebalanceListener(...);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}
請注意,當存在多個相同型別的繫結器例項時,可以使用繫結器名稱來過濾自定義。
繫結視覺化和控制
Spring Cloud Stream 透過 Actuator 端點和程式設計方式支援繫結的視覺化和控制。
程式設計方式
自版本 3.1 起,我們公開了 org.springframework.cloud.stream.binding.BindingsLifecycleController,它註冊為 bean,一旦注入即可用於控制單個繫結的生命週期
例如,檢視一個測試用例的片段。如您所見,我們從 spring 應用程式上下文檢索 BindingsLifecycleController 並執行單個方法來控制 echo-in-0 繫結的生命週期。
BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
//Alternative way of changing state. For convenience we expose start/stop and pause/resume operations.
//bindingsController.stop("echo-in-0")
assertThat(binding.isRunning()).isFalse();
執行器
由於 Actuator 和 Web 是可選的,您必須首先新增其中一個 Web 依賴項以及手動新增 Actuator 依賴項。以下示例展示瞭如何新增 Web 框架的依賴項
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
以下示例展示瞭如何新增 WebFlux 框架的依賴項
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
您可以如下新增 Actuator 依賴項
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
要在 Cloud Foundry 中執行 Spring Cloud Stream 2.0 應用程式,您必須將 spring-boot-starter-web 和 spring-boot-starter-actuator 新增到類路徑中。否則,應用程式將因健康檢查失敗而無法啟動。 |
您還必須透過設定以下屬性來啟用bindings執行器端點:--management.endpoints.web.exposure.include=bindings。
一旦這些先決條件得到滿足。應用程式啟動時,您應該在日誌中看到以下內容
: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .
要視覺化當前繫結,請訪問以下 URL:http://<host>:<port>/actuator/bindings
或者,要檢視單個繫結,請訪問以下類似 URL 之一:http://<host>:<port>/actuator/bindings/<bindingName>;
您還可以透過向相同的 URL 傳送 POST 請求,並在 JSON 中提供 state 引數來停止、啟動、暫停和恢復單個繫結,如以下示例所示
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
PAUSED 和 RESUMED 僅在相應的繫結器及其底層技術支援時才有效。否則,您會在日誌中看到警告訊息。目前,只有 Kafka 和 [Solace](https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) 繫結器支援 PAUSED 和 RESUMED 狀態。 |
繫結器配置屬性
以下屬性在自定義繫結器配置時可用。這些屬性透過org.springframework.cloud.stream.config.BinderProperties公開
它們必須以spring.cloud.stream.binders.<configurationName>為字首。
- 型別
-
繫結器型別。它通常引用類路徑上找到的繫結器之一——特別是
META-INF/spring.binders檔案中的一個鍵。預設情況下,它與配置名稱具有相同的值。
- 繼承環境
-
配置是否繼承應用程式自身的環境。
預設值:
true。 - 環境
-
用於定製繫結器環境的一組屬性的根。當設定此屬性時,建立繫結器的上下文不是應用程式上下文的子上下文。此設定允許繫結器元件和應用程式元件之間完全分離。
預設值:
空。 - 預設候選
-
繫結器配置是否是作為預設繫結器的候選,或者只能在明確引用時使用。此設定允許新增繫結器配置而不干擾預設處理。
預設值:
true。
實現自定義繫結器
為了實現自定義Binder,您只需要
-
新增所需的依賴項
-
提供 ProvisioningProvider 實現
-
提供 MessageProducer 實現
-
提供 MessageHandler 實現
-
提供 Binder 實現
-
建立繫結器配置
-
在 META-INF/spring.binders 中定義您的繫結器
新增所需的依賴項
將spring-cloud-stream依賴項新增到您的專案中(例如,對於 Maven)
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>${spring.cloud.stream.version}</version>
</dependency>
提供 ProvisioningProvider 實現
ProvisioningProvider負責消費者和生產者目標的供應,並且需要將 application.yml 或 application.properties 檔案中包含的邏輯目標轉換為物理目標引用。
以下是 ProvisioningProvider 實現的示例,它只修剪透過輸入/輸出繫結配置提供的目的地
public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {
@Override
public ProducerDestination provisionProducerDestination(
final String name,
final ProducerProperties properties) {
return new FileMessageDestination(name);
}
@Override
public ConsumerDestination provisionConsumerDestination(
final String name,
final String group,
final ConsumerProperties properties) {
return new FileMessageDestination(name);
}
private class FileMessageDestination implements ProducerDestination, ConsumerDestination {
private final String destination;
private FileMessageDestination(final String destination) {
this.destination = destination;
}
@Override
public String getName() {
return destination.trim();
}
@Override
public String getNameForPartition(int partition) {
throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
}
}
}
提供 MessageProducer 實現
MessageProducer負責消費事件並將其作為訊息傳遞給配置為消費此類事件的客戶端應用程式。
以下是 MessageProducer 實現的一個示例,它擴充套件了 MessageProducerSupport 抽象,以便輪詢與修剪後的目標名稱匹配且位於專案路徑中的檔案,同時還歸檔已讀取的訊息並丟棄後續相同的訊息
public class FileMessageProducer extends MessageProducerSupport {
public static final String ARCHIVE = "archive.txt";
private final ConsumerDestination destination;
private String previousPayload;
public FileMessageProducer(ConsumerDestination destination) {
this.destination = destination;
}
@Override
public void doStart() {
receive();
}
private void receive() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(() -> {
String payload = getPayload();
if(payload != null) {
Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
archiveMessage(payload);
sendMessage(receivedMessage);
}
}, 0, 50, MILLISECONDS);
}
private String getPayload() {
try {
List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
String currentPayload = allLines.get(allLines.size() - 1);
if(!currentPayload.equals(previousPayload)) {
previousPayload = currentPayload;
return currentPayload;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
private void archiveMessage(String payload) {
try {
Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
| 在實現自定義繫結器時,此步驟並非嚴格強制,因為您始終可以使用已存在的 MessageProducer 實現! |
提供 MessageHandler 實現
MessageHandler 提供生成事件所需的邏輯。
以下是 MessageHandler 實現的一個示例
public class FileMessageHandler implements MessageHandler{
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//write message to file
}
}
| 在實現自定義繫結器時,此步驟並非嚴格強制,因為您始終可以使用已存在的 MessageHandler 實現! |
提供 Binder 實現
您現在能夠提供自己的Binder抽象實現。這可以透過以下方式輕鬆完成
-
擴充套件
AbstractMessageChannelBinder類 -
將您的 ProvisioningProvider 指定為 AbstractMessageChannelBinder 的泛型引數
-
覆蓋
createProducerMessageHandler和createConsumerEndpoint方法
例如。
public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {
public FileMessageBinder(
String[] headersToEmbed,
FileMessageBinderProvisioner provisioningProvider) {
super(headersToEmbed, provisioningProvider);
}
@Override
protected MessageHandler createProducerMessageHandler(
final ProducerDestination destination,
final ProducerProperties producerProperties,
final MessageChannel errorChannel) throws Exception {
return message -> {
String fileName = destination.getName();
String payload = new String((byte[])message.getPayload()) + "\n";
try {
Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
}
@Override
protected MessageProducer createConsumerEndpoint(
final ConsumerDestination destination,
final String group,
final ConsumerProperties properties) throws Exception {
return new FileMessageProducer(destination);
}
}
建立繫結器配置
嚴格要求您建立一個 Spring 配置來初始化繫結器實現的 bean(以及您可能需要的其他所有 bean)
@Configuration
public class FileMessageBinderConfiguration {
@Bean
@ConditionalOnMissingBean
public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
return new FileMessageBinderProvisioner();
}
@Bean
@ConditionalOnMissingBean
public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
return new FileMessageBinder(null, fileMessageBinderProvisioner);
}
}
在 META-INF/spring.binders 中定義您的繫結器
最後,您必須在類路徑上的META-INF/spring.binders檔案中定義您的繫結器,指定繫結器的名稱和您的繫結器配置類的完全限定名稱
myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration
配置選項
Spring Cloud Stream 支援通用配置選項以及繫結和繫結器的配置。一些繫結器允許額外的繫結屬性支援中介軟體特定的功能。
配置選項可以透過 Spring Boot 支援的任何機制提供給 Spring Cloud Stream 應用程式。這包括應用程式引數、環境變數以及 YAML 或 .properties 檔案。
繫結服務屬性
這些屬性透過org.springframework.cloud.stream.config.BindingServiceProperties公開
- spring.cloud.stream.instanceCount
-
應用程式的已部署例項數。必須在生產者端進行分割槽設定。當使用 RabbitMQ 且 Kafka 的
autoRebalanceEnabled=false時,必須在消費者端進行設定。預設值:
1。 - spring.cloud.stream.instanceIndex
-
應用程式的例項索引:一個從
0到instanceCount - 1的數字。用於 RabbitMQ 和 Kafka(如果autoRebalanceEnabled=false)的分割槽。在 Cloud Foundry 中自動設定以匹配應用程式的例項索引。 - spring.cloud.stream.dynamicDestinations
-
可以動態繫結(例如,在動態路由場景中)的目標列表。如果設定,則只能繫結列出的目標。
預設值:空(允許繫結任何目的地)。
- spring.cloud.stream.defaultBinder
-
如果配置了多個繫結器,則使用的預設繫結器。參見類路徑上的多個繫結器。
預設值:空。
- spring.cloud.stream.overrideCloudConnectors
-
此屬性僅適用於
cloud配置檔案處於活動狀態且應用程式提供了 Spring Cloud Connectors 的情況。如果屬性為false(預設值),繫結器會檢測合適的繫結服務(例如,Cloud Foundry 中為 RabbitMQ 繫結器繫結的 RabbitMQ 服務)並使用它建立連線(通常透過 Spring Cloud Connectors)。當設定為true時,此屬性指示繫結器完全忽略繫結服務並依賴 Spring Boot 屬性(例如,依賴環境中為 RabbitMQ 繫結器提供的spring.rabbitmq.*屬性)。此屬性的典型用法是在連線到多個系統時巢狀在自定義環境中。預設值:
false。 - spring.cloud.stream.bindingRetryInterval
-
重試繫結建立的間隔(以秒為單位),例如,當繫結器不支援延遲繫結並且代理(例如 Apache Kafka)關閉時。將其設定為零,將此類情況視為致命,阻止應用程式啟動。
預設值:
30
繫結屬性
繫結屬性透過spring.cloud.stream.bindings.<bindingName>.<property>=<value>格式提供。<bindingName>表示正在配置的繫結的名稱。
例如,對於以下函式
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
輸入繫結名為uppercase-in-0,輸出繫結名為uppercase-out-0。更多詳細資訊請參見繫結和繫結名稱。
為避免重複,Spring Cloud Stream 支援為所有繫結設定值,格式為spring.cloud.stream.default.<property>=<value>和spring.cloud.stream.default.<producer|consumer>.<property>=<value>,用於通用繫結屬性。
當涉及避免擴充套件繫結屬性的重複時,應使用此格式 - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>。
通用繫結屬性
這些屬性透過org.springframework.cloud.stream.config.BindingProperties公開
以下繫結屬性適用於輸入和輸出繫結,並且必須以spring.cloud.stream.bindings.<bindingName>.為字首(例如,spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock)。
預設值可以透過使用spring.cloud.stream.default字首設定(例如spring.cloud.stream.default.contentType=application/json)。
- 目的地
-
繫結在已繫結中介軟體上的目標目的地(例如,RabbitMQ 交換器或 Kafka 主題)。如果繫結表示消費者繫結(輸入),則可以繫結到多個目的地,目的地名稱可以指定為逗號分隔的
String值。否則,將使用實際的繫結名稱。此屬性的預設值不能被覆蓋。 - group
-
繫結的消費者組。僅適用於入站繫結。參見消費者組。
預設值:
null(表示匿名消費者)。 - contentType
-
此繫結的內容型別。參見
內容型別協商。預設值:
application/json。 - 繫結器
-
此繫結使用的繫結器。有關詳細資訊,請參見
類路徑上的多個繫結器。預設值:
null(如果存在,則使用預設繫結器)。
消費者屬性
這些屬性透過org.springframework.cloud.stream.binder.ConsumerProperties公開
以下繫結屬性僅適用於輸入繫結,並且必須以spring.cloud.stream.bindings.<bindingName>.consumer.為字首(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3)。
預設值可以透過使用spring.cloud.stream.default.consumer字首設定(例如,spring.cloud.stream.default.consumer.headerMode=none)。
- 自動啟動
-
指示此消費者是否需要自動啟動
預設值:
true。 - concurrency
-
入站消費者的併發性。
預設值:
1。 - 已分割槽
-
消費者是否從分割槽生產者接收資料。
預設值:
false。 - 頭模式
-
當設定為
none時,停用輸入端的頭解析。僅對不支援原生訊息頭且需要頭嵌入的訊息中介軟體有效。此選項在從非 Spring Cloud Stream 應用程式消費資料且不支援原生頭時非常有用。當設定為headers時,它使用中介軟體的原生頭機制。當設定為embeddedHeaders時,它將頭嵌入到訊息 payload 中。預設值:取決於繫結器實現。
- maxAttempts
-
如果處理失敗,處理訊息的嘗試次數(包括第一次)。設定為
1以停用重試。預設值:
3。 - backOffInitialInterval
-
重試時的回退初始間隔。
預設值:
1000。 - backOffMaxInterval
-
最大回退間隔。
預設值:
10000。 - backOffMultiplier
-
回退乘數。
預設值:
2.0。 - defaultRetryable
-
偵聽器丟擲的未列在
retryableExceptions中的異常是否可重試。預設值:
true。 - 例項計數
-
當設定為大於或等於零的值時,它允許自定義此消費者的例項計數(如果與
spring.cloud.stream.instanceCount不同)。當設定為負值時,它預設為spring.cloud.stream.instanceCount。如果提供了instanceIndexList則忽略。有關更多資訊,請參見例項索引和例項計數。預設值:
-1。 - 例項索引
-
當設定為大於或等於零的值時,它允許自定義此消費者的例項索引(如果與
spring.cloud.stream.instanceIndex不同)。當設定為負值時,它預設為spring.cloud.stream.instanceIndex。如果提供了instanceIndexList則忽略。有關更多資訊,請參見例項索引和例項計數。預設值:
-1。 - 例項索引列表
-
與不支援原生分割槽(如 RabbitMQ)的繫結器一起使用;允許應用程式例項從多個分割槽消費。
預設值:空。
- retryableExceptions
-
一個對映,鍵是 Throwable 類名,值是布林值。指定那些將或不會重試的異常(及其子類)。另請參見
defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false。預設值:空。
- useNativeDecoding
-
當設定為
true時,入站訊息由客戶端庫直接反序列化,客戶端庫必須相應地配置(例如,設定適當的 Kafka 生產者值反序列化器)。當使用此配置時,入站訊息的解組不基於繫結的contentType。當使用原生解碼時,生產者有責任使用適當的編碼器(例如,Kafka 生產者值序列化器)來序列化出站訊息。此外,當使用原生編碼和解碼時,headerMode=embeddedHeaders屬性將被忽略,並且頭不會嵌入到訊息中。參見生產者屬性useNativeEncoding。預設值:
false。 - 多路複用
-
當設定為 true 時,底層繫結器將原生複用相同輸入繫結上的目的地。
預設值:
false。
高階消費者配置
對於訊息驅動消費者的底層訊息監聽器容器的高階配置,請向應用程式上下文新增單個ListenerContainerCustomizer bean。它將在應用上述屬性後被呼叫,並可用於設定其他屬性。類似地,對於輪詢消費者,新增一個MessageSourceCustomizer bean。
以下是 RabbitMQ 繫結器的一個示例
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}
@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}
生產者屬性
這些屬性透過org.springframework.cloud.stream.binder.ProducerProperties公開
以下繫結屬性僅適用於輸出繫結,並且必須以spring.cloud.stream.bindings.<bindingName>.producer.為字首(例如,spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id)。
預設值可以透過使用字首spring.cloud.stream.default.producer設定(例如,spring.cloud.stream.default.producer.partitionKeyExpression=headers.id)。
- 自動啟動
-
指示此消費者是否需要自動啟動
預設值:
true。 - 分割槽鍵表示式
-
一個 SpEL 表示式,用於確定如何分割槽出站資料。如果設定,此繫結上的出站資料將進行分割槽。
partitionCount必須設定為大於 1 的值才能生效。參見分割槽支援。預設值:null。
- 分割槽鍵提取器名稱
-
實現
PartitionKeyExtractorStrategy的 bean 的名稱。用於提取用於計算分割槽 ID 的鍵(參見“partitionSelector*”)。與“partitionKeyExpression”互斥。預設值:null。
- 分割槽選擇器名稱
-
實現
PartitionSelectorStrategy的 bean 的名稱。用於根據分割槽鍵(參見“partitionKeyExtractor*”)確定分割槽 ID。與“partitionSelectorExpression”互斥。預設值:null。
- 分割槽選擇器表示式
-
用於自定義分割槽選擇的 SpEL 表示式。如果兩者均未設定,則分割槽選擇為
hashCode(key) % partitionCount,其中key透過partitionKeyExpression計算。預設值:
null。 - 分割槽計數
-
如果啟用了分割槽,則資料的目標分割槽數。如果生產者已分割槽,則必須設定為大於 1 的值。在 Kafka 上,它被解釋為提示。將使用此值和目標主題的分割槽計數中較大的一個。
預設值:
1。 - 所需組
-
一個逗號分隔的組列表,生產者必須確保訊息傳遞到這些組,即使它們是在建立之後啟動的(例如,透過在 RabbitMQ 中預建立持久佇列)。
- 頭模式
-
當設定為
none時,它停用輸出端的頭部嵌入。它僅對不支援訊息頭部原生且需要頭部嵌入的訊息中介軟體有效。此選項在為非 Spring Cloud Stream 應用程式生成資料且不支援原生頭部時非常有用。當設定為headers時,它使用中介軟體的原生頭部機制。當設定為embeddedHeaders時,它將頭部嵌入到訊息 payload 中。預設值:取決於繫結器實現。
- useNativeEncoding
-
當設定為
true時,出站訊息由客戶端庫直接序列化,該客戶端庫必須相應地配置(例如,設定適當的 Kafka 生產者值序列化器)。當使用此配置時,出站訊息的編組不基於繫結的contentType。當使用原生編碼時,消費者有責任使用適當的解碼器(例如,Kafka 消費者值反序列化器)來反序列化入站訊息。此外,當使用原生編碼和解碼時,headerMode=embeddedHeaders屬性將被忽略,並且頭不會嵌入到訊息中。參見消費者屬性useNativeDecoding。預設值:
false。 - 錯誤通道已啟用
-
當設定為 true 時,如果繫結器支援非同步傳送結果,傳送失敗將被髮送到目標錯誤通道。有關詳細資訊,請參見錯誤處理。
預設值: false。
高階生產者配置
在某些情況下,生產者屬性不足以在繫結器中正確配置生產訊息處理程式,或者您可能更喜歡在配置此類生產訊息處理程式時採用程式設計方法。無論出於何種原因,spring-cloud-stream 都提供了ProducerMessageHandlerCustomizer來實現它。
@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
/**
* Configure a {@link MessageHandler} that is being created by the binder for the
* provided destination name.
* @param handler the {@link MessageHandler} from the binder.
* @param destinationName the bound destination name.
*/
void configure(H handler, String destinationName);
}
如您所見,它允許您訪問實際的生產MessageHandler例項,您可以根據需要進行配置。您所需要做的就是提供此策略的實現並將其配置為@Bean。
內容型別協商
資料轉換是任何訊息驅動微服務架構的核心功能之一。鑑於在 Spring Cloud Stream 中,此類資料表示為 Spring Message,訊息在到達目的地之前可能需要轉換為所需的形狀或大小。這需要兩個原因
-
將傳入訊息的內容轉換為與應用程式提供的處理程式的簽名匹配。
-
將傳出訊息的內容轉換為線路格式。
線路格式通常是byte[](Kafka 和 Rabbit 繫結器都是如此),但它由繫結器實現控制。
在 Spring Cloud Stream 中,訊息轉換透過org.springframework.messaging.converter.MessageConverter完成。
| 作為後續詳細資訊的補充,您可能還希望閱讀以下部落格文章。 |
機制
為了更好地理解內容型別協商的機制和必要性,我們以以下訊息處理程式為例,看一個非常簡單的用例
public Function<Person, String> personFunction {..}
| 為簡單起見,我們假設這是應用程式中唯一的處理程式函式(我們假設沒有內部管道)。 |
前面示例中顯示的處理程式期望一個Person物件作為引數,並生成一個String型別作為輸出。為了使框架成功地將傳入的Message作為引數傳遞給此處理程式,它必須以某種方式將Message型別的有效負載從線路格式轉換為Person型別。換句話說,框架必須找到並應用適當的MessageConverter。為了實現這一點,框架需要使用者提供一些指令。其中一個指令已經由處理程式方法本身的簽名(Person型別)提供。因此,理論上,這應該(在某些情況下也確實是)足夠的。但是,對於大多數用例,為了選擇適當的MessageConverter,框架需要額外的資訊。缺失的這部分資訊是contentType。
Spring Cloud Stream 提供了三種機制來定義contentType(按優先順序順序)
-
HEADER:
contentType可以透過訊息本身進行通訊。透過提供contentType頭,您可以宣告用於定位和應用適當MessageConverter的內容型別。 -
繫結:
contentType可以透過設定spring.cloud.stream.bindings.input.content-type屬性來為每個目標繫結設定。屬性名中的 input部分對應於目的地的實際名稱(在本例中為“input”)。這種方法允許您根據每個繫結宣告用於定位和應用適當MessageConverter的內容型別。 -
預設值:如果
contentType不存在於Message頭或繫結中,則使用預設的application/json內容型別來定位和應用適當的MessageConverter。
如前所述,前面的列表還展示了在發生衝突時的優先順序順序。例如,透過頭提供的內容型別優先於任何其他內容型別。對於每個繫結設定的內容型別也同樣適用,這實際上允許您覆蓋預設內容型別。但是,它也提供了一個合理的預設值(這是根據社群反饋確定的)。
將application/json設為預設值的另一個原因是,分散式微服務架構驅動的互操作性需求,其中生產者和消費者不僅執行在不同的 JVM 中,而且還可以執行在不同的非 JVM 平臺上。
當非空處理程式方法返回時,如果返回值為Message,則該Message成為有效載荷。但是,當返回值不是Message時,新的Message將以返回值作為有效載荷構建,同時繼承輸入Message中的頭,減去由SpringIntegrationProperties.messageHandlerNotPropagatedHeaders定義或過濾的頭。預設情況下,那裡只有一個頭集:contentType。這意味著新的Message沒有設定contentType頭,從而確保contentType可以演變。您始終可以選擇不從處理程式方法返回Message,在那裡您可以注入您希望的任何頭。
如果存在內部管道,訊息將透過相同的轉換過程傳送到下一個處理程式。但是,如果沒有內部管道或已到達管道末尾,則訊息將傳送回輸出目的地。
內容型別與引數型別
如前所述,為了使框架選擇適當的MessageConverter,它需要引數型別,並且可選地需要內容型別資訊。選擇適當MessageConverter的邏輯駐留在引數解析器(HandlerMethodArgumentResolvers)中,這些解析器在呼叫使用者定義處理程式方法之前觸發(此時框架知道實際引數型別)。如果引數型別與當前有效負載的型別不匹配,框架將委託給預配置的MessageConverter堆疊,以檢視它們中是否有任何一個可以轉換有效負載。如您所見,MessageConverter 的Object fromMessage(Message<?> message, Class<?> targetClass);操作將targetClass作為其引數之一。框架還確保提供的Message始終包含contentType頭。當之前沒有contentType頭時,它會注入每個繫結contentType頭或預設contentType頭。contentType引數型別的組合是框架確定訊息是否可以轉換為目標型別的機制。如果沒有找到適當的MessageConverter,則會丟擲異常,您可以透過新增自定義MessageConverter來處理(請參閱使用者定義的訊息轉換器)。
但是,如果有效負載型別與處理程式方法宣告的目標型別匹配呢?在這種情況下,無需進行轉換,有效負載將原樣傳遞。儘管這聽起來非常簡單和合乎邏輯,但請記住將Message<?>或Object作為引數的處理程式方法。透過將目標型別宣告為Object(在 Java 中是所有物件的instanceof),您實際上放棄了轉換過程。
不要期望Message僅根據contentType轉換為其他型別。請記住,contentType是對目標型別的補充。如果您願意,可以提供一個提示,MessageConverter可能會或可能不會將其考慮在內。 |
訊息轉換器
MessageConverters 定義了兩種方法
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
理解這些方法的契約及其用法非常重要,特別是在 Spring Cloud Stream 的上下文中。
fromMessage方法將傳入的Message轉換為引數型別。Message的有效負載可以是任何型別,並且由MessageConverter的實際實現來支援多種型別。例如,一些 JSON 轉換器可能支援byte[]、String等有效負載型別。當應用程式包含內部管道(即,輸入 → 處理程式1 → 處理程式2 →. . . → 輸出)且上游處理程式的輸出導致的訊息可能不是初始線路格式時,這一點很重要。
但是,toMessage 方法有一個更嚴格的契約,並且必須始終將 Message 轉換為線路格式:byte[]。
因此,出於所有目的(尤其是在實現自己的轉換器時),您將這兩種方法視為具有以下簽名
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);
提供的訊息轉換器
如前所述,框架已經提供了一系列 MessageConverter 來處理大多數常見用例。以下列表描述了提供的 MessageConverter,按優先順序順序(使用第一個起作用的 MessageConverter)
-
JsonMessageConverter:顧名思義,當contentType為application/json(預設)時,它支援將Message的有效負載轉換為/從 POJO。 -
ByteArrayMessageConverter:支援當contentType為application/octet-stream時,將Message的 payload 從byte[]轉換為byte[]。它本質上是一個直通,主要用於向後相容性。 -
ObjectStringMessageConverter:當contentType為text/plain時,支援將任何型別轉換為String。它呼叫 Object 的toString()方法,或者如果 payload 是byte[],則呼叫新的String(byte[])。
當找不到合適的轉換器時,框架會丟擲異常。發生這種情況時,您應該檢查您的程式碼和配置,並確保您沒有遺漏任何東西(即,確保您透過繫結或頭提供了contentType)。但是,最有可能的是,您發現了一些不常見的情況(例如自定義contentType),並且當前提供的MessageConverters堆疊不知道如何轉換。如果是這種情況,您可以新增自定義MessageConverter。請參閱使用者定義的訊息轉換器。
使用者定義的訊息轉換器
Spring Cloud Stream 公開了一種機制來定義和註冊額外的MessageConverter。要使用它,請實現org.springframework.messaging.converter.MessageConverter,並將其配置為@Bean。然後它將被新增到現有的MessageConverter堆疊中。
重要的是要理解自定義 MessageConverter 實現被新增到現有棧的頭部。因此,自定義 MessageConverter 實現優先於現有實現,這使您可以覆蓋和新增現有轉換器。 |
以下示例展示瞭如何建立訊息轉換器 Bean 以支援名為 application/bar 的新內容型別
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
應用間通訊
Spring Cloud Stream 實現了應用程式之間的通訊。應用程式間通訊是一個複雜的問題,涉及以下幾個方面
連線多個應用程式例項
雖然 Spring Cloud Stream 使單個 Spring Boot 應用程式輕鬆連線到訊息系統,但 Spring Cloud Stream 的典型場景是建立多應用程式管道,其中微服務應用程式相互發送資料。您可以透過關聯“相鄰”應用程式的輸入和輸出目的地來實現此場景。
假設設計要求時間源應用程式向日志接收器應用程式傳送資料。您可以為這兩個應用程式中的繫結使用一個名為ticktock的公共目的地。
時間源(繫結名為output)將設定以下屬性
spring.cloud.stream.bindings.output.destination=ticktock
日誌接收器(繫結名為input)將設定以下屬性
spring.cloud.stream.bindings.input.destination=ticktock
例項索引和例項計數
當 Spring Cloud Stream 應用程式進行橫向擴充套件時,每個例項都可以接收有關有多少其他相同應用程式例項以及其自身的例項索引的資訊。Spring Cloud Stream 透過spring.cloud.stream.instanceCount和spring.cloud.stream.instanceIndex屬性來實現這一點。例如,如果有一個 HDFS sink 應用程式的三個例項,所有三個例項的spring.cloud.stream.instanceCount都設定為3,並且各個應用程式的spring.cloud.stream.instanceIndex分別設定為0、1和2。
當 Spring Cloud Stream 應用程式透過 Spring Cloud Data Flow 部署時,這些屬性會自動配置;當 Spring Cloud Stream 應用程式獨立啟動時,這些屬性必須正確設定。預設情況下,spring.cloud.stream.instanceCount為1,spring.cloud.stream.instanceIndex為0。
在橫向擴充套件場景中,正確配置這兩個屬性對於解決分割槽行為(參見下文)至關重要,並且某些繫結器(例如 Kafka 繫結器)始終需要這兩個屬性,以確保資料在多個消費者例項之間正確拆分。
分割槽
Spring Cloud Stream 中的分割槽包含兩個任務
為分割槽配置輸出繫結
您可以透過設定其partitionKeyExpression或partitionKeyExtractorName屬性之一以及partitionCount屬性來配置輸出繫結以傳送分割槽資料。
例如,以下是有效且典型的配置
spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
根據上述示例配置,資料將使用以下邏輯傳送到目標分割槽。
對於傳送到分割槽輸出繫結的每條訊息,都會根據partitionKeyExpression計算分割槽鍵的值。partitionKeyExpression是一個 SpEL 表示式,它根據出站訊息進行評估(在前面的示例中,它是訊息頭中id的值),用於提取分割槽鍵。
如果 SpEL 表示式不足以滿足您的需求,您可以透過提供org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的實現並將其配置為 bean(透過使用@Bean註解)來計算分割槽鍵值。如果應用程式上下文中存在多個org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy型別的 bean,您可以透過partitionKeyExtractorName屬性指定其名稱來進一步過濾,如以下示例所示
--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
return new CustomPartitionKeyExtractorClass();
}
在 Spring Cloud Stream 的早期版本中,您可以透過設定spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass屬性來指定org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的實現。自 3.0 版本起,此屬性已移除。 |
一旦計算出訊息鍵,分割槽選擇過程將確定目標分割槽,其值介於0和partitionCount - 1之間。適用於大多數場景的預設計算基於以下公式:key.hashCode() % partitionCount。這可以在繫結上進行自定義,方法是設定一個 SpEL 表示式以根據“key”進行評估(透過partitionSelectorExpression屬性),或者將org.springframework.cloud.stream.binder.PartitionSelectorStrategy的實現配置為 bean(透過使用@Bean註解)。與PartitionKeyExtractorStrategy類似,當應用程式上下文中存在多個此型別的 bean 時,您可以使用spring.cloud.stream.bindings.output.producer.partitionSelectorName屬性進一步過濾,如以下示例所示
--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
return new CustomPartitionSelectorClass();
}
在 Spring Cloud Stream 的早期版本中,您可以透過設定spring.cloud.stream.bindings.output.producer.partitionSelectorClass屬性來指定org.springframework.cloud.stream.binder.PartitionSelectorStrategy的實現。自 3.0 版本起,此屬性已移除。 |
為分割槽配置輸入繫結
透過設定其partitioned屬性以及應用程式本身的instanceIndex和instanceCount屬性,配置輸入繫結(繫結名稱為uppercase-in-0)以接收分割槽資料,如以下示例所示
spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true spring.cloud.stream.instanceIndex=3 spring.cloud.stream.instanceCount=5
instanceCount 值表示資料應在其中分割槽的應用程式例項總數。instanceIndex 在多個例項中必須是唯一值,其值介於 0 和 instanceCount - 1 之間。例項索引有助於每個應用程式例項識別其接收資料的唯一分割槽。使用不原生支援分割槽的技術的繫結器需要它。例如,對於 RabbitMQ,每個分割槽都有一個佇列,佇列名稱包含例項索引。對於 Kafka,如果 autoRebalanceEnabled 為 true(預設值),Kafka 會負責在例項之間分發分割槽,並且不需要這些屬性。如果 autoRebalanceEnabled 設定為 false,繫結器將使用 instanceCount 和 instanceIndex 來確定例項訂閱的分割槽(您必須至少有與例項一樣多的分割槽)。繫結器會分配分割槽而不是 Kafka。如果您希望特定分割槽的訊息始終傳送到同一個例項,這可能會很有用。當繫結器配置需要它們時,正確設定這兩個值以確保所有資料都被消費並且應用程式例項接收到互斥的資料集非常重要。
雖然在獨立情況下為分割槽資料處理使用多個例項可能設定複雜,但 Spring Cloud Dataflow 可以透過正確填充輸入和輸出值,並讓您依賴執行時基礎設施提供例項索引和例項計數資訊來顯著簡化該過程。
測試
Spring Cloud Stream 提供支援,無需連線到訊息系統即可測試您的微服務應用程式。
Spring Integration 測試繫結器
Spring Cloud Stream 提供了一個測試繫結器,您可以使用它來測試各種應用程式元件,而無需實際的真實繫結器實現或訊息代理。
這個測試繫結器充當單元測試和整合測試之間的橋樑,它基於 Spring Integration 框架作為 JVM 內的訊息代理,本質上為您提供了兩全其美的好處——一個真實的繫結器,但無需網路。
測試繫結器配置
要啟用 Spring Integration 測試繫結器,您只需將其新增為依賴項。
新增所需依賴項
下面是所需 Maven POM 條目的示例。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
或者對於 build.gradle.kts
testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder")
測試繫結器使用
現在您可以將微服務作為簡單的單元測試進行測試
@SpringBootTest
public class SampleStreamTests {
@Autowired
private InputDestination input;
@Autowired
private OutputDestination output;
@Test
public void testEmptyConfiguration() {
this.input.send(new GenericMessage<byte[]>("hello".getBytes()));
assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
}
@SpringBootApplication
@Import(TestChannelBinderConfiguration.class)
public static class SampleConfiguration {
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
}
}
如果您需要更多控制權,或者想在同一個測試套件中測試多個配置,您也可以執行以下操作
@EnableAutoConfiguration
public static class MyTestConfiguration {
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
}
. . .
@Test
public void sampleTest() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
MyTestConfiguration.class))
.run("--spring.cloud.function.definition=uppercase")) {
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
source.send(new GenericMessage<byte[]>("hello".getBytes()));
assertThat(target.receive().getPayload()).isEqualTo("HELLO".getBytes());
}
}
對於具有多個繫結和/或多個輸入和輸出,或者只是想明確指定要傳送或接收的目標名稱的情況,InputDestination 和 OutputDestination 的 send() 和 receive() 方法已被重寫,允許您提供輸入和輸出目標的名稱。
考慮以下示例
@EnableAutoConfiguration
public static class SampleFunctionConfiguration {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return value -> new StringBuilder(value).reverse().toString();
}
}
以及實際的測試
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleFunctionConfiguration.class))
.run("--spring.cloud.function.definition=uppercase;reverse")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "uppercase-in-0");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
對於具有額外對映屬性(例如 destination)的情況,您應該使用這些名稱。例如,考慮前面測試的不同版本,其中我們明確地將 uppercase 函式的輸入和輸出對映到 myInput 和 myOutput 繫結名稱
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleFunctionConfiguration.class))
.run(
"--spring.cloud.function.definition=uppercase;reverse",
"--spring.cloud.stream.bindings.uppercase-in-0.destination=myInput",
"--spring.cloud.stream.bindings.uppercase-out-0.destination=myOutput"
)) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "myInput");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "myOutput");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
測試繫結器和 PollableMessageSource
Spring Integration 測試繫結器還允許您在使用 PollableMessageSource 時編寫測試(有關詳細資訊,請參閱使用輪詢消費者)。
但需要理解的重要一點是,輪詢不是事件驅動的,PollableMessageSource 是一種公開操作以生成(輪詢)訊息(單數)的策略。您輪詢的頻率、使用的執行緒數或從何處輪詢(訊息佇列或檔案系統)完全取決於您;換句話說,您有責任配置 Poller 或 Threads 或訊息的實際來源。幸運的是,Spring 有大量的抽象來精確配置這些。
讓我們來看一個例子
@Test
public void samplePollingTest() {
ApplicationContext context = new SpringApplicationBuilder(SamplePolledConfiguration.class)
.web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false", "--spring.cloud.stream.pollable-source=myDestination");
OutputDestination destination = context.getBean(OutputDestination.class);
System.out.println("Message 1: " + new String(destination.receive().getPayload()));
System.out.println("Message 2: " + new String(destination.receive().getPayload()));
System.out.println("Message 3: " + new String(destination.receive().getPayload()));
}
@Import(TestChannelBinderConfiguration.class)
@EnableAutoConfiguration
public static class SamplePolledConfiguration {
@Bean
public ApplicationRunner poller(PollableMessageSource polledMessageSource, StreamBridge output, TaskExecutor taskScheduler) {
return args -> {
taskScheduler.execute(() -> {
for (int i = 0; i < 3; i++) {
try {
if (!polledMessageSource.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
output.send("myOutput", newPayload);
})) {
Thread.sleep(2000);
}
}
catch (Exception e) {
// handle failure
}
}
});
};
}
}
上面的(非常粗糙的)示例將以 2 秒間隔生成 3 條訊息,將它們傳送到 Source 的輸出目標,此繫結器將其傳送到 OutputDestination,我們從那裡檢索它們(用於任何斷言)。目前,它列印以下內容
Message 1: POLLED DATA
Message 2: POLLED DATA
Message 3: POLLED DATA
如您所見,資料是相同的。這是因為此繫結器定義了實際 MessageSource 的預設實現——使用 poll() 操作輪詢訊息的源。雖然對於大多數測試場景來說已經足夠,但在某些情況下您可能希望定義自己的 MessageSource。為此,只需在您的測試配置中配置一個型別為 MessageSource 的 bean,提供您自己的訊息源實現。
這是示例
@Bean
public MessageSource<?> source() {
return () -> new GenericMessage<>("My Own Data " + UUID.randomUUID());
}
呈現以下輸出;
Message 1: MY OWN DATA 1C180A91-E79F-494F-ABF4-BA3F993710DA
Message 2: MY OWN DATA D8F3A477-5547-41B4-9434-E69DA7616FEE
Message 3: MY OWN DATA 20BF2E64-7FF4-4CB6-A823-4053D30B5C74
不要將此 bean 命名為 messageSource,因為它會與 Spring Boot 出於無關原因提供的同名(不同型別)bean 衝突。 |
關於混合使用測試繫結器和常規中介軟體繫結器進行測試的特別說明
提供基於 Spring Integration 的測試繫結器是為了測試應用程式,而無需涉及實際的基於中介軟體的繫結器,例如 Kafka 或 RabbitMQ 繫結器。如上文所述,測試繫結器透過依賴記憶體中的 Spring Integration 通道幫助您快速驗證應用程式行為。當測試繫結器存在於測試類路徑中時,Spring Cloud Stream 將嘗試在需要繫結器進行通訊的所有測試目的中使用此繫結器。換句話說,您不能在同一模組中混合使用測試繫結器和常規中介軟體繫結器進行測試。在使用測試繫結器測試應用程式後,如果您想繼續使用實際的中介軟體繫結器進行進一步的整合測試,建議將使用實際繫結器的測試新增到單獨的模組中,以便這些測試可以與實際中介軟體建立適當的連線,而不是依賴測試繫結器提供的記憶體通道。
健康指標
Spring Cloud Stream 為繫結器提供了一個健康指標。它以 binders 名稱註冊,可以透過設定 management.health.binders.enabled 屬性來啟用或停用。
要啟用健康檢查,您首先需要透過包含其依賴項來同時啟用“web”和“actuator”(請參閱繫結視覺化和控制)
如果應用程式沒有明確設定 management.health.binders.enabled,則 management.health.defaults.enabled 將匹配為 true,並且繫結器健康指標將被啟用。如果您想完全停用健康指標,則必須將 management.health.binders.enabled 設定為 false。
您可以使用 Spring Boot 執行器健康端點來訪問健康指標 - /actuator/health。預設情況下,當您訪問上述端點時,您將只收到頂級應用程式狀態。為了從繫結器特定的健康指標中接收完整詳細資訊,您需要在應用程式中包含值為 ALWAYS 的屬性 management.endpoint.health.show-details。
健康指標是繫結器特定的,某些繫結器實現可能不一定會提供健康指標。
如果您想完全停用所有開箱即用的健康指標,並提供自己的健康指標,您可以透過將屬性 management.health.binders.enabled 設定為 false,然後在應用程式中提供自己的 HealthIndicator bean 來實現。在這種情況下,Spring Boot 的健康指標基礎設施仍將識別這些自定義 bean。即使您不停用繫結器健康指標,您仍然可以透過在開箱即用健康檢查的基礎上提供自己的 HealthIndicator bean 來增強健康檢查。
當同一個應用程式中有多個繫結器時,健康指標預設啟用,除非應用程式透過將 management.health.binders.enabled 設定為 false 來關閉它們。在這種情況下,如果使用者希望停用部分繫結器的健康檢查,則應在多繫結器配置的環境中將 management.health.binders.enabled 設定為 false。有關如何提供特定於環境的屬性的詳細資訊,請參閱連線到多個系統。
如果類路徑中存在多個繫結器,但並非所有繫結器都在應用程式中使用,這可能會在健康指標方面導致一些問題。關於如何執行健康檢查可能存在特定於實現的詳細資訊。例如,如果繫結器沒有註冊任何目標,Kafka 繫結器可能會將狀態判定為 DOWN。
我們舉一個具體的例子。假設您的類路徑中同時存在 Kafka 和 Kafka Streams 繫結器,但在應用程式程式碼中只使用 Kafka Streams 繫結器,即只使用 Kafka Streams 繫結器提供繫結。由於 Kafka 繫結器未使用,並且它有特定的檢查來檢視是否註冊了任何目標,因此繫結器健康檢查將失敗。頂級應用程式健康檢查狀態將報告為 DOWN。在這種情況下,您可以簡單地從應用程式中刪除 Kafka 繫結器的依賴項,因為您沒有使用它。
示例
有關 Spring Cloud Stream 示例,請參閱 GitHub 上的 spring-cloud-stream-samples 倉庫。
在 CloudFoundry 上部署流應用程式
在 CloudFoundry 上,服務通常透過一個名為 VCAP_SERVICES 的特殊環境變數暴露。
在配置繫結器連線時,您可以使用環境變數中的值,如 dataflow Cloud Foundry Server 文件中所述。
繫結器實現
以下是可用繫結器實現的列表
如前所述,繫結器抽象也是框架的擴充套件點之一。因此,如果您在上述列表中找不到合適的繫結器,您可以在 Spring Cloud Stream 之上實現自己的繫結器。在《如何從零開始建立 Spring Cloud Stream Binder》一文中,一位社群成員詳細記錄了實現自定義繫結器所需的步驟,並提供了示例。這些步驟也在實現自定義繫結器部分中突出顯示。