序列化、反序列化和訊息轉換
概述
Apache Kafka 提供了用於序列化和反序列化記錄值及其鍵的高階 API。它透過 org.apache.kafka.common.serialization.Serializer<T> 和 org.apache.kafka.common.serialization.Deserializer<T> 抽象以及一些內建實現來提供。同時,我們可以透過使用 Producer 或 Consumer 配置屬性來指定序列化器和反序列化器類。以下示例展示瞭如何實現:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
對於更復雜或特殊的情況,KafkaConsumer(因此也包括 KafkaProducer)提供了過載的建構函式,分別接受用於 keys 和 values 的 Serializer 和 Deserializer 例項。
當您使用此 API 時,DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 也提供了屬性(透過建構函式或 setter 方法)來將自定義 Serializer 和 Deserializer 例項注入到目標 Producer 或 Consumer 中。此外,您可以透過建構函式傳入 Supplier<Serializer> 或 Supplier<Deserializer> 例項 - 這些 Supplier 在建立每個 Producer 或 Consumer 時都會被呼叫。
字串序列化
自 2.5 版本以來,Spring for Apache Kafka 提供了 ToStringSerializer 和 ParseStringDeserializer 類,它們使用實體的字串表示形式。它們依賴於 toString 方法以及一些 Function<String> 或 BiFunction<String, Headers> 來解析字串並填充例項的屬性。通常,這會呼叫類上的某個靜態方法,例如 parse。
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
預設情況下,ToStringSerializer 配置為在記錄 Headers 中傳遞有關序列化實體的型別資訊。您可以透過將 addTypeInfo 屬性設定為 false 來停用此功能。此資訊可由接收端的 ParseStringDeserializer 使用。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS(預設true):您可以將其設定為false以在ToStringSerializer上停用此功能(設定addTypeInfo屬性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以配置用於將 String 轉換為 byte[] 或從 byte[] 轉換的 Charset,預設值為 UTF-8。
您可以使用 ConsumerConfig 屬性配置反序列化器,並指定解析器方法的名稱。
-
ParseStringDeserializer.KEY_PARSER -
ParseStringDeserializer.VALUE_PARSER
這些屬性必須包含類的完全限定名,後跟方法名,並用句點 . 分隔。該方法必須是靜態的,並且簽名必須是 (String, Headers) 或 (String)。
還提供了 ToFromStringSerde,用於 Kafka Streams。
JSON
Spring for Apache Kafka 還提供了基於 Jackson JSON 物件對映器的 JacksonJsonSerializer 和 JacksonJsonDeserializer 實現。JacksonJsonSerializer 允許將任何 Java 物件寫入為 JSON byte[]。JacksonJsonDeserializer 需要一個額外的 Class<?> targetType 引數,以允許將消費的 byte[] 反序列化為正確的目標物件。以下示例展示瞭如何建立 JacksonJsonDeserializer:
JacksonJsonDeserializer<Thing> thingDeserializer = new JacksonJsonDeserializer<>(Thing.class);
您可以使用 ObjectMapper 自定義 JacksonJsonSerializer 和 JacksonJsonDeserializer。您還可以擴充套件它們,在 configure(Map<String, ?> configs, boolean isKey) 方法中實現一些特定的配置邏輯。
從 2.3 版本開始,所有支援 JSON 的元件預設使用 JacksonUtils.enhancedObjectMapper() 例項進行配置,該例項停用了 MapperFeature.DEFAULT_VIEW_INCLUSION 和 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 功能。此外,該例項還提供了用於自定義資料型別(如 Java time 和 Kotlin 支援)的知名模組。有關更多資訊,請參閱 JacksonUtils.enhancedObjectMapper() 的 JavaDocs。此方法還註冊了一個 org.springframework.kafka.support.JacksonMimeTypeModule,用於將 org.springframework.util.MimeType 物件序列化為純字串,以便在網路上實現跨平臺相容性。JacksonMimeTypeModule 可以作為 bean 註冊到應用程式上下文中,它將被自動配置到 Spring Boot ObjectMapper 例項中。
同樣從 2.3 版本開始,JsonDeserializer 提供了基於 TypeReference 的建構函式,以更好地處理目標泛型容器型別。
從 2.1 版本開始,您可以在記錄 Headers 中傳遞型別資訊,從而支援處理多種型別。此外,您可以使用以下 Kafka 屬性配置序列化器和反序列化器。如果您已為 KafkaConsumer 和 KafkaProducer 分別提供了 Serializer 和 Deserializer 例項,則它們不起作用。
配置屬性
-
JacksonJsonSerializer.ADD_TYPE_INFO_HEADERS(預設true):您可以將其設定為false以在JacksonJsonSerializer上停用此功能(設定addTypeInfo屬性)。 -
JacksonJsonSerializer.TYPE_MAPPINGS(預設empty):參見對映型別。 -
JacksonJsonDeserializer.USE_TYPE_INFO_HEADERS(預設true):您可以將其設定為false以忽略序列化器設定的頭。 -
JacksonJsonDeserializer.REMOVE_TYPE_INFO_HEADERS(預設true):您可以將其設定為false以保留序列化器設定的頭。 -
JacksonJsonDeserializer.KEY_DEFAULT_TYPE:如果不存在頭資訊,則鍵反序列化的回退型別。 -
JacksonJsonDeserializer.VALUE_DEFAULT_TYPE:如果不存在頭資訊,則值反序列化的回退型別。 -
JacksonJsonDeserializer.TRUSTED_PACKAGES(預設java.util,java.lang):允許反序列化的包模式的逗號分隔列表。*表示反序列化所有。 -
JacksonJsonDeserializer.TYPE_MAPPINGS(預設empty):參見對映型別。 -
JacksonJsonDeserializer.KEY_TYPE_METHOD(預設empty):參見使用方法確定型別。 -
JacksonJsonDeserializer.VALUE_TYPE_METHOD(預設empty):參見使用方法確定型別。
從 2.2 版本開始,型別資訊頭(如果由序列化器新增)會被反序列化器移除。您可以透過將 removeTypeHeaders 屬性設定為 false,直接在反序列化器上或透過前面描述的配置屬性,恢復到以前的行為。
從 2.8 版本開始,如果您按照 程式設計構造 中所示以程式設計方式構造序列化器或反序列化器,工廠將應用上述屬性,只要您沒有顯式設定任何屬性(使用 set*() 方法或使用流暢 API)。以前,以程式設計方式建立時,配置屬性從不應用;如果您直接在物件上顯式設定屬性,則仍然如此。 |
對映型別
從 2.2 版本開始,當使用 JSON 時,您現在可以使用前面列表中的屬性提供型別對映。以前,您必須在序列化器和反序列化器中自定義型別對映器。對映由逗號分隔的 token:className 對列表組成。出站時,有效負載的類名對映到相應的令牌。入站時,型別頭中的令牌對映到相應的類名。
以下示例建立了一組對映
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
| 相應的物件必須相容。 |
如果您使用 Spring Boot,您可以在 application.properties(或 yaml)檔案中提供這些屬性。以下示例展示瞭如何實現:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
|
您只能透過屬性執行簡單的配置。對於更高階的配置(例如在序列化器和反序列化器中使用自定義
還提供了 setter 方法,作為使用這些建構函式的替代方案。 |
當使用 Spring Boot 並如上所示覆蓋 ConsumerFactory 和 ProducerFactory 時,bean 方法的返回型別需要使用萬用字元泛型型別。如果提供了具體的泛型型別,那麼 Spring Boot 將忽略這些 bean,並仍然使用預設的 bean。 |
從 2.2 版本開始,您可以透過使用帶有布林型 useHeadersIfPresent 引數(預設為 true)的過載建構函式之一,顯式配置反序列化器以使用提供的目標型別並忽略頭中的型別資訊。以下示例展示瞭如何實現:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法確定型別
從 2.5 版本開始,您現在可以透過屬性配置反序列化器,以呼叫一個方法來確定目標型別。如果存在,這將覆蓋前面討論的任何其他技術。如果資料由不使用 Spring 序列化器的應用程式釋出,並且您需要根據資料或其他頭反序列化為不同的型別,這可能很有用。將這些屬性設定為方法名 - 完全限定的類名後跟方法名,用句點 . 分隔。該方法必須宣告為 public static,具有以下三種簽名之一:(String topic, byte[] data, Headers headers)、(byte[] data, Headers headers) 或 (byte[] data),並返回 Jackson JavaType。
-
JsonDeserializer.KEY_TYPE_METHOD:spring.json.key.type.method -
JsonDeserializer.VALUE_TYPE_METHOD:spring.json.value.type.method
您可以使用任意頭或檢查資料來確定型別。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
對於更復雜的資料檢查,請考慮使用 JsonPath 或類似工具,但確定型別的測試越簡單,過程效率就越高。
以下是程式設計建立反序列化器的示例(在建構函式中為消費者工廠提供反序列化器時):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程式設計構造
自 2.3 版本以來,當以程式設計方式構造序列化器/反序列化器以在生產者/消費者工廠中使用時,您可以使用流暢 API,這簡化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
為了以程式設計方式提供型別對映,類似於 使用方法確定型別,請使用 typeFunction 屬性。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要您不使用流暢 API 配置屬性,或使用 set*() 方法設定它們,工廠將使用配置屬性配置序列化器/反序列化器;請參閱 配置屬性。
委託序列化器和反序列化器
使用 Headers
版本 2.3 引入了 DelegatingSerializer 和 DelegatingDeserializer,它們允許生產和消費具有不同鍵和/或值型別的記錄。生產者必須將頭 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 設定為選擇器值,該值用於選擇用於值的序列化器,並將 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR 設定為鍵的選擇器;如果未找到匹配項,則會丟擲 IllegalStateException。
對於傳入記錄,反序列化器使用相同的頭來選擇要使用的反序列化器;如果未找到匹配項或頭不存在,則返回原始 byte[]。
您可以透過建構函式配置選擇器到 Serializer / Deserializer 的對映,或者透過 Kafka 生產者/消費者屬性配置,鍵為 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG 和 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG。對於序列化器,生產者屬性可以是 Map<String, Object>,其中鍵是選擇器,值是 Serializer 例項、序列化器 Class 或類名。該屬性也可以是逗號分隔的對映條目字串,如下所示。
對於反序列化器,消費者屬性可以是 Map<String, Object>,其中鍵是選擇器,值是 Deserializer 例項、反序列化器 Class 或類名。該屬性也可以是逗號分隔的對映條目字串,如下所示。
要使用屬性進行配置,請使用以下語法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
生產者隨後會將 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 頭設定為 thing1 或 thing2。
此技術支援將不同型別傳送到相同主題(或不同主題)。
從 2.5.1 版本開始,如果型別(鍵或值)是 Serdes 支援的標準型別之一(Long、Integer 等),則不需要設定選擇器頭。相反,序列化器會將頭設定為該型別的類名。不需要為這些型別配置序列化器或反序列化器,它們將動態建立(一次)。 |
有關將不同型別傳送到不同主題的另一種技術,請參閱 使用 RoutingKafkaTemplate。
按型別
版本 2.8 引入了 DelegatingByTypeSerializer。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
從 2.8.3 版本開始,您可以配置序列化器以檢查對映鍵是否可從目標物件賦值,這在委託序列化器可以序列化子類時非常有用。在這種情況下,如果存在歧義匹配,則應提供一個有序的 Map,例如 LinkedHashMap。
按主題
從 2.8 版本開始,DelegatingByTopicSerializer 和 DelegatingByTopicDeserializer 允許根據主題名稱選擇序列化器/反序列化器。使用正則表示式 Pattern 來查詢要使用的例項。對映可以使用建構函式或透過屬性(逗號分隔的 pattern:serializer 列表)進行配置。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
當用於鍵時,請使用 KEY_SERIALIZATION_TOPIC_CONFIG。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT 和 DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT 指定在沒有模式匹配時使用的預設序列化器/反序列化器。
額外的屬性 DelegatingByTopicSerialization.CASE_SENSITIVE(預設 true),當設定為 false 時,主題查詢將不區分大小寫。
重試反序列化器
RetryingDeserializer 使用委託 Deserializer 和 RetryTemplate 在委託反序列化期間可能出現瞬時錯誤(例如網路問題)時重試反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
可以在 RetryingDeserializer 上設定恢復回撥,以便在所有重試都用盡時返回一個備用物件。
有關使用重試策略、退避等配置 RetryTemplate,請參閱 Spring Framework 專案。
Spring Messaging 訊息轉換
儘管 Serializer 和 Deserializer API 從低階 Kafka Consumer 和 Producer 的角度來看非常簡單和靈活,但在使用 @KafkaListener 或 Spring Integration 的 Apache Kafka 支援時,您可能需要在 Spring Messaging 級別上獲得更大的靈活性。為了讓您輕鬆地在 org.springframework.messaging.Message 之間進行轉換,Spring for Apache Kafka 提供了 MessageConverter 抽象,並提供了 MessagingMessageConverter 實現及其 JacksonJsonMessageConverter(及其子類)定製。您可以將 MessageConverter 直接注入到 KafkaTemplate 例項中,並透過使用 AbstractKafkaListenerContainerFactory bean 定義來設定 @KafkaListener.containerFactory() 屬性。以下示例展示瞭如何實現:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
當使用 Spring Boot 時,只需將轉換器定義為 @Bean,Spring Boot 自動配置就會將其連線到自動配置的模板和容器工廠中。
當您使用 @KafkaListener 時,引數型別會提供給訊息轉換器以協助轉換。
|
此型別推斷只能在方法級別宣告 |
|
在消費者端,您可以配置一個 在生產者端,當您使用 Spring Integration 或
同樣,使用 為方便起見,從 2.3 版本開始,框架還提供了 |
從 2.7.1 版本開始,訊息有效負載轉換可以委託給 spring-messaging SmartMessageConverter;這使得轉換,例如,可以基於 MessageHeaders.CONTENT_TYPE 頭。
KafkaMessageConverter.fromMessage() 方法用於將出站訊息轉換為 ProducerRecord,其中訊息有效負載位於 ProducerRecord.value() 屬性中。KafkaMessageConverter.toMessage() 方法用於將入站訊息從 ConsumerRecord 轉換為,其中有效負載是 ConsumerRecord.value() 屬性。SmartMessageConverter.toMessage() 方法用於從傳遞給 fromMessage() 的 Message 建立新的出站 Message<?>(通常透過 KafkaTemplate.send(Message<?> msg))。類似地,在 KafkaMessageConverter.toMessage() 方法中,在轉換器從 ConsumerRecord 建立新的 Message<?> 後,將呼叫 SmartMessageConverter.fromMessage() 方法,然後使用新轉換的有效負載建立最終的入站訊息。在這兩種情況下,如果 SmartMessageConverter 返回 null,則使用原始訊息。 |
當 KafkaTemplate 和偵聽器容器工廠中使用預設轉換器時,您可以透過在模板上呼叫 setMessagingConverter() 並透過 @KafkaListener 方法上的 contentTypeConverter 屬性來配置 SmartMessageConverter。
示例
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data Projection 介面
從 2.1.1 版本開始,您可以將 JSON 轉換為 Spring Data Projection 介面而不是具體型別。這允許對資料進行非常選擇性且低耦合的繫結,包括從 JSON 文件中的多個位置查詢值。例如,以下介面可以定義為訊息有效負載型別:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
預設情況下,訪問器方法將用於查詢接收到的 JSON 文件中的屬性名稱作為欄位。@JsonPath 表示式允許自定義值查詢,甚至可以定義多個 JSON Path 表示式,以從多個位置查詢值,直到某個表示式返回實際值。
要啟用此功能,請使用配置了適當委託轉換器(用於出站轉換和非投影介面的轉換)的 JacksonProjectingMessageConverter。您還必須將 spring-data:spring-data-commons 和 com.jayway.jsonpath:json-path 新增到類路徑。
當用作 @KafkaListener 方法的引數時,介面型別會自動像往常一樣傳遞給轉換器。
使用 ErrorHandlingDeserializer
當反序列化器無法反序列化訊息時,Spring 無法處理該問題,因為它發生在 poll() 返回之前。為了解決這個問題,引入了 ErrorHandlingDeserializer。此反序列化器委託給一個真正的反序列化器(鍵或值)。如果委託反序列化器無法反序列化記錄內容,ErrorHandlingDeserializer 將返回一個 null 值,並在一個頭中返回一個 DeserializationException,其中包含原因和原始位元組。當您使用記錄級 MessageListener 時,如果 ConsumerRecord 包含鍵或值的 DeserializationException 頭,則容器的 ErrorHandler 將使用失敗的 ConsumerRecord 被呼叫。該記錄不會傳遞給偵聽器。
或者,您可以透過提供 failedDeserializationFunction(一個 Function<FailedDeserializationInfo, T>)來配置 ErrorHandlingDeserializer 以建立自定義值。此函式被呼叫以建立 T 例項,該例項以通常的方式傳遞給偵聽器。一個型別為 FailedDeserializationInfo 的物件,其中包含所有上下文資訊,將提供給該函式。您可以在頭中找到 DeserializationException(作為序列化的 Java 物件)。有關 ErrorHandlingDeserializer 的更多資訊,請參閱 Javadoc。
您可以使用接受鍵和值 Deserializer 物件的 DefaultKafkaConsumerFactory 建構函式,並連線您已配置了適當委託的 ErrorHandlingDeserializer 例項。或者,您可以使用消費者配置屬性(由 ErrorHandlingDeserializer 使用)來例項化委託。屬性名稱是 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS 和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。屬性值可以是類或類名。以下示例展示瞭如何設定這些屬性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用 failedDeserializationFunction。
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前面的示例使用以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消費者配置了 ErrorHandlingDeserializer,則重要的是要配置 KafkaTemplate 及其生產者,使其具有可以處理普通物件以及原始 byte[] 值(由反序列化異常引起)的序列化器。模板的泛型值型別應為 Object。一種技術是使用 DelegatingByTypeSerializer;示例如下: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
當將 ErrorHandlingDeserializer 與批處理偵聽器一起使用時,您必須檢查訊息頭中的反序列化異常。與 DefaultBatchErrorHandler 一起使用時,您可以使用該頭來確定異常失敗的記錄,並透過 BatchListenerFailedException 將其通知錯誤處理器。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException() 可用於將頭轉換為 DeserializationException。
當消費 List<ConsumerRecord<?, ?> 時,改用 SerializationUtils.getExceptionFromHeader()。
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您還在使用 DeadLetterPublishingRecoverer,為 DeserializationException 釋出的記錄將具有 byte[] 型別的 record.value();這不應被序列化。考慮使用配置為對 byte[] 使用 ByteArraySerializer 而對所有其他型別使用普通序列化器(Json、Avro 等)的 DelegatingByTypeSerializer。 |
從 3.1 版本開始,您可以向 ErrorHandlingDeserializer 新增一個 Validator。如果委託 Deserializer 成功反序列化物件,但該物件未能透過驗證,則會丟擲類似於反序列化異常的異常。這允許將原始原始資料傳遞給錯誤處理器。當您自己建立反序列化器時,只需呼叫 setValidator;如果您使用屬性配置序列化器,請將消費者配置屬性 ErrorHandlingDeserializer.VALIDATOR_CLASS 設定為 Validator 的類或完全限定類名。當使用 Spring Boot 時,此屬性名為 spring.kafka.consumer.properties.spring.deserializer.validator.class。
批處理偵聽器與有效負載轉換
當您使用批處理偵聽器容器工廠時,還可以在 BatchMessagingMessageConverter 中使用 JacksonJsonMessageConverter 來轉換批處理訊息。有關更多資訊,請參閱 序列化、反序列化和訊息轉換 和 Spring Messaging 訊息轉換。
預設情況下,轉換的型別從偵聽器引數推斷。如果您使用 DefaultJackson2TypeMapper 配置 JacksonJsonMessageConverter,並將其 TypePrecedence 設定為 TYPE_ID(而不是預設的 INFERRED),則轉換器將使用頭中的型別資訊(如果存在)。這允許,例如,偵聽器方法使用介面而不是具體類宣告。此外,型別轉換器支援對映,因此反序列化可以轉換為與源不同的型別(只要資料相容)。當您使用 類級別 @KafkaListener 例項 時,這也很實用,因為有效負載必須已經轉換才能確定要呼叫的方法。以下示例建立了使用此方法的 bean:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
請注意,為了使其工作,轉換目標的 方法簽名 必須是一個帶有單個泛型引數型別的容器物件,例如以下內容:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
請注意,您仍然可以訪問批處理頭。
如果批處理轉換器具有支援它的記錄轉換器,您還可以接收訊息列表,其中有效負載根據泛型型別進行轉換。以下示例展示瞭如何實現:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
如果批處理中的記錄無法轉換,其有效負載將在目標 payloads 列表中設定為 null。此記錄的轉換異常將作為警告記錄,並作為 List<ConversionException> 的一項儲存在 KafkaHeaders.CONVERSION_FAILURES 頭中。目標 @KafkaListener 方法可以執行 Java Stream API 以從有效負載列表中過濾掉這些 null 值,或對轉換異常頭進行處理。
@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {
for (int i = 0; i < list.size(); i++) {
if (conversionFailures.get(i) != null) {
throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
}
}
}
ConversionService 定製
從 2.1.1 版本開始,預設的 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory 用於解析偵聽器方法呼叫的引數的 org.springframework.core.convert.ConversionService 將提供實現以下任何介面的所有 bean:
-
org.springframework.core.convert.converter.Converter -
org.springframework.core.convert.converter.GenericConverter -
org.springframework.format.Formatter
這允許您進一步自定義偵聽器反序列化,而無需更改 ConsumerFactory 和 KafkaListenerContainerFactory 的預設配置。
透過 KafkaListenerConfigurer bean 在 KafkaListenerEndpointRegistrar 上設定自定義 MessageHandlerMethodFactory 將停用此功能。 |
向 @KafkaListener 新增自定義 HandlerMethodArgumentResolver
從 2.4.2 版本開始,您可以新增自己的 HandlerMethodArgumentResolver 並解析自定義方法引數。您只需實現 KafkaListenerConfigurer 並使用 KafkaListenerEndpointRegistrar 類的 setCustomMethodArgumentResolvers() 方法。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您還可以透過向 KafkaListenerEndpointRegistrar bean 新增自定義 MessageHandlerMethodFactory 來完全替換框架的引數解析。如果您這樣做,並且您的應用程式需要處理具有 null value() 的墓碑記錄(例如,來自壓縮主題),您應該向工廠新增一個 KafkaNullAwarePayloadArgumentResolver;它必須是最後一個解析器,因為它支援所有型別並且可以匹配沒有 @Payload 註解的引數。如果您正在使用 DefaultMessageHandlerMethodFactory,請將此解析器設定為最後一個自定義解析器;工廠將確保此解析器將在標準 PayloadMethodArgumentResolver 之前使用,後者不瞭解 KafkaNull 有效負載。
另請參閱 空有效負載和墓碑記錄的日誌壓縮。