Spring Cloud Stream Schema Registry
引言
當組織擁有基於訊息傳遞的釋出/訂閱架構,並且多個生產者和消費者微服務相互通訊時,這些微服務通常需要就一個基於模式的契約達成一致。當需要演進此類模式以適應新的業務需求時,現有元件仍需繼續正常工作。Spring Cloud Stream 支援使用獨立的 schema registry 伺服器,透過該伺服器可以註冊模式並供應用程式使用。Spring Cloud Stream 的 schema registry 支援還提供了基於 Avro 的 schema registry 客戶端支援,這些客戶端實質上提供了訊息轉換器,透過與 schema registry 通訊來在訊息轉換過程中協調模式。Spring Cloud Stream 提供的模式演進支援既可與前面提到的獨立 schema registry 配合使用,也可與 Confluent 提供的專門用於 Apache Kafka 的 schema registry 配合使用。
Spring Cloud Stream Schema Registry 概覽
Spring Cloud Stream Schema Registry 支援模式演進,以便資料可以隨時間演變,同時仍與較舊或較新的生產者和消費者相容,反之亦然。大多數序列化模型,特別是那些旨在跨不同平臺和語言實現可移植性的模型,都依賴於描述資料如何在二進位制負載中序列化的模式。為了序列化資料然後對其進行解釋,傳送方和接收方都必須能夠訪問描述二進位制格式的模式。在某些情況下,模式可以在序列化時從負載型別推斷出來,或者在反序列化時從目標型別推斷出來。然而,許多應用程式得益於訪問描述二進位制資料格式的顯式模式。Schema registry 允許您以文字格式(通常是 JSON)儲存模式資訊,並使該資訊可供需要接收和傳送二進位制格式資料的各種應用程式訪問。模式可以作為一個包含以下元素的元組進行引用:
-
一個主題 (subject),它是模式的邏輯名稱
-
模式版本
-
模式格式,它描述了資料的二進位制格式
Spring Cloud Stream Schema Registry 提供以下元件:
-
獨立 Schema Registry 伺服器
By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
-
能夠透過與 Schema Registry 通訊進行訊息編組的 Schema Registry 客戶端。
Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.
Schema Registry 客戶端
用於與 schema registry 伺服器互動的客戶端抽象是 `SchemaRegistryClient` 介面,其結構如下:
public interface SchemaRegistryClient {
SchemaRegistrationResponse register(String subject, String format, String schema);
String fetch(SchemaReference schemaReference);
String fetch(Integer id);
}
Spring Cloud Stream 提供了與自身 schema 伺服器和與 Confluent Schema Registry 互動的開箱即用實現。
可以透過使用 `@EnableSchemaRegistryClient` 來配置 Spring Cloud Stream schema registry 的客戶端,如下所示:
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {
}
預設轉換器經過最佳化,不僅快取遠端伺服器的模式,還快取 `parse()` 和 `toString()` 方法,這些方法開銷很大。因此,它使用一個不快取響應的 `DefaultSchemaRegistryClient`。如果您打算改變預設行為,可以直接在程式碼中使用該客戶端,並將其覆蓋以達到預期結果。為此,您必須將屬性 `spring.cloud.stream.schemaRegistryClient.cached=true` 新增到您的應用程式屬性中。 |
Schema Registry 客戶端屬性
Schema Registry 客戶端支援以下屬性:
spring.cloud.stream.schemaRegistryClient.endpoint
-
schema 伺服器的位置。設定此項時,請使用完整的 URL,包括協議 (`http` 或 `https`)、埠和上下文路徑。
- 預設值
spring.cloud.stream.schemaRegistryClient.cached
-
客戶端是否快取 schema 伺服器的響應。通常設定為 `false`,因為快取發生在訊息轉換器中。使用 schema registry 客戶端的客戶端應將此設定為 `true`。
- 預設值
-
false
Avro Schema Registry 客戶端訊息轉換器
對於已在應用上下文中註冊了 `SchemaRegistryClient` bean 的應用程式,Spring Cloud Stream 會為模式管理自動配置一個 Apache Avro 訊息轉換器。這簡化了模式演進,因為接收訊息的應用程式可以輕鬆訪問寫模式 (writer schema),並與它們自己的讀模式 (reader schema) 進行協調。
對於出站訊息,如果繫結的內容型別設定為 `application/*+avro`,則 `MessageConverter` 會被啟用,如下例所示:
spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
在出站轉換期間,訊息轉換器會嘗試推斷每條出站訊息的模式(基於其型別),並使用 `SchemaRegistryClient` 將其註冊到主題(基於負載型別)。如果已找到相同的模式,則會檢索其引用。否則,模式會被註冊,並提供新的版本號。訊息會以 `contentType` 頭髮送,使用以下方案:`application/[prefix].[subject].v[version]+avro`,其中 `prefix` 可配置,而 `subject` 是從負載型別推匯出來的。
例如,型別為 `User` 的訊息可能會以二進位制負載傳送,其內容型別為 `application/vnd.user.v2+avro`,其中 `user` 是主題,`2` 是版本號。
接收訊息時,轉換器從入站訊息的頭部推斷模式引用,並嘗試檢索它。該模式用作反序列化過程中的寫模式 (writer schema)。
Avro Schema Registry 訊息轉換器屬性
如果您已透過設定 `spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro` 啟用了基於 Avro 的 schema registry 客戶端,則可以透過設定以下屬性來自定義註冊行為。
- spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
-
啟用此項,讓轉換器使用反射從 POJO 推斷 Schema。
預設值:
false
- spring.cloud.stream.schema.avro.readerSchema
-
Avro 透過比較寫模式(writer schema,原始負載)和讀模式(reader schema,您的應用負載)來比較模式版本。有關更多資訊,請參見Avro 文件。如果設定了此項,它將覆蓋模式伺服器上的任何查詢,並使用本地模式作為讀模式。預設值:
null
- spring.cloud.stream.schema.avro.schemaLocations
-
將此屬性中列出的所有 `.avsc` 檔案註冊到 Schema 伺服器。
預設值:
empty
- spring.cloud.stream.schema.avro.prefix
-
用於 Content-Type 頭部的R字首。
預設值:
vnd
- spring.cloud.stream.schema.avro.subjectNamingStrategy
-
確定在 schema registry 中註冊 Avro 模式時使用的主題 (subject) 名稱。提供了兩種實現:
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
,其中主題是模式名稱;以及org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
,它使用 Avro 模式名稱空間和名稱返回完全限定的主題。可以透過實現org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
建立自定義策略。預設值:
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
- spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer
-
忽略任何 schema registry 通訊。對於測試很有用,這樣在執行單元測試時,它不會不必要地嘗試連線到 Schema Registry 伺服器。
預設值:
false
Apache Avro 訊息轉換器
Spring Cloud Stream 透過其 `spring-cloud-stream-schema-registry-client` 模組提供對基於模式的訊息轉換器的支援。目前,開箱即用的基於模式訊息轉換器唯一支援的序列化格式是 Apache Avro,未來版本將新增更多格式。
spring-cloud-stream-schema-registry-client
模組包含兩種可用於 Apache Avro 序列化的訊息轉換器:
-
使用序列化或反序列化物件的類資訊,或者在啟動時已知位置的模式的轉換器。
-
使用 schema registry 的轉換器。它們在執行時定位模式,並在領域物件演變時動態註冊新模式。
帶模式支援的轉換器
AvroSchemaMessageConverter
支援透過使用預定義模式或使用類中可用的模式資訊(無論是反射還是包含在 `SpecificRecord` 中)來序列化和反序列化訊息。如果您提供了自定義轉換器,則不會建立預設的 AvroSchemaMessageConverter bean。以下示例展示了一個自定義轉換器:
要使用自定義轉換器,只需將其新增到應用上下文中,可以選擇指定一個或多個與之關聯的 `MimeTypes`。預設的 `MimeType` 是 `application/avro`。
如果轉換的目標型別是 `GenericRecord`,則必須設定模式。
以下示例展示瞭如何在 sink 應用程式中配置不帶預定義模式的 Apache Avro `MessageConverter`。在此示例中,請注意 mime 型別的值是 `avro/bytes`,而不是預設的 `application/avro`。
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
反之,以下應用程式註冊了一個帶有預定義模式(在 classpath 中找到)的轉換器:
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}
Schema Registry 伺服器
Spring Cloud Stream 提供了一個 schema registry 伺服器實現。要使用它,您可以下載最新的 `spring-cloud-stream-schema-registry-server` release 並將其作為獨立應用程式執行:
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
您可以將 schema registry 嵌入到您現有的 Spring Boot web 應用程式中。為此,將 `spring-cloud-stream-schema-registry-core` artifact 新增到您的專案中,並使用 `@EnableSchemaRegistryServer` 註解,這將向您的應用程式新增 schema registry 伺服器 REST 控制器。以下示例展示了一個啟用 schema registry 的 Spring Boot 應用程式:
|
spring.cloud.stream.schema.server.path
屬性可用於控制 schema 伺服器的根路徑(特別是在嵌入到其他應用程式中時)。spring.cloud.stream.schema.server.allowSchemaDeletion
布林屬性啟用模式刪除功能。預設情況下,此功能是停用的。
schema registry 伺服器使用關係資料庫來儲存模式。預設情況下,它使用嵌入式資料庫。您可以使用Spring Boot SQL 資料庫和 JDBC 配置選項來自定義模式儲存。
Schema Registry 伺服器 API
Schema Registry 伺服器 API 包含以下操作:
-
POST /
— 參見 註冊新模式 -
GET /{subject}/{format}/{version}
— 參見 透過主題、格式和版本檢索現有模式 -
GET /{subject}/{format}
— 參見 透過主題和格式檢索現有模式 -
GET /schemas/{id}
— 參見 透過 ID 檢索現有模式 -
DELETE /{subject}/{format}/{version}
— 參見 透過主題、格式和版本刪除模式 -
DELETE /schemas/{id}
— 參見 透過 ID 刪除模式 -
DELETE /{subject}
— 參見 透過主題刪除模式
註冊新模式
要註冊新模式,請向 `/` 端點發送一個 `POST` 請求。
`/` 接受包含以下欄位的 JSON 負載:
-
subject
: 模式主題 -
format
: 模式格式 -
definition
: 模式定義
其響應是一個 JSON 格式的模式物件,包含以下欄位:
-
id
: 模式 ID -
subject
: 模式主題 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定義
透過主題、格式和版本檢索現有模式
要透過主題、格式和版本檢索現有模式,請向 `{subject}/{format}/{version}` 端點發送一個 `GET` 請求。
其響應是一個 JSON 格式的模式物件,包含以下欄位:
-
id
: 模式 ID -
subject
: 模式主題 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定義
透過主題和格式檢索現有模式
要透過主題和格式檢索現有模式,請向 `/subject/format` 端點發送一個 `GET` 請求。
其響應是一個模式列表,每個模式物件都是 JSON 格式,包含以下欄位:
-
id
: 模式 ID -
subject
: 模式主題 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定義
透過 ID 檢索現有模式
要透過其 ID 檢索模式,請向 `/schemas/{id}` 端點發送一個 `GET` 請求。
其響應是一個 JSON 格式的模式物件,包含以下欄位:
-
id
: 模式 ID -
subject
: 模式主題 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定義
透過主題刪除模式
DELETE /{subject}
透過其主題刪除現有模式。
本說明僅適用於 Spring Cloud Stream 1.1.0.RELEASE 的使用者。Spring Cloud Stream 1.1.0.RELEASE 使用表名 schema 來儲存 Schema 物件。Schema 在許多資料庫實現中是一個關鍵字。為了避免將來出現任何衝突,從 1.1.1.RELEASE 版本開始,我們選擇了 SCHEMA_REPOSITORY 作為儲存表的名稱。任何計劃升級的 Spring Cloud Stream 1.1.0.RELEASE 使用者都應在升級之前將其現有模式遷移到新表中。 |
使用 Confluent 的 Schema Registry
預設配置會建立一個 `DefaultSchemaRegistryClient` bean。如果您想使用 Confluent schema registry,則需要建立一個 `ConfluentSchemaRegistryClient` 型別的 bean,它將取代框架預設配置的 bean。以下示例展示瞭如何建立此類 bean:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
ConfluentSchemaRegistryClient 已針對 Confluent 平臺版本 4.0.0 進行了測試。 |
模式註冊過程(序列化)
註冊過程的第一部分是從正在透過通道傳送的負載中提取模式。`SpecificRecord` 或 `GenericRecord` 等 Avro 型別已包含模式,可以立即從例項中檢索。對於 POJO,如果將 `spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled` 屬性設定為 `true`(預設值),則會推斷模式。
獲取模式後,轉換器會從遠端伺服器載入其元資料(版本)。首先,它查詢本地快取。如果未找到結果,則將資料提交到伺服器,伺服器回覆版本資訊。轉換器總是快取結果,以避免為每個需要序列化的新訊息查詢 Schema 伺服器的開銷。
有了模式版本資訊,轉換器將訊息的 `contentType` 頭設定為攜帶版本資訊,例如:`application/vnd.user.v1+avro`。
模式解析過程(反序列化)
讀取包含版本資訊的訊息時(即,帶有 模式註冊過程(序列化) 中描述的方案的 `contentType` 頭),轉換器會查詢 Schema 伺服器以獲取入站訊息的寫模式 (writer schema)。一旦找到入站訊息的正確模式,它會檢索讀模式 (reader schema),並使用 Avro 的模式解析支援將其讀入讀定義(設定預設值和任何缺失的屬性)。
您應該理解寫模式(寫入訊息的應用程式)和讀模式(接收應用程式)之間的區別。我們建議花點時間閱讀Avro 術語並理解這個過程。Spring Cloud Stream 始終獲取寫模式以確定如何讀取訊息。如果您希望 Avro 的模式演進支援正常工作,則需要確保為您的應用程式正確設定了 `readerSchema`。 |