序列化、反序列化和訊息轉換

概述

Apache Kafka 為記錄值及其鍵的序列化和反序列化提供了高階 API。它透過 org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T> 抽象提供了內建實現。同時,我們可以使用 ProducerConsumer 配置屬性來指定序列化器和反序列化器類。以下示例展示瞭如何進行配置

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

對於更復雜或特殊的情況,KafkaConsumer(以及 KafkaProducer)提供了過載建構函式,分別用於接受 keysvaluesSerializerDeserializer 例項。

使用此 API 時,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 也透過屬性(透過建構函式或 setter 方法)提供了將自定義 SerializerDeserializer 例項注入目標 ProducerConsumer 的能力。此外,您還可以透過建構函式傳入 Supplier<Serializer>Supplier<Deserializer> 例項 - 這些 Supplier 會在建立每個 ProducerConsumer 時被呼叫。

String 序列化

從版本 2.5 開始,Spring for Apache Kafka 提供了使用實體字串表示的 ToStringSerializerParseStringDeserializer 類。它們依賴於 toString 方法以及某些 Function<String>BiFunction<String, Headers> 函式來解析字串並填充例項的屬性。通常,這將呼叫類上的靜態方法,例如 parse

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

預設情況下,ToStringSerializer 配置為在記錄 Headers 中傳遞有關序列化實體型別的資訊。您可以透過將 addTypeInfo 屬性設定為 false 來停用此功能。此資訊可供接收端的 ParseStringDeserializer 使用。

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS(預設 true):您可以將其設定為 false 以停用 ToStringSerializer 上的此功能(設定 addTypeInfo 屬性)。

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

您可以配置用於將 String 轉換為 byte[] 或從 byte[] 轉換回 StringCharset,預設為 UTF-8

您可以使用 ConsumerConfig 屬性配置反序列化器使用的解析器方法名稱

  • ParseStringDeserializer.KEY_PARSER

  • ParseStringDeserializer.VALUE_PARSER

屬性必須包含類的完全限定名,後跟方法名,用句點 . 分隔。方法必須是靜態的,並且簽名必須是 (String, Headers)(String) 之一。

還提供了 ToFromStringSerde,用於 Kafka Streams。

JSON

Spring for Apache Kafka 還提供了基於 Jackson JSON object mapper 的 JsonSerializerJsonDeserializer 實現。JsonSerializer 允許將任何 Java 物件寫為 JSON 格式的 byte[]JsonDeserializer 需要一個額外的 Class<?> targetType 引數,以便將消費到的 byte[] 反序列化為適當的目標物件。以下示例展示瞭如何建立一個 JsonDeserializer

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

您可以使用 ObjectMapper 自定義 JsonSerializerJsonDeserializer。您還可以擴充套件它們,在 configure(Map<String, ?> configs, boolean isKey) 方法中實現一些特定的配置邏輯。

從版本 2.3 開始,所有支援 JSON 的元件預設使用一個 JacksonUtils.enhancedObjectMapper() 例項進行配置,該例項停用了 MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 特性。此外,該例項還提供了用於自定義資料型別的知名模組,例如 Java 時間和 Kotlin 支援。更多資訊請參閱 JacksonUtils.enhancedObjectMapper() JavaDocs。此方法還註冊了一個 org.springframework.kafka.support.JacksonMimeTypeModule,用於將 org.springframework.util.MimeType 物件序列化為普通字串,以實現網路上的跨平臺相容性。JacksonMimeTypeModule 可以註冊為應用上下文中的 bean,並將自動配置到Spring Boot ObjectMapper 例項中。

同樣從版本 2.3 開始,JsonDeserializer 提供了基於 TypeReference 的建構函式,以便更好地處理目標泛型容器型別。

從版本 2.1 開始,您可以在記錄 Headers 中傳遞型別資訊,從而允許處理多種型別。此外,您可以使用以下 Kafka 屬性來配置序列化器和反序列化器。如果您已經分別向 KafkaConsumerKafkaProducer 提供了 SerializerDeserializer 例項,則這些屬性無效。

配置屬性

  • JsonSerializer.ADD_TYPE_INFO_HEADERS(預設 true):您可以將其設定為 false 以停用 JsonSerializer 上的此功能(設定 addTypeInfo 屬性)。

  • JsonSerializer.TYPE_MAPPINGS(預設 empty):參閱 型別對映

  • JsonDeserializer.USE_TYPE_INFO_HEADERS(預設 true):您可以將其設定為 false 以忽略序列化器設定的頭部。

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS(預設 true):您可以將其設定為 false 以保留序列化器設定的頭部。

  • JsonDeserializer.KEY_DEFAULT_TYPE:如果不存在頭部資訊,則為鍵的反序列化提供的回退型別。

  • JsonDeserializer.VALUE_DEFAULT_TYPE:如果不存在頭部資訊,則為值的反序列化提供的回退型別。

  • JsonDeserializer.TRUSTED_PACKAGES(預設 java.util, java.lang):允許反序列化的包模式的逗號分隔列表。* 表示反序列化所有。

  • JsonDeserializer.TYPE_MAPPINGS(預設 empty):參閱 型別對映

  • JsonDeserializer.KEY_TYPE_METHOD(預設 empty):參閱 使用方法確定型別

  • JsonDeserializer.VALUE_TYPE_METHOD(預設 empty):參閱 使用方法確定型別

從版本 2.2 開始,型別資訊頭部(如果由序列化器新增)將被反序列化器移除。您可以透過將 removeTypeHeaders 屬性設定為 false 來恢復到之前的行為,無論是直接在反序列化器上設定,還是使用前面描述的配置屬性設定。

從版本 2.8 開始,如果您按照 程式設計構建 中所示以程式設計方式構造序列化器或反序列化器,只要您沒有明確設定任何屬性(使用 set*() 方法或流式 API),工廠就會應用上述屬性。以前,以程式設計方式建立時,配置屬性從未被應用;如果您直接在物件上明確設定屬性,情況仍然如此。

型別對映

從版本 2.2 開始,當使用 JSON 時,您現在可以使用前面列表中的屬性提供型別對映。以前,您必須在序列化器和反序列化器中自定義型別對映器。對映由逗號分隔的 token:className 對列表組成。出站時,負載的類名被對映到相應的 token。入站時,型別頭部中的 token 被對映到相應的類名。

以下示例建立了一組對映

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
相應的物件必須相容。

如果您使用 Spring Boot,您可以在 application.properties(或 yaml)檔案中提供這些屬性。以下示例展示瞭如何進行配置

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

只能透過屬性進行簡單配置。對於更高階的配置(例如在序列化器和反序列化器中使用自定義 ObjectMapper),您應該使用接受預構建序列化器和反序列化器的生產者和消費者工廠建構函式。以下 Spring Boot 示例覆蓋了預設工廠

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

也提供了 Setter 方法,作為使用這些建構函式的替代方案。

當使用 Spring Boot 並覆蓋 ConsumerFactoryProducerFactory 時(如上所示),需要將萬用字元泛型型別與 bean 方法返回型別一起使用。如果提供具體的泛型型別,則 Spring Boot 將忽略這些 bean,並仍然使用預設的。

從版本 2.2 開始,您可以透過使用帶布林引數 useHeadersIfPresent(預設為 true)的過載建構函式之一,明確配置反序列化器使用提供的目標型別並忽略頭部中的型別資訊。以下示例展示瞭如何進行配置

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

使用方法確定型別

從版本 2.5 開始,您現在可以透過屬性配置反序列化器呼叫一個方法來確定目標型別。如果此方法存在,它將覆蓋上面討論的任何其他技術。如果資料是由不使用 Spring 序列化器的應用程式釋出的,並且您需要根據資料或其他頭部反序列化到不同的型別,這將很有用。將這些屬性設定為方法名稱 - 即完全限定的類名後跟方法名,用句點 . 分隔。該方法必須宣告為 public static,具有以下三種簽名之一:(String topic, byte[] data, Headers headers)(byte[] data, Headers headers)(byte[] data),並返回一個 Jackson JavaType

  • JsonDeserializer.KEY_TYPE_METHOD : spring.json.key.type.method

  • JsonDeserializer.VALUE_TYPE_METHOD : spring.json.value.type.method

您可以使用任意頭部或檢查資料來確定型別。

JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

對於更復雜的資料檢查,可以考慮使用 JsonPath 或類似工具,但是用於確定型別的測試越簡單,過程就越高效。

以下是以程式設計方式建立反序列化器(在建構函式中向消費者工廠提供反序列化器)的示例

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

程式設計構建

在為生產者/消費者工廠以程式設計方式構造序列化器/反序列化器時,自版本 2.3 以來,您可以使用流式 API,這簡化了配置。

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

要以程式設計方式提供型別對映,類似於 使用方法確定型別,請使用 typeFunction 屬性。

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

或者,只要您不使用流式 API 配置屬性,或使用 set*() 方法設定它們,工廠將使用配置屬性配置序列化器/反序列化器;參閱 配置屬性

委派序列化器和反序列化器

使用頭部

版本 2.3 引入了 DelegatingSerializerDelegatingDeserializer,它們允許生產和消費具有不同鍵和/或值型別的記錄。生產者必須將頭部 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 設定為用於選擇值序列化器的選擇器值,並將 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR 設定為鍵的選擇器值;如果未找到匹配項,則丟擲 IllegalStateException

對於入站記錄,反序列化器使用相同的頭部來選擇要使用的反序列化器;如果未找到匹配項或頭部不存在,則返回原始的 byte[]

您可以透過建構函式配置選擇器到 Serializer / Deserializer 的對映,或者透過 Kafka 生產者/消費者屬性使用鍵 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIGDelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG 進行配置。對於序列化器,生產者屬性可以是 Map<String, Object>,其中鍵是選擇器,值是 Serializer 例項、序列化器 Class 或類名。該屬性也可以是逗號分隔的對映條目字串,如下所示。

對於反序列化器,消費者屬性可以是 Map<String, Object>,其中鍵是選擇器,值是 Deserializer 例項、反序列化器 Class 或類名。該屬性也可以是逗號分隔的對映條目字串,如下所示。

要使用屬性進行配置,使用以下語法

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

生產者隨後會將 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 頭部設定為 thing1thing2

此技術支援將不同型別傳送到同一主題(或不同主題)。

從版本 2.5.1 開始,如果型別(鍵或值)是 Serdes 支援的標準型別之一(例如 LongInteger 等),則不必設定選擇器頭部。相反,序列化器將把頭部設定為該型別的類名。對於這些型別,不必配置序列化器或反序列化器,它們將動態地(建立一次)建立。

對於另一種將不同型別傳送到不同主題的技術,參閱 使用 RoutingKafkaTemplate

按型別

版本 2.8 引入了 DelegatingByTypeSerializer

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

從版本 2.8.3 開始,您可以配置序列化器檢查對映鍵是否可賦值自目標物件,這在委託序列化器可以序列化子類時很有用。在這種情況下,如果存在歧義匹配,則應提供有序的 Map,例如 LinkedHashMap

按主題

從版本 2.8 開始,DelegatingByTopicSerializerDelegatingByTopicDeserializer 允許基於主題名稱選擇序列化器/反序列化器。使用正則表示式 Pattern 來查詢要使用的例項。可以透過建構函式或透過屬性(逗號分隔的 pattern:serializer 列表)配置對映。

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

當用於鍵時,使用 KEY_SERIALIZATION_TOPIC_CONFIG 屬性。

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JsonSerializer<Object>());  // default
}

當沒有模式匹配時,您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULTDelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT 指定要使用的預設序列化器/反序列化器。

一個附加屬性 DelegatingByTopicSerialization.CASE_SENSITIVE(預設為 true),當設定為 false 時,主題查詢將不區分大小寫。

重試反序列化器

RetryingDeserializer 使用一個委託的 DeserializerRetryTemplate,用於在委託反序列化器可能在反序列化期間出現瞬時錯誤(例如網路問題)時重試反序列化。

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

從版本 3.1.2 開始,可以在 RetryingDeserializer 上可選地設定 RecoveryCallback

關於如何使用重試策略、退避策略等配置 RetryTemplate,請參閱 spring-retry 專案。

Spring Messaging 訊息轉換

雖然從底層 Kafka ConsumerProducer 的角度來看,SerializerDeserializer API 相當簡單靈活,但在 Spring Messaging 級別使用 @KafkaListenerSpring Integration 的 Apache Kafka 支援 時,您可能需要更多的靈活性。為了讓您輕鬆地在 org.springframework.messaging.Message 之間進行轉換,Spring for Apache Kafka 提供了一個 MessageConverter 抽象,其實現為 MessagingMessageConverter 及其定製的 JsonMessageConverter(和子類)。您可以將 MessageConverter 直接注入到 KafkaTemplate 例項中,也可以透過使用 @KafkaListener.containerFactory() 屬性的 AbstractKafkaListenerContainerFactory bean 定義來注入。以下示例展示瞭如何進行配置

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

使用 Spring Boot 時,只需將轉換器定義為一個 @Bean,Spring Boot 自動配置就會將其注入自動配置的模板和容器工廠中。

當您使用 @KafkaListener 時,引數型別會被提供給訊息轉換器,以協助進行轉換。

這種型別推斷只有當 @KafkaListener 註解宣告在方法級別時才能實現。對於類級別的 @KafkaListener,負載型別用於選擇要呼叫的 @KafkaHandler 方法,因此在選擇方法之前它必須已經完成轉換。

在消費者端,您可以配置一個 JsonMessageConverter;它可以處理型別為 byte[]BytesStringConsumerRecord 值,因此應該與 ByteArrayDeserializerBytesDeserializerStringDeserializer 結合使用。(byte[]Bytes 更高效,因為它們避免了不必要的 byte[]String 轉換)。如果您願意,您還可以配置與反序列化器對應的特定 JsonMessageConverter 子類。

在生產者端,當您使用 Spring Integration 或 KafkaTemplate.send(Message<?> message) 方法時(參閱 使用 KafkaTemplate),您必須配置一個與已配置的 Kafka Serializer 相容的訊息轉換器。

  • StringJsonMessageConverterStringSerializer

  • BytesJsonMessageConverterBytesSerializer

  • ByteArrayJsonMessageConverterByteArraySerializer

同樣,使用 byte[]Bytes 更高效,因為它們避免了 Stringbyte[] 的轉換。

為方便起見,從版本 2.3 開始,該框架還提供了 StringOrBytesSerializer,它可以序列化所有這三種值型別,因此可以與任何訊息轉換器一起使用。

從版本 2.7.1 開始,訊息負載轉換可以委託給 spring-messagingSmartMessageConverter;例如,這使得轉換可以基於 MessageHeaders.CONTENT_TYPE 頭部進行。

KafkaMessageConverter.fromMessage() 方法在進行出站轉換到 ProducerRecord 時被呼叫,訊息負載位於 ProducerRecord.value() 屬性中。KafkaMessageConverter.toMessage() 方法在進行入站轉換從 ConsumerRecord 時被呼叫,負載即為 ConsumerRecord.value() 屬性。SmartMessageConverter.toMessage() 方法用於從傳遞給 fromMessage()Message(通常透過 KafkaTemplate.send(Message<?> msg))建立新的出站 Message<?>。類似地,在 KafkaMessageConverter.toMessage() 方法中,在轉換器從 ConsumerRecord 建立新的 Message<?> 之後,會呼叫 SmartMessageConverter.fromMessage() 方法,然後使用新轉換的負載建立最終的入站訊息。在任何一種情況下,如果 SmartMessageConverter 返回 null,則使用原始訊息。

當在 KafkaTemplate 和監聽器容器工廠中使用預設轉換器時,您可以透過在模板上呼叫 setMessagingConverter() 方法以及透過 @KafkaListener 方法上的 contentTypeConverter 屬性來配置 SmartMessageConverter

示例

template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

使用 Spring Data Projection 介面

從版本 2.1.1 開始,您可以將 JSON 轉換為 Spring Data Projection 介面,而不是具體的型別。這允許非常選擇性且低耦合地繫結到資料,包括從 JSON 文件內多個位置查詢值。例如,以下介面可以定義為訊息負載型別

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

預設情況下,訪問器方法將用於在接收到的 JSON 文件中將屬性名作為欄位查詢。@JsonPath 表示式允許自定義值查詢,甚至可以定義多個 JSON Path 表示式,從多個位置查詢值,直到某個表示式返回實際值。

要啟用此功能,請使用配置了適當委託轉換器(用於出站轉換和轉換非 Projection 介面)的 ProjectingMessageConverter。您還必須將 spring-data:spring-data-commonscom.jayway.jsonpath:json-path 新增到 classpath 中。

當用作 @KafkaListener 方法的引數時,介面型別會正常自動傳遞給轉換器。

使用 ErrorHandlingDeserializer

當反序列化器未能反序列化訊息時,Spring 無法處理此問題,因為它發生在 poll() 返回之前。為了解決這個問題,引入了 ErrorHandlingDeserializer。此反序列化器委託給一個實際的反序列化器(用於鍵或值)。如果委託方未能反序列化記錄內容,ErrorHandlingDeserializer 將返回一個 null 值,並在頭部中包含一個 DeserializationException,該頭部包含原因和原始位元組。當您使用記錄級別的 MessageListener 時,如果 ConsumerRecord 的鍵或值包含 DeserializationException 頭部,容器的 ErrorHandler 將被呼叫,並傳入失敗的 ConsumerRecord。該記錄不會傳遞給監聽器。

或者,您可以透過提供 failedDeserializationFunction 來配置 ErrorHandlingDeserializer 以建立自定義值,該函式是一個 Function<FailedDeserializationInfo, T>。此函式被呼叫以建立 T 的例項,然後像往常一樣將該例項傳遞給監聽器。型別為 FailedDeserializationInfo 的物件(包含所有上下文資訊)會提供給該函式。您可以在頭部中找到 DeserializationException(作為序列化的 Java 物件)。更多資訊請參見 ErrorHandlingDeserializerJavadoc

您可以使用接受鍵和值 Deserializer 物件的 DefaultKafkaConsumerFactory 建構函式,並注入您配置了適當委託的 ErrorHandlingDeserializer 例項。或者,您可以使用消費者配置屬性(ErrorHandlingDeserializer 使用這些屬性)來例項化委託。屬性名稱是 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。屬性值可以是類或類名。以下示例顯示瞭如何設定這些屬性

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

以下示例使用 failedDeserializationFunction

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

前面的示例使用以下配置

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消費者配置了 ErrorHandlingDeserializer,那麼配置 KafkaTemplate 及其生產者使用一個既可以處理普通物件也可以處理因反序列化異常產生的原始 byte[] 值的序列化器就很重要。模板的泛型值型別應該是 Object。一種技術是使用 DelegatingByTypeSerializer;示例如下
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

當將 ErrorHandlingDeserializer 與批處理監聽器一起使用時,您必須檢查訊息頭部中的反序列化異常。與 DefaultBatchErrorHandler 一起使用時,您可以使用該頭部確定異常發生在哪個記錄上,並透過 BatchListenerFailedException 與錯誤處理器通訊。

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            try {
                DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException() 可用於將頭部轉換為 DeserializationException

當消費 List<ConsumerRecord<?, ?> 時,應改為使用 SerializationUtils.getExceptionFromHeader()

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
如果您還使用了 DeadLetterPublishingRecoverer,為 DeserializationException 釋出的記錄將具有 record.value() 型別為 byte[];這不應被序列化。考慮使用配置為對 byte[] 使用 ByteArraySerializer,而對所有其他型別使用正常序列化器(Json、Avro 等)的 DelegatingByTypeSerializer

從版本 3.1 開始,您可以向 ErrorHandlingDeserializer 新增一個 Validator。如果委託 Deserializer 成功反序列化了物件,但該物件驗證失敗,則會丟擲類似反序列化異常的異常。這允許將原始原始資料傳遞給錯誤處理器。自行建立反序列化器時,只需呼叫 setValidator;如果您使用屬性配置序列化器,請將消費者配置屬性 ErrorHandlingDeserializer.VALIDATOR_CLASS 設定為您的 Validator 的類或完全限定類名。使用 Spring Boot 時,此屬性名稱是 spring.kafka.consumer.properties.spring.deserializer.validator.class

批處理監聽器的 Payload 轉換

當您使用批處理監聽器容器工廠時,您還可以在 BatchMessagingMessageConverter 中使用 JsonMessageConverter 來轉換批處理訊息。更多資訊請參見 序列化、反序列化和訊息轉換 以及 Spring Messaging 訊息轉換

預設情況下,轉換的型別從監聽器引數中推斷。如果您使用 DefaultJackson2TypeMapper 配置 JsonMessageConverter,並將其 TypePrecedence 設定為 TYPE_ID(而不是預設的 INFERRED),轉換器將使用頭部中的型別資訊(如果存在)。這允許,例如,監聽器方法宣告使用介面而不是具體類。此外,型別轉換器支援對映,因此反序列化可以轉換為與源不同的型別(只要資料相容)。當您使用 類級別的 @KafkaListener 例項 時,其中 payload 必須已經轉換才能確定呼叫哪個方法,這也很有用。以下示例建立使用此方法的 bean

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

請注意,要使其工作,轉換目標的簽名必須是一個容器物件,只有一個泛型引數型別,如下所示

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

請注意,您仍然可以訪問批處理頭部。

如果批處理轉換器具有支援此功能的記錄轉換器,您還可以接收訊息列表,其中 payload 會根據泛型型別進行轉換。以下示例顯示瞭如何實現

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

如果批處理中的記錄無法轉換,其 payload 將被設定為 null 到目標 payloads 列表中。該記錄的轉換異常將作為警告記錄,並存儲到 KafkaHeaders.CONVERSION_FAILURES 頭部中,作為 List<ConversionException> 的一個專案。目標 @KafkaListener 方法可以使用 Java Stream API 從 payload 列表中過濾掉這些 null 值,或者處理轉換異常頭部

@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
         @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {

    for (int i = 0; i < list.size(); i++) {
        if (conversionFailures.get(i) != null) {
            throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
        }
    }
}

ConversionService 自定義

從版本 2.1.1 開始,預設的 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory 用於解析監聽器方法呼叫的引數時使用的 org.springframework.core.convert.ConversionService 會提供給所有實現以下任何介面的 bean

  • org.springframework.core.convert.converter.Converter

  • org.springframework.core.convert.converter.GenericConverter

  • org.springframework.format.Formatter

這允許您進一步自定義監聽器反序列化,而無需更改 ConsumerFactoryKafkaListenerContainerFactory 的預設配置。

透過 KafkaListenerConfigurer bean 在 KafkaListenerEndpointRegistrar 上設定自定義的 MessageHandlerMethodFactory 將停用此功能。

@KafkaListener 新增自定義的 HandlerMethodArgumentResolver

從版本 2.4.2 開始,您可以新增自己的 HandlerMethodArgumentResolver 並解析自定義方法引數。您只需要實現 KafkaListenerConfigurer 並使用 KafkaListenerEndpointRegistrar 類中的 setCustomMethodArgumentResolvers() 方法。

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

您也可以透過向 KafkaListenerEndpointRegistrar bean 新增自定義的 MessageHandlerMethodFactory 來完全替換框架的引數解析。如果您這樣做,並且您的應用程式需要處理帶有 null value() 的墓碑記錄(例如來自壓縮主題),您應該向工廠新增一個 KafkaNullAwarePayloadArgumentResolver;它必須是最後一個解析器,因為它支援所有型別,並且可以匹配沒有 @Payload 註解的引數。如果您使用的是 DefaultMessageHandlerMethodFactory,請將此解析器設定為最後一個自定義解析器;工廠將確保在標準的 PayloadMethodArgumentResolver 之前使用此解析器,而標準的解析器對 KafkaNull payload 不瞭解。