Spring Cloud Stream 模式登錄檔

簡介

當組織擁有基於訊息的釋出/訂閱架構,並且多個生產者和消費者微服務相互通訊時,這些微服務通常需要就基於模式的契約達成一致。當這種模式需要演進以適應新的業務需求時,現有元件仍然需要繼續工作。Spring Cloud Stream 提供對獨立模式登錄檔伺服器的支援,透過該伺服器,上述模式可以被註冊並由應用程式使用。Spring Cloud Stream 模式登錄檔支援還為基於 Avro 的模式登錄檔客戶端提供支援,這些客戶端本質上提供訊息轉換器,與模式登錄檔通訊以在訊息轉換期間協調模式。Spring Cloud Stream 提供的模式演進支援適用於上述獨立模式登錄檔以及 Confluent 提供的專門用於 Apache Kafka 的模式登錄檔。

Spring Cloud Stream 模式登錄檔概述

Spring Cloud Stream 模式登錄檔提供模式演進支援,以便資料可以隨著時間演進,並且仍然適用於舊的或新的生產者和消費者,反之亦然。大多數序列化模型,尤其是那些旨在跨不同平臺和語言實現可移植性的模型,都依賴於描述資料如何以二進位制有效負載序列化的模式。為了序列化資料然後解釋它,傳送方和接收方都必須能夠訪問描述二進位制格式的模式。在某些情況下,模式可以從序列化時的有效負載型別或反序列化時的目標型別推斷出來。然而,許多應用程式受益於能夠訪問描述二進位制資料格式的顯式模式。模式登錄檔允許您以文字格式(通常是 JSON)儲存模式資訊,並使該資訊可供需要它以二進位制格式接收和傳送資料的各種應用程式訪問。模式可以作為由以下部分組成的元組進行引用:

  • 作為模式邏輯名稱的主題

  • 模式版本

  • 模式格式,描述資料的二進位制格式

Spring Cloud Stream 模式登錄檔提供以下元件:

  • 獨立模式登錄檔伺服器

    By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
  • 能夠透過與模式登錄檔通訊進行訊息編組的模式登錄檔客戶端。

    Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.

模式登錄檔客戶端

與模式登錄檔伺服器互動的客戶端抽象是 SchemaRegistryClient 介面,其結構如下:

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream 提供開箱即用的實現,用於與其自己的模式伺服器互動以及與 Confluent 模式登錄檔互動。

Spring Cloud Stream 模式登錄檔的客戶端可以透過使用 @EnableSchemaRegistryClient 進行配置,如下所示:

@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {

}
預設轉換器經過最佳化,不僅快取來自遠端伺服器的模式,還快取 parse()toString() 方法,這兩個方法非常耗費資源。因此,它使用一個不快取響應的 DefaultSchemaRegistryClient。如果您打算更改預設行為,可以直接在程式碼中使用客戶端並將其覆蓋為所需的結果。為此,您必須在應用程式屬性中新增屬性 spring.cloud.stream.schemaRegistryClient.cached=true

模式登錄檔客戶端屬性

模式登錄檔客戶端支援以下屬性

spring.cloud.stream.schemaRegistryClient.endpoint

模式伺服器的位置。設定此項時,請使用完整的 URL,包括協議 (httphttps)、埠和上下文路徑。

預設值

localhost:8990/

spring.cloud.stream.schemaRegistryClient.cached

客戶端是否應快取模式伺服器響應。通常設定為 false,因為快取發生在訊息轉換器中。使用模式登錄檔客戶端的客戶端應將此設定為 true

預設值

Avro 模式登錄檔客戶端訊息轉換器

對於在應用程式上下文中註冊了 SchemaRegistryClient bean 的應用程式,Spring Cloud Stream 會自動配置一個 Apache Avro 訊息轉換器用於模式管理。這簡化了模式演進,因為接收訊息的應用程式可以輕鬆訪問寫入者模式,該模式可以與其自己的讀取者模式進行協調。

對於出站訊息,如果繫結內容型別設定為 application/*+avro,則 MessageConverter 將被啟用,如以下示例所示:

spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro

在出站轉換期間,訊息轉換器嘗試推斷每個出站訊息的模式(基於其型別),並使用 SchemaRegistryClient 將其註冊到主題(基於有效負載型別)。如果已經找到相同的模式,則檢索對其的引用。如果未找到,則註冊模式,並提供新的版本號。訊息將使用以下方案的 contentType 標頭髮送:application/[prefix].[subject].v[version]+avro,其中 prefix 可配置,subject 從有效負載型別推斷。

例如,型別為 User 的訊息可以作為二進位制有效負載傳送,其內容型別為 application/vnd.user.v2+avro,其中 user 是主題,2 是版本號。

接收訊息時,轉換器從傳入訊息的標頭推斷模式引用,並嘗試檢索它。該模式用作反序列化過程中的寫入者模式。

Avro 模式登錄檔訊息轉換器屬性

如果透過設定 spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro 啟用了基於 Avro 的模式登錄檔客戶端,則可以透過設定以下屬性來自定義註冊行為。

spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

如果要讓轉換器使用反射從 POJO 推斷模式,請啟用此項。

預設值:false

spring.cloud.stream.schema.avro.readerSchema

Avro 透過檢視寫入者模式(原始有效負載)和讀取者模式(您的應用程式有效負載)來比較模式版本。有關更多資訊,請參閱 Avro 文件。如果設定,此項將覆蓋模式伺服器上的任何查詢,並使用本地模式作為讀取者模式。預設值:null

spring.cloud.stream.schema.avro.schemaLocations

將此屬性中列出的任何 .avsc 檔案註冊到模式伺服器。

預設值:empty

spring.cloud.stream.schema.avro.prefix

在 Content-Type 標頭中使用的字首。

預設值:vnd

spring.cloud.stream.schema.avro.subjectNamingStrategy

確定用於在模式登錄檔中註冊 Avro 模式的主題名稱。有兩種實現可用:org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy,其中主題是模式名稱;以及 org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy,它使用 Avro 模式名稱空間和名稱返回完全限定的主題。可以透過實現 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy 建立自定義策略。

預設值:org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy

spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer

忽略任何模式登錄檔通訊。對於測試目的很有用,這樣在執行單元測試時,它不會不必要地嘗試連線到模式登錄檔伺服器。

預設值:false

Apache Avro 訊息轉換器

Spring Cloud Stream 透過其 spring-cloud-stream-schema-registry-client 模組提供對基於模式的訊息轉換器的支援。目前,對於基於模式的訊息轉換器,開箱即用僅支援 Apache Avro 序列化格式,未來版本將新增更多格式。

spring-cloud-stream-schema-registry-client 模組包含兩種型別的訊息轉換器,可用於 Apache Avro 序列化:

  • 使用序列化或反序列化物件的類資訊或在啟動時已知位置的模式的轉換器。

  • 使用模式登錄檔的轉換器。它們在執行時查詢模式並隨著域物件的演進而動態註冊新模式。

支援模式的轉換器

AvroSchemaMessageConverter 支援透過使用預定義模式或使用類中可用的模式資訊(透過反射或包含在 SpecificRecord 中)序列化和反序列化訊息。如果您提供自定義轉換器,則不會建立預設的 AvroSchemaMessageConverter bean。以下示例顯示了自定義轉換器:

要使用自定義轉換器,您可以將其新增到應用程式上下文,並可選擇指定一個或多個 MimeType 以將其關聯。預設的 MimeTypeapplication/avro

如果轉換的目標型別是 GenericRecord,則必須設定模式。

以下示例展示瞭如何在接收器應用程式中配置轉換器,透過註冊沒有預定義模式的 Apache Avro MessageConverter。在此示例中,請注意 mime 型別值為 avro/bytes,而不是預設的 application/avro

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

相反,以下應用程式註冊了一個帶有預定義模式(在類路徑中找到)的轉換器

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

模式登錄檔伺服器

Spring Cloud Stream 提供了一個模式登錄檔伺服器實現。要使用它,您可以下載最新的 spring-cloud-stream-schema-registry-server 版本並將其作為獨立應用程式執行。

wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar

您可以將模式登錄檔嵌入到現有的 Spring Boot Web 應用程式中。為此,請將 spring-cloud-stream-schema-registry-core 工件新增到您的專案,並使用 @EnableSchemaRegistryServer 註解,該註解會將模式登錄檔伺服器 REST 控制器新增到您的應用程式中。以下示例顯示了一個啟用模式登錄檔的 Spring Boot 應用程式:

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}

spring.cloud.stream.schema.server.path 屬性可用於控制模式伺服器的根路徑(尤其是在嵌入到其他應用程式中時)。spring.cloud.stream.schema.server.allowSchemaDeletion 布林屬性啟用模式刪除。預設情況下,此功能是停用的。

模式登錄檔伺服器使用關係資料庫來儲存模式。預設情況下,它使用嵌入式資料庫。您可以使用 Spring Boot SQL 資料庫和 JDBC 配置選項來自定義模式儲存。

模式登錄檔伺服器 API

模式登錄檔伺服器 API 包含以下操作

註冊新模式

要註冊新模式,請向 / 端點發送 POST 請求。

/ 接受包含以下欄位的 JSON 有效負載

  • subject:模式主題

  • format:模式格式

  • definition:模式定義

其響應是 JSON 格式的模式物件,包含以下欄位

  • id:模式 ID

  • subject:模式主題

  • format:模式格式

  • version:模式版本

  • definition:模式定義

按主題、格式和版本檢索現有模式

要透過主題、格式和版本檢索現有模式,請向 {subject}/{format}/{version} 端點發送 GET 請求。

其響應是 JSON 格式的模式物件,包含以下欄位

  • id:模式 ID

  • subject:模式主題

  • format:模式格式

  • version:模式版本

  • definition:模式定義

按主題和格式檢索現有模式

要透過主題和格式檢索現有模式,請向 /subject/format 端點發送 GET 請求。

其響應是模式列表,每個模式物件都是 JSON 格式,包含以下欄位

  • id:模式 ID

  • subject:模式主題

  • format:模式格式

  • version:模式版本

  • definition:模式定義

按 ID 檢索現有模式

要按 ID 檢索模式,請向 /schemas/{id} 端點發送 GET 請求。

其響應是 JSON 格式的模式物件,包含以下欄位

  • id:模式 ID

  • subject:模式主題

  • format:模式格式

  • version:模式版本

  • definition:模式定義

按主題、格式和版本刪除模式

要刪除由其主題、格式和版本標識的模式,請向 {subject}/{format}/{version} 端點發送 DELETE 請求。

按 ID 刪除模式

要按 ID 刪除模式,請向 /schemas/{id} 端點發送 DELETE 請求。

按主題刪除模式

DELETE /{subject}

按主題刪除現有模式。

此說明僅適用於 Spring Cloud Stream 1.1.0.RELEASE 的使用者。Spring Cloud Stream 1.1.0.RELEASE 使用表名 schema 來儲存 Schema 物件。Schema 在許多資料庫實現中都是關鍵字。為了避免將來發生任何衝突,從 1.1.1.RELEASE 開始,我們選擇使用 SCHEMA_REPOSITORY 作為儲存表的名稱。任何升級的 Spring Cloud Stream 1.1.0.RELEASE 使用者都應該在升級之前將其現有模式遷移到新表。

使用 Confluent 的模式登錄檔

預設配置建立了一個 DefaultSchemaRegistryClient bean。如果您想使用 Confluent 模式登錄檔,則需要建立一個 ConfluentSchemaRegistryClient 型別的 bean,該 bean 會取代框架預設配置的 bean。以下示例展示瞭如何建立這樣的 bean:

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}
ConfluentSchemaRegistryClient 針對 Confluent 平臺版本 4.0.0 進行了測試。

模式註冊與解析

為了更好地理解 Spring Cloud Stream 如何註冊和解析新模式以及它如何利用 Avro 模式比較功能,我們提供了兩個獨立的子章節:

模式註冊過程(序列化)

註冊過程的第一部分是從透過通道傳送的有效負載中提取模式。Avro 型別(例如 SpecificRecordGenericRecord)已經包含模式,可以立即從例項中檢索。對於 POJO,如果 spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled 屬性設定為 true(預設值),則會推斷模式。

一旦獲取到模式,轉換器就會從遠端伺服器載入其元資料(版本)。首先,它查詢本地快取。如果未找到結果,它會將資料提交到伺服器,伺服器會回覆版本資訊。轉換器總是快取結果,以避免為每個需要序列化的新訊息查詢模式伺服器的開銷。

有了模式版本資訊,轉換器將訊息的 contentType 標頭設定為攜帶版本資訊——例如:application/vnd.user.v1+avro

模式解析過程(反序列化)

當讀取包含版本資訊的訊息時(即,具有在 模式註冊過程(序列化) 中描述的方案的 contentType 標頭),轉換器會查詢模式伺服器以獲取訊息的寫入者模式。一旦找到傳入訊息的正確模式,它就會檢索讀取者模式,並透過使用 Avro 的模式解析支援,將其讀入讀取者定義(設定預設值和任何缺失的屬性)。

您應該理解寫入者模式(編寫訊息的應用程式)和讀取者模式(接收應用程式)之間的區別。我們建議花點時間閱讀 Avro 術語並理解該過程。Spring Cloud Stream 總是獲取寫入者模式以確定如何讀取訊息。如果您希望 Avro 的模式演進支援生效,則需要確保為您的應用程式正確設定了 readerSchema
© . This site is unofficial and not affiliated with VMware.