記錄序列化和反序列化
Kafka Streams 繫結器允許您以兩種方式序列化和反序列化記錄。一種是 Kafka 提供的原生序列化和反序列化功能,另一種是 Spring Cloud Stream 框架的訊息轉換功能。讓我們看一些細節。
入站反序列化
鍵始終使用原生 Serdes 進行反序列化。
對於值,預設情況下,入站反序列化由 Kafka 原生執行。請注意,這是 Kafka Streams 繫結器先前版本中預設行為的一項重大更改,以前的反序列化是由框架完成的。
Kafka Streams 繫結器將嘗試透過檢視 java.util.function.Function|Consumer 的型別簽名來推斷匹配的 Serde 型別。以下是它匹配 Serdes 的順序。
-
如果應用程式提供了一個
Serde型別的 bean,並且返回型別使用傳入鍵或值的實際型別進行了引數化,那麼它將使用該Serde進行入站反序列化。例如,如果您在應用程式中有以下內容,繫結器會檢測到KStream的傳入值型別與Serdebean 上引數化的型別匹配。它將使用該 Serde 進行入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下來,它會檢視型別,並檢視它們是否是 Kafka Streams 公開的型別之一。如果是,則使用它們。以下是繫結器將嘗試從 Kafka Streams 匹配的 Serde 型別。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serdes 都不匹配這些型別,那麼它將使用 Spring Kafka 提供的 JsonSerde。在這種情況下,繫結器假定這些型別是 JSON 友好的。如果您有多個值物件作為輸入,這很有用,因為繫結器將在內部將它們推斷為正確的 Java 型別。但在回退到
JacksonJsonSerde之前,繫結器會檢查 Kafka Streams 配置中設定的預設Serde,以檢視它是否是與傳入 KStream 型別匹配的Serde。
如果以上策略都不奏效,那麼應用程式必須透過配置提供 Serde。這可以透過兩種方式配置 - 繫結或預設。
首先,繫結器將檢視是否在繫結級別提供了 Serde。例如,如果您有以下處理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
那麼,您可以使用以下方式提供繫結級別的 Serde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您為每個輸入繫結提供瞭如上所述的 Serde,那麼它將具有更高的優先順序,並且繫結器將避免任何 Serde 推理。 |
如果您希望將預設的鍵/值 Serdes 用於入站反序列化,您可以在繫結器級別進行此操作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不希望使用 Kafka 提供的原生解碼,您可以依賴 Spring Cloud Stream 提供的訊息轉換功能。由於原生解碼是預設設定,為了讓 Spring Cloud Stream 反序列化入站值物件,您需要明確停用原生解碼。
例如,如果您有與上面相同的 BiFunction 處理器,那麼 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false 您需要單獨為所有輸入停用原生解碼。否則,原生解碼仍將應用於您未停用的那些。
預設情況下,Spring Cloud Stream 將使用 application/json 作為內容型別,並使用適當的 json 訊息轉換器。您可以透過使用以下屬性和適當的 MessageConverter bean 來使用自定義訊息轉換器。
spring.cloud.stream.bindings.process-in-0.contentType
出站序列化
出站序列化遵循與上述入站反序列化相同的規則。與入站反序列化一樣,Spring Cloud Stream 以前版本的一個主要變化是,出站序列化由 Kafka 原生處理。在繫結器的 3.0 版本之前,這是由框架本身完成的。
出站鍵始終由 Kafka 使用繫結器推斷的匹配 Serde 進行序列化。如果無法推斷鍵的型別,則需要透過配置指定。
值 Serdes 使用與入站反序列化相同的規則進行推斷。首先,它匹配以查看出站型別是否來自應用程式中提供的 bean。如果不是,它會檢查是否與 Kafka 公開的 Serde(例如 - Integer、Long、Short、Double、Float、byte[]、UUID 和 String)匹配。如果這不起作用,它將回退到 Spring Kafka 專案提供的 JacksonJsonSerde,但首先會檢視預設的 Serde 配置以檢視是否有匹配。請記住,所有這些對應用程式都是透明的。如果這些都不起作用,那麼使用者必須透過配置提供要使用的 Serde。
假設您使用的是與上面相同的 BiFunction 處理器。然後您可以按如下方式配置出站鍵/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推理失敗,並且未提供繫結級別 Serdes,則繫結器將回退到 JacksonJsonSerde,但會查詢預設 Serdes 以進行匹配。
預設 Serdes 的配置方式與上面在反序列化下描述的方式相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的應用程式使用分支功能並具有多個輸出繫結,則需要為每個繫結配置這些。同樣,如果繫結器能夠推斷 Serde 型別,則無需進行此配置。
如果您不希望使用 Kafka 提供的原生編碼,而是希望使用框架提供的訊息轉換,那麼您需要明確停用原生編碼,因為原生編碼是預設設定。例如,如果您有與上面相同的 BiFunction 處理器,那麼 spring.cloud.stream.bindings.process-out-0.producer.useNativeEncoding: false 在分支情況下,您需要單獨為所有輸出停用原生編碼。否則,原生編碼仍將應用於您未停用的那些。
當由 Spring Cloud Stream 完成轉換時,預設情況下,它將使用 application/json 作為內容型別,並使用適當的 json 訊息轉換器。您可以透過使用以下屬性和相應的 MessageConverter bean 來使用自定義訊息轉換器。
spring.cloud.stream.bindings.process-out-0.contentType
當原生編碼/解碼被停用時,繫結器將不會像原生 Serdes 那樣進行任何推理。應用程式需要明確提供所有配置選項。因此,通常建議在編寫 Spring Cloud Stream Kafka Streams 應用程式時,堅持使用預設的序列化/反序列化選項,並使用 Kafka Streams 提供的原生序列化/反序列化。您必須使用框架提供的訊息轉換能力的唯一場景是當您的上游生產者使用特定的序列化策略時。在這種情況下,您希望使用匹配的反序列化策略,因為原生機制可能會失敗。當依賴預設的 Serde 機制時,應用程式必須確保繫結器能夠正確地將入站和出站與適當的 Serde 進行對映,否則可能會失敗。
值得一提的是,上面概述的資料序列化/反序列化方法僅適用於您的處理器邊緣,即入站和出站。您的業務邏輯可能仍然需要呼叫明確需要 Serde 物件的 Kafka Streams API。這些仍然是應用程式的責任,必須由開發人員相應地處理。