記錄序列化和反序列化
Kafka Streams binder 允許您透過兩種方式序列化和反序列化記錄。一種是 Kafka 提供的原生序列化和反序列化功能,另一種是 Spring Cloud Stream 框架的訊息轉換能力。讓我們看看一些細節。
入站反序列化
鍵始終使用原生 Serdes 進行反序列化。
對於值,預設情況下,入站反序列化由 Kafka 原生執行。請注意,這是與早期版本的 Kafka Streams binder 的預設行為相比的重大變化,在早期版本中,反序列化是由框架完成的。
Kafka Streams binder 將嘗試透過檢視 java.util.function.Function|Consumer
的型別簽名來推斷匹配的 Serde
型別。這是它匹配 Serdes 的順序。
-
如果應用提供了一個
Serde
型別的 bean,並且返回型別使用傳入的鍵或值型別的實際型別進行了引數化,那麼它將使用該Serde
進行入站反序列化。例如,如果您的應用中有以下內容,binder 會檢測到KStream
的傳入值型別與Serde
bean 引數化的型別匹配。它將使用該 Serde 進行入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下來,它會檢視型別,看它們是否是 Kafka Streams 公開的型別之一。如果是,則使用它們。以下是 binder 將嘗試從 Kafka Streams 匹配的 Serde 型別。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serdes 都不匹配這些型別,那麼它將使用 Spring Kafka 提供的 JsonSerde。在這種情況下,binder 假定這些型別是 JSON 友好的。如果您有多個值物件作為輸入,這非常有用,因為 binder 會在內部將它們推斷為正確的 Java 型別。但在回退到
JsonSerde
之前,binder 會檢查 Kafka Streams 配置中設定的預設Serde
,看它是否是與傳入 KStream 型別匹配的Serde
。
如果上述策略都不奏效,則應用必須透過配置提供 Serde
。這可以透過兩種方式配置 - 繫結級別或預設級別。
首先,binder 會檢視繫結級別是否提供了 Serde
。例如,如果您有以下處理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然後,您可以使用以下方式提供繫結級別 Serde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您如上所述為每個輸入繫結提供了 Serde ,那麼這將具有更高的優先順序,並且 binder 將避免進行任何 Serde 推斷。 |
如果您希望將預設的鍵/值 Serdes 用於入站反序列化,您可以在 binder 級別進行配置。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不想要 Kafka 提供的原生解碼,可以依賴 Spring Cloud Stream 提供的訊息轉換功能。由於原生解碼是預設設定,為了讓 Spring Cloud Stream 反序列化入站值物件,您需要顯式停用原生解碼。
例如,如果您使用與上述相同的 BiFunction 處理器,則設定 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要為所有輸入單獨停用原生解碼。否則,對於未停用的輸入,仍將應用原生解碼。
預設情況下,Spring Cloud Stream 將使用 application/json
作為內容型別,並使用相應的 json 訊息轉換器。您可以透過使用以下屬性和相應的 MessageConverter
bean 來使用自定義訊息轉換器。
spring.cloud.stream.bindings.process-in-0.contentType
出站序列化
出站序列化基本上遵循與上述入站反序列化相同的規則。與入站反序列化一樣,Spring Cloud Stream 早期版本的一個主要變化是出站序列化由 Kafka 原生處理。在 binder 的 3.0 版本之前,這是由框架本身完成的。
出站的鍵總是由 Kafka 使用 binder 推斷出的匹配 Serde
進行序列化。如果無法推斷鍵的型別,則需要使用配置進行指定。
值 serdes 使用與入站反序列化相同的規則進行推斷。首先,它會匹配以查看出站型別是否來自應用中提供的 bean。如果不是,它會檢查是否與 Kafka 公開的 Serde
匹配,例如 - Integer
, Long
, Short
, Double
, Float
, byte[]
, UUID
和 String
。如果這不起作用,則會回退到 Spring Kafka 專案提供的 JsonSerde
,但首先會檢視預設的 Serde
配置以檢視是否存在匹配項。請記住,所有這些對應用來說是透明的。如果這些都不奏效,則使用者必須透過配置提供要使用的 Serde
。
假設您使用與上述相同的 BiFunction
處理器。那麼您可以按如下方式配置出站鍵/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推斷失敗,並且未提供繫結級別 Serdes,則 binder 會回退到 JsonSerde
,但會檢視預設 Serdes 以查詢匹配項。
預設 serdes 的配置方式與上面反序列化部分描述的相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的應用使用了分支功能並且有多個輸出繫結,那麼這些必須按繫結進行配置。再次強調,如果 binder 能夠推斷 Serde
型別,您則無需進行此配置。
如果您不想要 Kafka 提供的原生編碼,但希望使用框架提供的訊息轉換,則需要顯式停用原生編碼,因為原生編碼是預設設定。例如,如果您使用與上述相同的 BiFunction 處理器,則設定 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支情況下,您需要為所有輸出單獨停用原生編碼。否則,對於未停用的輸出,仍將應用原生編碼。
當轉換由 Spring Cloud Stream 完成時,預設情況下,它將使用 application/json
作為內容型別,並使用相應的 json 訊息轉換器。您可以透過使用以下屬性和相應的 MessageConverter
bean 來使用自定義訊息轉換器。
spring.cloud.stream.bindings.process-out-0.contentType
停用原生編碼/解碼後,binder 不會像原生 Serdes 那樣進行任何推斷。應用需要顯式提供所有配置選項。因此,通常建議在使用 Spring Cloud Stream Kafka Streams 應用時,堅持使用預設的序列化/反序列化選項,並採用 Kafka Streams 提供的原生序列化/反序列化。唯一必須使用框架提供的訊息轉換能力的情況是,您的上游生產者正在使用特定的序列化策略。在這種情況下,您希望使用匹配的反序列化策略,因為原生機制可能會失敗。當依賴預設的 Serde
機制時,應用必須確保 binder 能夠正確地將入站和出站與適當的 Serde
進行對映,否則可能會失敗。
值得一提的是,上述資料序列化/反序列化方法僅適用於您的處理器的邊緣,即 - 入站和出站。您的業務邏輯可能仍然需要呼叫顯式需要 Serde
物件的 Kafka Streams API。這些仍然是應用的責任,必須由開發人員相應地處理。