記錄序列化和反序列化

Kafka Streams binder 允許您透過兩種方式序列化和反序列化記錄。一種是 Kafka 提供的原生序列化和反序列化功能,另一種是 Spring Cloud Stream 框架的訊息轉換能力。讓我們看看一些細節。

入站反序列化

鍵始終使用原生 Serdes 進行反序列化。

對於值,預設情況下,入站反序列化由 Kafka 原生執行。請注意,這是與早期版本的 Kafka Streams binder 的預設行為相比的重大變化,在早期版本中,反序列化是由框架完成的。

Kafka Streams binder 將嘗試透過檢視 java.util.function.Function|Consumer 的型別簽名來推斷匹配的 Serde 型別。這是它匹配 Serdes 的順序。

  • 如果應用提供了一個 Serde 型別的 bean,並且返回型別使用傳入的鍵或值型別的實際型別進行了引數化,那麼它將使用該 Serde 進行入站反序列化。例如,如果您的應用中有以下內容,binder 會檢測到 KStream 的傳入值型別與 Serde bean 引數化的型別匹配。它將使用該 Serde 進行入站反序列化。

@Bean
public Serde<Foo> customSerde() {
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
  • 接下來,它會檢視型別,看它們是否是 Kafka Streams 公開的型別之一。如果是,則使用它們。以下是 binder 將嘗試從 Kafka Streams 匹配的 Serde 型別。

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • 如果 Kafka Streams 提供的 Serdes 都不匹配這些型別,那麼它將使用 Spring Kafka 提供的 JsonSerde。在這種情況下,binder 假定這些型別是 JSON 友好的。如果您有多個值物件作為輸入,這非常有用,因為 binder 會在內部將它們推斷為正確的 Java 型別。但在回退到 JsonSerde 之前,binder 會檢查 Kafka Streams 配置中設定的預設 Serde,看它是否是與傳入 KStream 型別匹配的 Serde

如果上述策略都不奏效,則應用必須透過配置提供 Serde。這可以透過兩種方式配置 - 繫結級別或預設級別。

首先,binder 會檢視繫結級別是否提供了 Serde。例如,如果您有以下處理器,

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

然後,您可以使用以下方式提供繫結級別 Serde

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您如上所述為每個輸入繫結提供了 Serde,那麼這將具有更高的優先順序,並且 binder 將避免進行任何 Serde 推斷。

如果您希望將預設的鍵/值 Serdes 用於入站反序列化,您可以在 binder 級別進行配置。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您不想要 Kafka 提供的原生解碼,可以依賴 Spring Cloud Stream 提供的訊息轉換功能。由於原生解碼是預設設定,為了讓 Spring Cloud Stream 反序列化入站值物件,您需要顯式停用原生解碼。

例如,如果您使用與上述相同的 BiFunction 處理器,則設定 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false 您需要為所有輸入單獨停用原生解碼。否則,對於未停用的輸入,仍將應用原生解碼。

預設情況下,Spring Cloud Stream 將使用 application/json 作為內容型別,並使用相應的 json 訊息轉換器。您可以透過使用以下屬性和相應的 MessageConverter bean 來使用自定義訊息轉換器。

spring.cloud.stream.bindings.process-in-0.contentType

出站序列化

出站序列化基本上遵循與上述入站反序列化相同的規則。與入站反序列化一樣,Spring Cloud Stream 早期版本的一個主要變化是出站序列化由 Kafka 原生處理。在 binder 的 3.0 版本之前,這是由框架本身完成的。

出站的鍵總是由 Kafka 使用 binder 推斷出的匹配 Serde 進行序列化。如果無法推斷鍵的型別,則需要使用配置進行指定。

值 serdes 使用與入站反序列化相同的規則進行推斷。首先,它會匹配以查看出站型別是否來自應用中提供的 bean。如果不是,它會檢查是否與 Kafka 公開的 Serde 匹配,例如 - Integer, Long, Short, Double, Float, byte[], UUIDString。如果這不起作用,則會回退到 Spring Kafka 專案提供的 JsonSerde,但首先會檢視預設的 Serde 配置以檢視是否存在匹配項。請記住,所有這些對應用來說是透明的。如果這些都不奏效,則使用者必須透過配置提供要使用的 Serde

假設您使用與上述相同的 BiFunction 處理器。那麼您可以按如下方式配置出站鍵/值 Serdes。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

如果 Serde 推斷失敗,並且未提供繫結級別 Serdes,則 binder 會回退到 JsonSerde,但會檢視預設 Serdes 以查詢匹配項。

預設 serdes 的配置方式與上面反序列化部分描述的相同。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您的應用使用了分支功能並且有多個輸出繫結,那麼這些必須按繫結進行配置。再次強調,如果 binder 能夠推斷 Serde 型別,您則無需進行此配置。

如果您不想要 Kafka 提供的原生編碼,但希望使用框架提供的訊息轉換,則需要顯式停用原生編碼,因為原生編碼是預設設定。例如,如果您使用與上述相同的 BiFunction 處理器,則設定 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false 在分支情況下,您需要為所有輸出單獨停用原生編碼。否則,對於未停用的輸出,仍將應用原生編碼。

當轉換由 Spring Cloud Stream 完成時,預設情況下,它將使用 application/json 作為內容型別,並使用相應的 json 訊息轉換器。您可以透過使用以下屬性和相應的 MessageConverter bean 來使用自定義訊息轉換器。

spring.cloud.stream.bindings.process-out-0.contentType

停用原生編碼/解碼後,binder 不會像原生 Serdes 那樣進行任何推斷。應用需要顯式提供所有配置選項。因此,通常建議在使用 Spring Cloud Stream Kafka Streams 應用時,堅持使用預設的序列化/反序列化選項,並採用 Kafka Streams 提供的原生序列化/反序列化。唯一必須使用框架提供的訊息轉換能力的情況是,您的上游生產者正在使用特定的序列化策略。在這種情況下,您希望使用匹配的反序列化策略,因為原生機制可能會失敗。當依賴預設的 Serde 機制時,應用必須確保 binder 能夠正確地將入站和出站與適當的 Serde 進行對映,否則可能會失敗。

值得一提的是,上述資料序列化/反序列化方法僅適用於您的處理器的邊緣,即 - 入站和出站。您的業務邏輯可能仍然需要呼叫顯式需要 Serde 物件的 Kafka Streams API。這些仍然是應用的責任,必須由開發人員相應地處理。