Spring Cloud Stream Binder for Apache Pulsar
Spring for Apache Pulsar 為 Spring Cloud Stream 提供了一個 Binder,我們可以用它來構建基於釋出/訂閱模式的事件驅動微服務。在本節中,我們將介紹此 Binder 的基本細節。
用法
我們需要在您的應用程式中包含以下依賴項才能使用 Apache Pulsar Binder for Spring Cloud Stream。
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}
概覽
Spring Cloud Stream Binder for Apache Pulsar 允許應用程式專注於業務邏輯,而不是處理管理和維護 Pulsar 的底層細節。該 Binder 為應用程式開發人員處理所有這些細節。Spring Cloud Stream 基於 Spring Cloud Function 帶來了強大的程式設計模型,允許應用程式開發人員使用函式式風格編寫複雜的事件驅動應用程式。應用程式可以從與中介軟體無關的方式開始,然後透過 Spring Boot 配置屬性將 Pulsar 主題對映為 Spring Cloud Stream 中的目標。Spring Cloud Stream 構建在 Spring Boot 之上,使用 Spring Cloud Stream 編寫事件驅動微服務時,您本質上是在編寫一個 Boot 應用程式。這是一個簡單的 Spring Cloud Stream 應用程式。
@SpringBootApplication
public class SpringPulsarBinderSampleApp {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
}
@Bean
public Supplier<Time> timeSupplier() {
return () -> new Time(String.valueOf(System.currentTimeMillis()));
}
@Bean
public Function<Time, EnhancedTime> timeProcessor() {
return (time) -> {
EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
return enhancedTime;
};
}
@Bean
public Consumer<EnhancedTime> timeLogger() {
return (time) -> this.logger.info("SINK: {}", time);
}
record Time(String time) {
}
record EnhancedTime(Time time, String extra) {
}
}
上面的示例應用程式是一個完整的 Spring Boot 應用程式,值得解釋一下。然而,初看起來,您會發現這只是普通的 Java 程式碼和一些 Spring 及 Spring Boot 註解。我們這裡有三個 Bean
方法 - 一個 java.util.function.Supplier
,一個 java.util.function.Function
,最後是一個 java.util.function.Consumer
。Supplier 產生當前時間的毫秒值,函式接收此時間並新增一些隨機資料對其進行增強,然後 consumer 記錄增強後的時間。
為簡潔起見,我們省略了所有 import,但整個應用程式中沒有任何 Spring Cloud Stream 特定的內容。它是如何成為一個與 Apache Pulsar 互動的 Spring Cloud Stream 應用程式的呢?您必須在應用程式中包含上述 Binder 依賴項。一旦添加了該依賴項,您必須提供以下配置屬性。
spring:
cloud:
function:
definition: timeSupplier;timeProcessor;timeLogger;
stream:
bindings:
timeProcessor-in-0:
destination: timeSupplier-out-0
timeProcessor-out-0:
destination: timeProcessor-out-0
timeLogger-in-0:
destination: timeProcessor-out-0
有了這些,上面的 Spring Boot 應用程式就變成了一個基於 Spring Cloud Stream 的端到端事件驅動應用程式。由於類路徑中存在 Pulsar Binder,應用程式將與 Apache Pulsar 互動。如果應用程式中只有一個函式,則無需告知 Spring Cloud Stream 啟用該函式進行執行,因為它預設就會這樣做。如果應用程式中有多個這樣的函式,如我們的示例所示,我們需要指示 Spring Cloud Stream 啟用哪些函式。在我們的例子中,我們需要所有函式都啟用,我們透過 spring.cloud.function.definition
屬性來實現。Bean 名稱預設成為 Spring Cloud Stream 繫結名稱的一部分。繫結是 Spring Cloud Stream 中的一個基本抽象概念,框架透過它與中介軟體目標進行通訊。Spring Cloud Stream 所做的一切幾乎都發生在一個具體的繫結上。Supplier 只有一個輸出繫結;函式有輸入和輸出繫結;Consumer 只有輸入繫結。讓我們以我們的 supplier Bean - timeSupplier
為例。此 supplier 的預設繫結名稱將是 timeSupplier-out-0
。類似地,timeProcessor
函式的預設繫結名稱在入站是 timeProcessor-in-0
,在出站是 timeProcessor-out-0
。有關如何更改預設繫結名稱的詳細資訊,請參閱 Spring Cloud Stream 參考文件。在大多數情況下,使用預設繫結名稱就足夠了。我們如上所示在繫結名稱上設定目標。如果未提供目標,則繫結名稱將成為目標的取值,如 timeSupplier-out-0
的情況。
當執行上述應用程式時,您應該會看到 supplier 每秒執行一次,然後由函式消費並增強時間,再由 logger consumer 消費。
基於 Binder 的應用程式中的訊息轉換
在上面的示例應用程式中,我們沒有為訊息轉換提供任何 schema 資訊。這是因為,Spring Cloud Stream 預設使用其自身的訊息轉換機制,該機制利用 Spring Framework 透過 Spring Messaging 專案建立的訊息支援。除非另有指定,Spring Cloud Stream 在入站和出站繫結上都使用 application/json
作為訊息轉換的 content-type
。在出站,資料被序列化為 byte[]
,然後 Pulsar Binder 使用 Schema.BYTES
將其透過線路傳送到 Pulsar 主題。類似地,在入站,資料從 Pulsar 主題作為 byte[]
消費,然後使用適當的訊息轉換器轉換為目標型別。
使用 Pulsar Schema 在 Pulsar 中進行原生轉換
儘管預設使用框架提供的訊息轉換,但 Spring Cloud Stream 允許每個 Binder 決定如何轉換訊息。如果應用程式選擇此路徑,Spring Cloud Stream 將避免使用任何 Spring 提供的訊息轉換功能,並直接傳遞接收或生成的資料。Spring Cloud Stream 中的此功能被稱為生產者端的原生編碼和消費者端的原生解碼。這意味著編碼和解碼在目標中介軟體上原生髮生,在我們的例子中是在 Apache Pulsar 上。對於上述應用程式,我們可以使用以下配置繞過框架轉換並使用原生編碼和解碼。
spring:
cloud:
stream:
bindings:
timeSupplier-out-0:
producer:
use-native-encoding: true
timeProcessor-in-0:
destination: timeSupplier-out-0
consumer:
use-native-decoding: true
timeProcessor-out-0:
destination: timeProcessor-out-0
producer:
use-native-encoding: true
timeLogger-in-0:
destination: timeProcessor-out-0
consumer:
use-native-decoding: true
pulsar:
bindings:
timeSupplier-out-0:
producer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-in-0:
consumer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-out-0:
producer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
timeLogger-in-0:
consumer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
在生產者端啟用原生編碼的屬性是核心 Spring Cloud Stream 中的繫結級別屬性。您可以在生產者繫結上設定它 - spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding
並將其設定為 true
。類似地,對於消費者繫結,使用 - spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding
並將其設定為 true
。如果決定使用原生編碼和解碼,對於 Pulsar,我們需要設定相應的 schema 和底層訊息型別資訊。此資訊作為擴充套件繫結屬性提供。如您在上述配置中所見,屬性是 - 用於 schema 資訊的 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type
和用於實際目標型別的 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type
。如果訊息同時包含 key 和 value,您可以使用 message-key-type
和 message-value-type
來指定它們的目標型別。
省略 schema-type 屬性時,將查閱任何已配置的自定義 schema 對映。 |
訊息頭轉換
每條訊息通常都有頭資訊,需要在訊息在 Pulsar 和 Spring Messaging 之間透過 Spring Cloud Stream 輸入和輸出繫結傳輸時攜帶。為了支援這種傳輸,框架處理必要的訊息頭轉換。
自定義頭對映器
Pulsar Binder 配置了預設的頭對映器,可以透過提供自己的 PulsarHeaderMapper
bean 來覆蓋。
在以下示例中,配置了一個 JSON 頭對映器,它:
-
對映所有入站頭(除了鍵為“top”或“secret”的頭)
-
映射出站頭(除了鍵為“id”、“timestamp”或“userId”的頭)
-
僅信任“com.acme”包中的物件進行出站反序列化
-
使用簡單的
toString()
編碼/解碼任何“com.acme.Money”頭值
@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
return JsonPulsarHeaderMapper.builder()
.inboundPatterns("!top", "!secret", "*")
.outboundPatterns("!id", "!timestamp", "!userId", "*")
.trustedPackages("com.acme")
.toStringClasses("com.acme.Money")
.build();
}
在 Binder 中使用 Pulsar 屬性
該 Binder 使用 Spring for Apache Pulsar 框架的基本元件來構建其生產者和消費者繫結。由於基於 Binder 的應用程式是 Spring Boot 應用程式,Binder 預設使用 Spring Boot 對 Spring for Apache Pulsar 的自動配置。因此,核心框架級別提供的所有 Pulsar Spring Boot 屬性也可以透過 Binder 使用。例如,您可以使用帶有字首 spring.pulsar.producer…
、spring.pulsar.consumer…
等的屬性。此外,您還可以設定這些 Pulsar 屬性在 Binder 級別。例如,以下配置也將生效 - spring.cloud.stream.pulsar.binder.producer…
或 spring.cloud.stream.pulsar.binder.consumer…
。
以上兩種方法都可以,但使用這樣的屬性時,它會應用於整個應用程式。如果應用程式中有多個函式,它們都會獲得相同的屬性。您還可以將這些 Pulsar 屬性設定在擴充套件繫結屬性級別來解決此問題。擴充套件繫結屬性應用於繫結本身。例如,如果您有一個輸入繫結和一個輸出繫結,並且兩者都需要單獨的 Pulsar 屬性集,則必須在擴充套件繫結上設定它們。生產者繫結的模式是 spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…
。類似地,對於消費者繫結,模式是 spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…
。透過這種方式,您可以在同一應用程式中為不同的繫結應用不同的 Pulsar 屬性集。
擴充套件繫結屬性的優先順序最高。Binder 中屬性應用的優先順序順序是:擴充套件繫結屬性 → Binder 屬性 → Spring Boot 屬性
(從高到低)。
Pulsar Binder 屬性資源
以下是一些可供查閱的資源,以瞭解有關 Pulsar Binder 可用屬性的更多資訊。
Pulsar 生產者繫結配置。這些屬性需要 spring.cloud.stream.bindings.<binding-name>.producer
字首。Spring Boot 提供的所有 Pulsar 生產者屬性 也可透過此配置類使用。
Pulsar 消費者繫結配置。這些屬性需要 spring.cloud.stream.bindings.<binding-name>.consumer
字首。Spring Boot 提供的所有 Pulsar 消費者屬性 也可透過此配置類使用。
有關常見的 Pulsar Binder 特定配置屬性,請參閱此處。這些屬性需要 spring.cloud.stream.pulsar.binder
字首。上述指定的生產者和消費者屬性(包括 Spring Boot 的屬性)可以在 Binder 級別使用 spring.cloud.stream.pulsar.binder.producer
或 spring.cloud.stream.pulsar.binder.consumer
字首使用。
Pulsar 主題供應器
Spring Cloud Stream Binder for Apache Pulsar 附帶一個開箱即用的 Pulsar 主題供應器。執行應用程式時,如果必要的主題不存在,Pulsar 將為您建立主題。然而,這只是一個基本的非分割槽主題,如果您想要建立分割槽主題等高階功能,可以依賴 Binder 中的主題供應器。Pulsar 主題供應器使用框架中的 PulsarAdministration
,後者使用 PulsarAdminBuilder
。因此,除非您在預設伺服器和埠上執行 Pulsar,否則需要設定 spring.pulsar.administration.service-url
屬性。
建立主題時指定分割槽計數
建立主題時,可以透過兩種方式設定分割槽計數。首先,您可以在 Binder 級別使用屬性 spring.cloud.stream.pulsar.binder.partition-count
進行設定。正如我們上面看到的,這樣做會使應用程式建立的所有主題繼承此屬性。假設您希望在繫結級別對設定分割槽進行精細控制。在這種情況下,您可以使用格式 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count
按繫結設定 partition-count
屬性。透過這種方式,同一應用程式中不同函式建立的各種主題將根據應用程式需求擁有不同的分割槽。