序列化、反序列化和訊息轉換
概述
Apache Kafka 為記錄值及其鍵的序列化和反序列化提供了高階 API。它透過 org.apache.kafka.common.serialization.Serializer<T>
和 org.apache.kafka.common.serialization.Deserializer<T>
抽象提供了內建實現。同時,我們可以使用 Producer
或 Consumer
配置屬性來指定序列化器和反序列化器類。以下示例展示瞭如何進行配置
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
對於更復雜或特殊的情況,KafkaConsumer
(以及 KafkaProducer
)提供了過載建構函式,分別用於接受 keys
和 values
的 Serializer
和 Deserializer
例項。
使用此 API 時,DefaultKafkaProducerFactory
和 DefaultKafkaConsumerFactory
也透過屬性(透過建構函式或 setter 方法)提供了將自定義 Serializer
和 Deserializer
例項注入目標 Producer
或 Consumer
的能力。此外,您還可以透過建構函式傳入 Supplier<Serializer>
或 Supplier<Deserializer>
例項 - 這些 Supplier
會在建立每個 Producer
或 Consumer
時被呼叫。
String 序列化
從版本 2.5 開始,Spring for Apache Kafka 提供了使用實體字串表示的 ToStringSerializer
和 ParseStringDeserializer
類。它們依賴於 toString
方法以及某些 Function<String>
或 BiFunction<String, Headers>
函式來解析字串並填充例項的屬性。通常,這將呼叫類上的靜態方法,例如 parse
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
預設情況下,ToStringSerializer
配置為在記錄 Headers
中傳遞有關序列化實體型別的資訊。您可以透過將 addTypeInfo
屬性設定為 false
來停用此功能。此資訊可供接收端的 ParseStringDeserializer
使用。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(預設true
):您可以將其設定為false
以停用ToStringSerializer
上的此功能(設定addTypeInfo
屬性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以配置用於將 String
轉換為 byte[]
或從 byte[]
轉換回 String
的 Charset
,預設為 UTF-8
。
您可以使用 ConsumerConfig
屬性配置反序列化器使用的解析器方法名稱
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
屬性必須包含類的完全限定名,後跟方法名,用句點 .
分隔。方法必須是靜態的,並且簽名必須是 (String, Headers)
或 (String)
之一。
還提供了 ToFromStringSerde
,用於 Kafka Streams。
JSON
Spring for Apache Kafka 還提供了基於 Jackson JSON object mapper 的 JsonSerializer
和 JsonDeserializer
實現。JsonSerializer
允許將任何 Java 物件寫為 JSON 格式的 byte[]
。JsonDeserializer
需要一個額外的 Class<?> targetType
引數,以便將消費到的 byte[]
反序列化為適當的目標物件。以下示例展示瞭如何建立一個 JsonDeserializer
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以使用 ObjectMapper
自定義 JsonSerializer
和 JsonDeserializer
。您還可以擴充套件它們,在 configure(Map<String, ?> configs, boolean isKey)
方法中實現一些特定的配置邏輯。
從版本 2.3 開始,所有支援 JSON 的元件預設使用一個 JacksonUtils.enhancedObjectMapper()
例項進行配置,該例項停用了 MapperFeature.DEFAULT_VIEW_INCLUSION
和 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
特性。此外,該例項還提供了用於自定義資料型別的知名模組,例如 Java 時間和 Kotlin 支援。更多資訊請參閱 JacksonUtils.enhancedObjectMapper()
JavaDocs。此方法還註冊了一個 org.springframework.kafka.support.JacksonMimeTypeModule
,用於將 org.springframework.util.MimeType
物件序列化為普通字串,以實現網路上的跨平臺相容性。JacksonMimeTypeModule
可以註冊為應用上下文中的 bean,並將自動配置到Spring Boot ObjectMapper
例項中。
同樣從版本 2.3 開始,JsonDeserializer
提供了基於 TypeReference
的建構函式,以便更好地處理目標泛型容器型別。
從版本 2.1 開始,您可以在記錄 Headers
中傳遞型別資訊,從而允許處理多種型別。此外,您可以使用以下 Kafka 屬性來配置序列化器和反序列化器。如果您已經分別向 KafkaConsumer
和 KafkaProducer
提供了 Serializer
和 Deserializer
例項,則這些屬性無效。
配置屬性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(預設true
):您可以將其設定為false
以停用JsonSerializer
上的此功能(設定addTypeInfo
屬性)。 -
JsonSerializer.TYPE_MAPPINGS
(預設empty
):參閱 型別對映。 -
JsonDeserializer.USE_TYPE_INFO_HEADERS
(預設true
):您可以將其設定為false
以忽略序列化器設定的頭部。 -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(預設true
):您可以將其設定為false
以保留序列化器設定的頭部。 -
JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在頭部資訊,則為鍵的反序列化提供的回退型別。 -
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在頭部資訊,則為值的反序列化提供的回退型別。 -
JsonDeserializer.TRUSTED_PACKAGES
(預設java.util
,java.lang
):允許反序列化的包模式的逗號分隔列表。*
表示反序列化所有。 -
JsonDeserializer.TYPE_MAPPINGS
(預設empty
):參閱 型別對映。 -
JsonDeserializer.KEY_TYPE_METHOD
(預設empty
):參閱 使用方法確定型別。 -
JsonDeserializer.VALUE_TYPE_METHOD
(預設empty
):參閱 使用方法確定型別。
從版本 2.2 開始,型別資訊頭部(如果由序列化器新增)將被反序列化器移除。您可以透過將 removeTypeHeaders
屬性設定為 false
來恢復到之前的行為,無論是直接在反序列化器上設定,還是使用前面描述的配置屬性設定。
從版本 2.8 開始,如果您按照 程式設計構建 中所示以程式設計方式構造序列化器或反序列化器,只要您沒有明確設定任何屬性(使用 set*() 方法或流式 API),工廠就會應用上述屬性。以前,以程式設計方式建立時,配置屬性從未被應用;如果您直接在物件上明確設定屬性,情況仍然如此。 |
型別對映
從版本 2.2 開始,當使用 JSON 時,您現在可以使用前面列表中的屬性提供型別對映。以前,您必須在序列化器和反序列化器中自定義型別對映器。對映由逗號分隔的 token:className
對列表組成。出站時,負載的類名被對映到相應的 token。入站時,型別頭部中的 token 被對映到相應的類名。
以下示例建立了一組對映
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
相應的物件必須相容。 |
如果您使用 Spring Boot,您可以在 application.properties
(或 yaml)檔案中提供這些屬性。以下示例展示瞭如何進行配置
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
只能透過屬性進行簡單配置。對於更高階的配置(例如在序列化器和反序列化器中使用自定義
也提供了 Setter 方法,作為使用這些建構函式的替代方案。 |
當使用 Spring Boot 並覆蓋 ConsumerFactory 和 ProducerFactory 時(如上所示),需要將萬用字元泛型型別與 bean 方法返回型別一起使用。如果提供具體的泛型型別,則 Spring Boot 將忽略這些 bean,並仍然使用預設的。 |
從版本 2.2 開始,您可以透過使用帶布林引數 useHeadersIfPresent
(預設為 true
)的過載建構函式之一,明確配置反序列化器使用提供的目標型別並忽略頭部中的型別資訊。以下示例展示瞭如何進行配置
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法確定型別
從版本 2.5 開始,您現在可以透過屬性配置反序列化器呼叫一個方法來確定目標型別。如果此方法存在,它將覆蓋上面討論的任何其他技術。如果資料是由不使用 Spring 序列化器的應用程式釋出的,並且您需要根據資料或其他頭部反序列化到不同的型別,這將很有用。將這些屬性設定為方法名稱 - 即完全限定的類名後跟方法名,用句點 .
分隔。該方法必須宣告為 public static
,具有以下三種簽名之一:(String topic, byte[] data, Headers headers)
、(byte[] data, Headers headers)
或 (byte[] data)
,並返回一個 Jackson JavaType
。
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意頭部或檢查資料來確定型別。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
對於更復雜的資料檢查,可以考慮使用 JsonPath
或類似工具,但是用於確定型別的測試越簡單,過程就越高效。
以下是以程式設計方式建立反序列化器(在建構函式中向消費者工廠提供反序列化器)的示例
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程式設計構建
在為生產者/消費者工廠以程式設計方式構造序列化器/反序列化器時,自版本 2.3 以來,您可以使用流式 API,這簡化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
要以程式設計方式提供型別對映,類似於 使用方法確定型別,請使用 typeFunction
屬性。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要您不使用流式 API 配置屬性,或使用 set*()
方法設定它們,工廠將使用配置屬性配置序列化器/反序列化器;參閱 配置屬性。
委派序列化器和反序列化器
使用頭部
版本 2.3 引入了 DelegatingSerializer
和 DelegatingDeserializer
,它們允許生產和消費具有不同鍵和/或值型別的記錄。生產者必須將頭部 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
設定為用於選擇值序列化器的選擇器值,並將 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
設定為鍵的選擇器值;如果未找到匹配項,則丟擲 IllegalStateException
。
對於入站記錄,反序列化器使用相同的頭部來選擇要使用的反序列化器;如果未找到匹配項或頭部不存在,則返回原始的 byte[]
。
您可以透過建構函式配置選擇器到 Serializer
/ Deserializer
的對映,或者透過 Kafka 生產者/消費者屬性使用鍵 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
和 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
進行配置。對於序列化器,生產者屬性可以是 Map<String, Object>
,其中鍵是選擇器,值是 Serializer
例項、序列化器 Class
或類名。該屬性也可以是逗號分隔的對映條目字串,如下所示。
對於反序列化器,消費者屬性可以是 Map<String, Object>
,其中鍵是選擇器,值是 Deserializer
例項、反序列化器 Class
或類名。該屬性也可以是逗號分隔的對映條目字串,如下所示。
要使用屬性進行配置,使用以下語法
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
生產者隨後會將 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
頭部設定為 thing1
或 thing2
。
此技術支援將不同型別傳送到同一主題(或不同主題)。
從版本 2.5.1 開始,如果型別(鍵或值)是 Serdes 支援的標準型別之一(例如 Long 、Integer 等),則不必設定選擇器頭部。相反,序列化器將把頭部設定為該型別的類名。對於這些型別,不必配置序列化器或反序列化器,它們將動態地(建立一次)建立。 |
對於另一種將不同型別傳送到不同主題的技術,參閱 使用 RoutingKafkaTemplate
。
按型別
版本 2.8 引入了 DelegatingByTypeSerializer
。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
從版本 2.8.3 開始,您可以配置序列化器檢查對映鍵是否可賦值自目標物件,這在委託序列化器可以序列化子類時很有用。在這種情況下,如果存在歧義匹配,則應提供有序的 Map
,例如 LinkedHashMap
。
按主題
從版本 2.8 開始,DelegatingByTopicSerializer
和 DelegatingByTopicDeserializer
允許基於主題名稱選擇序列化器/反序列化器。使用正則表示式 Pattern
來查詢要使用的例項。可以透過建構函式或透過屬性(逗號分隔的 pattern:serializer
列表)配置對映。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
當用於鍵時,使用 KEY_SERIALIZATION_TOPIC_CONFIG
屬性。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
當沒有模式匹配時,您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
和 DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
指定要使用的預設序列化器/反序列化器。
一個附加屬性 DelegatingByTopicSerialization.CASE_SENSITIVE
(預設為 true
),當設定為 false
時,主題查詢將不區分大小寫。
重試反序列化器
RetryingDeserializer
使用一個委託的 Deserializer
和 RetryTemplate
,用於在委託反序列化器可能在反序列化期間出現瞬時錯誤(例如網路問題)時重試反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
從版本 3.1.2
開始,可以在 RetryingDeserializer
上可選地設定 RecoveryCallback
。
關於如何使用重試策略、退避策略等配置 RetryTemplate
,請參閱 spring-retry 專案。
Spring Messaging 訊息轉換
雖然從底層 Kafka Consumer
和 Producer
的角度來看,Serializer
和 Deserializer
API 相當簡單靈活,但在 Spring Messaging 級別使用 @KafkaListener
或 Spring Integration 的 Apache Kafka 支援 時,您可能需要更多的靈活性。為了讓您輕鬆地在 org.springframework.messaging.Message
之間進行轉換,Spring for Apache Kafka 提供了一個 MessageConverter
抽象,其實現為 MessagingMessageConverter
及其定製的 JsonMessageConverter
(和子類)。您可以將 MessageConverter
直接注入到 KafkaTemplate
例項中,也可以透過使用 @KafkaListener.containerFactory()
屬性的 AbstractKafkaListenerContainerFactory
bean 定義來注入。以下示例展示瞭如何進行配置
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用 Spring Boot 時,只需將轉換器定義為一個 @Bean
,Spring Boot 自動配置就會將其注入自動配置的模板和容器工廠中。
當您使用 @KafkaListener
時,引數型別會被提供給訊息轉換器,以協助進行轉換。
這種型別推斷只有當 |
在消費者端,您可以配置一個 在生產者端,當您使用 Spring Integration 或
同樣,使用 為方便起見,從版本 2.3 開始,該框架還提供了 |
從版本 2.7.1 開始,訊息負載轉換可以委託給 spring-messaging
的 SmartMessageConverter
;例如,這使得轉換可以基於 MessageHeaders.CONTENT_TYPE
頭部進行。
KafkaMessageConverter.fromMessage() 方法在進行出站轉換到 ProducerRecord 時被呼叫,訊息負載位於 ProducerRecord.value() 屬性中。KafkaMessageConverter.toMessage() 方法在進行入站轉換從 ConsumerRecord 時被呼叫,負載即為 ConsumerRecord.value() 屬性。SmartMessageConverter.toMessage() 方法用於從傳遞給 fromMessage() 的 Message (通常透過 KafkaTemplate.send(Message<?> msg) )建立新的出站 Message<?> 。類似地,在 KafkaMessageConverter.toMessage() 方法中,在轉換器從 ConsumerRecord 建立新的 Message<?> 之後,會呼叫 SmartMessageConverter.fromMessage() 方法,然後使用新轉換的負載建立最終的入站訊息。在任何一種情況下,如果 SmartMessageConverter 返回 null ,則使用原始訊息。 |
當在 KafkaTemplate
和監聽器容器工廠中使用預設轉換器時,您可以透過在模板上呼叫 setMessagingConverter()
方法以及透過 @KafkaListener
方法上的 contentTypeConverter
屬性來配置 SmartMessageConverter
。
示例
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data Projection 介面
從版本 2.1.1 開始,您可以將 JSON 轉換為 Spring Data Projection 介面,而不是具體的型別。這允許非常選擇性且低耦合地繫結到資料,包括從 JSON 文件內多個位置查詢值。例如,以下介面可以定義為訊息負載型別
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
預設情況下,訪問器方法將用於在接收到的 JSON 文件中將屬性名作為欄位查詢。@JsonPath
表示式允許自定義值查詢,甚至可以定義多個 JSON Path 表示式,從多個位置查詢值,直到某個表示式返回實際值。
要啟用此功能,請使用配置了適當委託轉換器(用於出站轉換和轉換非 Projection 介面)的 ProjectingMessageConverter
。您還必須將 spring-data:spring-data-commons
和 com.jayway.jsonpath:json-path
新增到 classpath 中。
當用作 @KafkaListener
方法的引數時,介面型別會正常自動傳遞給轉換器。
使用 ErrorHandlingDeserializer
當反序列化器未能反序列化訊息時,Spring 無法處理此問題,因為它發生在 poll()
返回之前。為了解決這個問題,引入了 ErrorHandlingDeserializer
。此反序列化器委託給一個實際的反序列化器(用於鍵或值)。如果委託方未能反序列化記錄內容,ErrorHandlingDeserializer
將返回一個 null
值,並在頭部中包含一個 DeserializationException
,該頭部包含原因和原始位元組。當您使用記錄級別的 MessageListener
時,如果 ConsumerRecord
的鍵或值包含 DeserializationException
頭部,容器的 ErrorHandler
將被呼叫,並傳入失敗的 ConsumerRecord
。該記錄不會傳遞給監聽器。
或者,您可以透過提供 failedDeserializationFunction
來配置 ErrorHandlingDeserializer
以建立自定義值,該函式是一個 Function<FailedDeserializationInfo, T>
。此函式被呼叫以建立 T
的例項,然後像往常一樣將該例項傳遞給監聽器。型別為 FailedDeserializationInfo
的物件(包含所有上下文資訊)會提供給該函式。您可以在頭部中找到 DeserializationException
(作為序列化的 Java 物件)。更多資訊請參見 ErrorHandlingDeserializer
的 Javadoc。
您可以使用接受鍵和值 Deserializer
物件的 DefaultKafkaConsumerFactory
建構函式,並注入您配置了適當委託的 ErrorHandlingDeserializer
例項。或者,您可以使用消費者配置屬性(ErrorHandlingDeserializer
使用這些屬性)來例項化委託。屬性名稱是 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
。屬性值可以是類或類名。以下示例顯示瞭如何設定這些屬性
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用 failedDeserializationFunction
。
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前面的示例使用以下配置
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消費者配置了 ErrorHandlingDeserializer ,那麼配置 KafkaTemplate 及其生產者使用一個既可以處理普通物件也可以處理因反序列化異常產生的原始 byte[] 值的序列化器就很重要。模板的泛型值型別應該是 Object 。一種技術是使用 DelegatingByTypeSerializer ;示例如下 |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
當將 ErrorHandlingDeserializer
與批處理監聽器一起使用時,您必須檢查訊息頭部中的反序列化異常。與 DefaultBatchErrorHandler
一起使用時,您可以使用該頭部確定異常發生在哪個記錄上,並透過 BatchListenerFailedException
與錯誤處理器通訊。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()
可用於將頭部轉換為 DeserializationException
。
當消費 List<ConsumerRecord<?, ?>
時,應改為使用 SerializationUtils.getExceptionFromHeader()
。
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您還使用了 DeadLetterPublishingRecoverer ,為 DeserializationException 釋出的記錄將具有 record.value() 型別為 byte[] ;這不應被序列化。考慮使用配置為對 byte[] 使用 ByteArraySerializer ,而對所有其他型別使用正常序列化器(Json、Avro 等)的 DelegatingByTypeSerializer 。 |
從版本 3.1 開始,您可以向 ErrorHandlingDeserializer
新增一個 Validator
。如果委託 Deserializer
成功反序列化了物件,但該物件驗證失敗,則會丟擲類似反序列化異常的異常。這允許將原始原始資料傳遞給錯誤處理器。自行建立反序列化器時,只需呼叫 setValidator
;如果您使用屬性配置序列化器,請將消費者配置屬性 ErrorHandlingDeserializer.VALIDATOR_CLASS
設定為您的 Validator
的類或完全限定類名。使用 Spring Boot 時,此屬性名稱是 spring.kafka.consumer.properties.spring.deserializer.validator.class
。
批處理監聽器的 Payload 轉換
當您使用批處理監聽器容器工廠時,您還可以在 BatchMessagingMessageConverter
中使用 JsonMessageConverter
來轉換批處理訊息。更多資訊請參見 序列化、反序列化和訊息轉換 以及 Spring Messaging 訊息轉換。
預設情況下,轉換的型別從監聽器引數中推斷。如果您使用 DefaultJackson2TypeMapper
配置 JsonMessageConverter
,並將其 TypePrecedence
設定為 TYPE_ID
(而不是預設的 INFERRED
),轉換器將使用頭部中的型別資訊(如果存在)。這允許,例如,監聽器方法宣告使用介面而不是具體類。此外,型別轉換器支援對映,因此反序列化可以轉換為與源不同的型別(只要資料相容)。當您使用 類級別的 @KafkaListener
例項 時,其中 payload 必須已經轉換才能確定呼叫哪個方法,這也很有用。以下示例建立使用此方法的 bean
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
請注意,要使其工作,轉換目標的簽名必須是一個容器物件,只有一個泛型引數型別,如下所示
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
請注意,您仍然可以訪問批處理頭部。
如果批處理轉換器具有支援此功能的記錄轉換器,您還可以接收訊息列表,其中 payload 會根據泛型型別進行轉換。以下示例顯示瞭如何實現
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
如果批處理中的記錄無法轉換,其 payload 將被設定為 null
到目標 payloads
列表中。該記錄的轉換異常將作為警告記錄,並存儲到 KafkaHeaders.CONVERSION_FAILURES
頭部中,作為 List<ConversionException>
的一個專案。目標 @KafkaListener
方法可以使用 Java Stream
API 從 payload 列表中過濾掉這些 null
值,或者處理轉換異常頭部
@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {
for (int i = 0; i < list.size(); i++) {
if (conversionFailures.get(i) != null) {
throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
}
}
}
ConversionService
自定義
從版本 2.1.1 開始,預設的 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
用於解析監聽器方法呼叫的引數時使用的 org.springframework.core.convert.ConversionService
會提供給所有實現以下任何介面的 bean
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
這允許您進一步自定義監聽器反序列化,而無需更改 ConsumerFactory
和 KafkaListenerContainerFactory
的預設配置。
透過 KafkaListenerConfigurer bean 在 KafkaListenerEndpointRegistrar 上設定自定義的 MessageHandlerMethodFactory 將停用此功能。 |
向 @KafkaListener
新增自定義的 HandlerMethodArgumentResolver
從版本 2.4.2 開始,您可以新增自己的 HandlerMethodArgumentResolver
並解析自定義方法引數。您只需要實現 KafkaListenerConfigurer
並使用 KafkaListenerEndpointRegistrar
類中的 setCustomMethodArgumentResolvers()
方法。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您也可以透過向 KafkaListenerEndpointRegistrar
bean 新增自定義的 MessageHandlerMethodFactory
來完全替換框架的引數解析。如果您這樣做,並且您的應用程式需要處理帶有 null
value()
的墓碑記錄(例如來自壓縮主題),您應該向工廠新增一個 KafkaNullAwarePayloadArgumentResolver
;它必須是最後一個解析器,因為它支援所有型別,並且可以匹配沒有 @Payload
註解的引數。如果您使用的是 DefaultMessageHandlerMethodFactory
,請將此解析器設定為最後一個自定義解析器;工廠將確保在標準的 PayloadMethodArgumentResolver
之前使用此解析器,而標準的解析器對 KafkaNull
payload 不瞭解。