Spring Cloud Stream Binder for Apache Pulsar
Spring for Apache Pulsar為Spring Cloud Stream提供了一個繫結器,我們可以用它來使用釋出-訂閱正規化構建事件驅動的微服務。在本節中,我們將詳細介紹這個繫結器的基本細節。
用法
我們需要在您的應用程式中包含以下依賴項,以使用Spring Cloud Stream的Apache Pulsar繫結器。
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}
概述
Spring Cloud Stream for Apache Pulsar的繫結器允許應用程式專注於業務邏輯,而不是處理管理和維護Pulsar的底層細節。繫結器會為應用程式開發人員處理所有這些細節。Spring Cloud Stream帶來了基於Spring Cloud Function的強大程式設計模型,允許應用程式開發人員使用函式式風格編寫複雜的事件驅動應用程式。應用程式可以從與中介軟體無關的方式開始,然後透過Spring Boot配置屬性將Pulsar主題對映為Spring Cloud Stream中的目標。Spring Cloud Stream構建在Spring Boot之上,當使用Spring Cloud Stream編寫事件驅動的微服務時,您本質上是在編寫一個Boot應用程式。下面是一個簡單的Spring Cloud Stream應用程式。
@SpringBootApplication
public class SpringPulsarBinderSampleApp {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public static void main(String[] args) {
SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
}
@Bean
public Supplier<Time> timeSupplier() {
return () -> new Time(String.valueOf(System.currentTimeMillis()));
}
@Bean
public Function<Time, EnhancedTime> timeProcessor() {
return (time) -> {
EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
return enhancedTime;
};
}
@Bean
public Consumer<EnhancedTime> timeLogger() {
return (time) -> this.logger.info("SINK: {}", time);
}
record Time(String time) {
}
record EnhancedTime(Time time, String extra) {
}
}
上述示例應用程式是一個成熟的Spring Boot應用程式,值得做一些解釋。然而,初次檢視,您會發現這只是普通的Java以及一些Spring和Spring Boot註解。我們這裡有三個Bean方法:一個java.util.function.Supplier,一個java.util.function.Function,以及一個java.util.function.Consumer。Supplier生成當前時間的毫秒數,Function接收這個時間並新增一些隨機資料進行增強,然後Consumer記錄增強後的時間。
為了簡潔起見,我們省略了所有匯入,但整個應用程式中沒有任何Spring Cloud Stream特有的內容。它如何成為一個與Apache Pulsar互動的Spring Cloud Stream應用程式呢?您必須在應用程式中包含上述繫結器依賴。一旦添加了該依賴,您必須提供以下配置屬性。
spring:
cloud:
function:
definition: timeSupplier;timeProcessor;timeLogger;
stream:
bindings:
timeProcessor-in-0:
destination: timeSupplier-out-0
timeProcessor-out-0:
destination: timeProcessor-out-0
timeLogger-in-0:
destination: timeProcessor-out-0
有了這些,上面的Spring Boot應用程式就變成了一個基於Spring Cloud Stream的端到端事件驅動應用程式。因為我們類路徑中包含了Pulsar繫結器,所以應用程式會與Apache Pulsar互動。如果應用程式中只有一個函式,那麼我們不需要告訴Spring Cloud Stream啟用該函式執行,因為它預設會這樣做。如果應用程式中有多個這樣的函式,就像我們的示例一樣,我們需要指示Spring Cloud Stream我們想要啟用哪些函式。在我們的例子中,我們需要啟用所有這些函式,我們透過spring.cloud.function.definition屬性來實現。預設情況下,Bean名稱成為Spring Cloud Stream繫結名稱的一部分。繫結是Spring Cloud Stream中一個基本的抽象概念,框架透過它與中介軟體目標通訊。Spring Cloud Stream所做的幾乎所有事情都發生在具體的繫結之上。一個供應器(supplier)只有一個輸出繫結;函式(function)有輸入和輸出繫結;消費者(consumer)只有一個輸入繫結。以我們的供應器bean——timeSupplier為例。這個供應器的預設繫結名稱將是timeSupplier-out-0。類似地,timeProcessor函式的預設繫結名稱在入站(inbound)方面是timeProcessor-in-0,在出站(outbound)方面是timeProcessor-out-0。有關如何更改預設繫結名稱的詳細資訊,請參閱Spring Cloud Stream參考文件。在大多數情況下,使用預設繫結名稱就足夠了。我們如上所示,在繫結名稱上設定了目標。如果沒有提供目標,則繫結名稱成為目標的值,就像timeSupplier-out-0的情況一樣。
當執行上述應用程式時,您應該會看到供應器每秒執行一次,然後由函式消費並增強由日誌消費者消費的時間。
基於繫結器的應用程式中的訊息轉換
在上面的示例應用程式中,我們沒有提供訊息轉換的任何Schema資訊。這是因為,預設情況下,Spring Cloud Stream使用透過Spring Messaging專案在Spring Framework中建立的訊息支援來使用其訊息轉換機制。除非另有說明,Spring Cloud Stream在入站和出站繫結上都使用application/json作為訊息轉換的content-type。在出站端,資料被序列化為byte[],然後Pulsar繫結器使用Schema.BYTES透過網路將其傳送到Pulsar主題。類似地,在入站端,資料從Pulsar主題中以byte[]形式消費,然後使用適當的訊息轉換器轉換為目標型別。
在Pulsar中使用Pulsar Schema進行原生轉換
儘管預設是使用框架提供的訊息轉換,但Spring Cloud Stream 允許每個繫結器決定訊息應該如何轉換。如果應用程式選擇此路徑,Spring Cloud Stream 將避免使用任何 Spring 提供的訊息轉換功能,並直接傳遞它接收或生成的資料。Spring Cloud Stream 中的此功能在生產者端稱為原生編碼,在消費者端稱為原生解碼。這意味著編碼和解碼是在目標中介軟體(在我們的例子中是 Apache Pulsar)上原生髮生的。對於上述應用程式,我們可以使用以下配置來繞過框架轉換並使用原生編碼和解碼。
spring:
cloud:
stream:
bindings:
timeSupplier-out-0:
producer:
use-native-encoding: true
timeProcessor-in-0:
destination: timeSupplier-out-0
consumer:
use-native-decoding: true
timeProcessor-out-0:
destination: timeProcessor-out-0
producer:
use-native-encoding: true
timeLogger-in-0:
destination: timeProcessor-out-0
consumer:
use-native-decoding: true
pulsar:
bindings:
timeSupplier-out-0:
producer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-in-0:
consumer:
schema-type: JSON
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
timeProcessor-out-0:
producer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
timeLogger-in-0:
consumer:
schema-type: AVRO
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
在生產者端啟用原生編碼的屬性是核心Spring Cloud Stream的繫結級別屬性。您可以在生產者繫結上設定它——spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding並將其設定為true。同樣,對於消費者繫結,使用spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding並將其設定為true。如果我們決定使用原生編碼和解碼,對於Pulsar,我們需要設定相應的schema和底層訊息型別資訊。此資訊作為擴充套件繫結屬性提供。如您在上述配置中所示,屬性是用於schema資訊的spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type和用於實際目標型別的spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type。如果訊息中同時包含鍵和值,您可以使用message-key-type和message-value-type來指定它們的目標型別。
| 當省略 `schema-type` 屬性時,將查閱任何已配置的自定義 Schema 對映。 |
訊息頭轉換
每個訊息通常都包含需要在 Pulsar 和 Spring Messaging 之間透過 Spring Cloud Stream 輸入和輸出繫結進行傳輸的頭部資訊。為了支援這種傳輸,框架會處理必要的訊息頭部轉換。
自定義頭對映器
Pulsar 繫結器配置了預設的頭部對映器,可以透過提供您自己的 PulsarHeaderMapper bean 來覆蓋。
在以下示例中,配置了一個 JSON 頭部對映器,它:
-
對映所有入站頭(除了鍵為“top”或“secret”的頭)
-
映射出站頭(除了鍵為“id”、“timestamp”或“userId”的頭)
-
對於出站反序列化,僅信任“com.acme”包中的物件
-
使用簡單的
toString()編碼對任何“com.acme.Money”頭值進行序列化/反序列化
@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
return JsonPulsarHeaderMapper.builder()
.inboundPatterns("!top", "!secret", "*")
.outboundPatterns("!id", "!timestamp", "!userId", "*")
.trustedPackages("com.acme")
.toStringClasses("com.acme.Money")
.build();
}
在繫結器中使用 Pulsar 屬性
繫結器使用 Spring for Apache Pulsar 框架中的基本元件來構建其生產者和消費者繫結。由於基於繫結器的應用程式是 Spring Boot 應用程式,因此繫結器預設使用 Spring for Apache Pulsar 的 Spring Boot 自動配置。因此,核心框架級別上所有可用的 Pulsar Spring Boot 屬性也透過繫結器提供。例如,您可以使用以 spring.pulsar.producer…、spring.pulsar.consumer… 等為字首的屬性。此外,您還可以在繫結器級別設定這些 Pulsar 屬性。例如,spring.cloud.stream.pulsar.binder.producer… 或 spring.cloud.stream.pulsar.binder.consumer… 也能正常工作。
以上兩種方法都可行,但當使用這些屬性時,它們會應用於整個應用程式。如果應用程式中有多個函式,它們都將獲得相同的屬性。您還可以透過擴充套件繫結屬性級別來設定這些 Pulsar 屬性以解決此問題。擴充套件繫結屬性應用於繫結本身。例如,如果您有一個輸入繫結和一個輸出繫結,並且它們都要求一組獨立的 Pulsar 屬性,則必須在擴充套件繫結上設定它們。生產者繫結的模式是 spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…。同樣,對於消費者繫結,模式是 spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…。透過這種方式,您可以為同一應用程式中的不同繫結應用一組獨立的 Pulsar 屬性。
擴充套件繫結屬性具有最高的優先順序。繫結器中應用屬性的優先順序順序是 擴充套件繫結屬性 → 繫結器屬性 → Spring Boot 屬性(從高到低)。
Pulsar 繫結器屬性資源
以下是一些可以用來查詢更多 Pulsar 繫結器可用屬性的資源。
Pulsar 生產者繫結配置。這些屬性需要 spring.cloud.stream.bindings.<binding-name>.producer 字首。所有 Spring Boot 提供的 Pulsar 生產者屬性 也透過此配置類提供。
Pulsar消費者繫結配置。這些屬性需要 spring.cloud.stream.bindings.<binding-name>.consumer 字首。所有 Spring Boot 提供的 Pulsar消費者屬性 也透過此配置類提供。
有關常見的 Pulsar 繫結器特定配置屬性,請參閱 此處。這些屬性需要 spring.cloud.stream.pulsar.binder 字首。上述指定的生產者和消費者屬性(包括 Spring Boot 屬性)可以在繫結器中使用 spring.cloud.stream.pulsar.binder.producer 或 spring.cloud.stream.pulsar.binder.consumer 字首。
Pulsar 主題配置器
Spring Cloud Stream for Apache Pulsar的繫結器提供了一個開箱即用的Pulsar主題配置器。當執行應用程式時,如果缺少必要的主題,Pulsar將為您建立主題。然而,這是一個基本的非分割槽主題,如果您想要高階功能,例如建立分割槽主題,您可以依賴繫結器中的主題配置器。Pulsar主題配置器使用框架中的PulsarAdministration,它又使用PulsarAdminBuilder。因此,您需要設定spring.pulsar.administration.service-url屬性,除非您在預設伺服器和埠上執行Pulsar。
建立主題時指定分割槽數量
建立主題時,您可以透過兩種方式設定分割槽數量。首先,您可以在繫結器級別使用屬性 spring.cloud.stream.pulsar.binder.partition-count 進行設定。如上所述,這樣做將使應用程式建立的所有主題都繼承此屬性。如果您希望在繫結級別對設定分割槽進行精細控制,則可以使用 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count 格式為每個繫結設定 partition-count 屬性。這樣,同一應用程式中不同函式建立的各種主題將根據應用程式要求具有不同的分割槽。