訊息頭

0.11.0.0 客戶端引入了訊息頭支援。從 2.0 版本開始,Spring for Apache Kafka 現在支援將這些頭對映到 spring-messagingMessageHeaders,反之亦然。

之前的版本將 ConsumerRecordProducerRecord 對映到 spring-messaging 的 Message<?>,其中 value 屬性對映到 payload,而其他屬性(如 topicpartition 等)則對映到頭。這種情況仍然存在,但現在可以對映額外的(任意)頭。

Apache Kafka 的頭具有簡單的 API,如以下介面定義所示

public interface Header {

    String key();

    byte[] value();

}

提供了 KafkaHeaderMapper 策略,用於在 Kafka 的 HeadersMessageHeaders 之間對映頭條目。其介面定義如下

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

SimpleKafkaHeaderMapper 將原始頭對映為 byte[],並提供配置選項以轉換為 String 值。

DefaultKafkaHeaderMapper 將鍵對映到 MessageHeaders 的頭名稱,並且為了支援出站訊息的豐富頭型別,會執行 JSON 轉換。一個“特殊”的頭(鍵為 spring_json_header_types)包含一個 <key>:<type> 的 JSON 對映。此頭在入站側用於將每個頭值適當轉換回原始型別。

在入站側,所有 Kafka 的 Header 例項都對映到 MessageHeaders。在出站側,預設情況下,除了 idtimestamp 以及對映到 ConsumerRecord 屬性的頭之外,所有 MessageHeaders 都被對映。

你可以透過向對映器提供模式來指定要對映哪些出站訊息的頭。以下列表顯示了一些示例對映

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 使用預設的 Jackson ObjectMapper,並對映大多數頭,如示例之前所述。
2 使用提供的 Jackson ObjectMapper,並對映大多數頭,如示例之前所述。
3 使用預設的 Jackson ObjectMapper,並根據提供的模式對映頭。
4 使用提供的 Jackson ObjectMapper,並根據提供的模式對映頭。

模式相當簡單,可以包含前導萬用字元 (*)、尾部萬用字元或兩者都有(例如 *.cat.*)。你可以使用前導 ! 來否定模式。第一個匹配到頭名稱的模式(無論是正向還是負向)將生效。

當你提供自己的模式時,我們建議包含 !id!timestamp,因為這些頭在入站側是隻讀的。

預設情況下,對映器只反序列化 java.langjava.util 中的類。你可以透過 addTrustedPackages 方法新增受信任的包來信任其他(或所有)包。如果你從不受信任的源接收訊息,你可能只想新增你信任的那些包。要信任所有包,可以使用 mapper.addTrustedPackages("*")
當與不瞭解對映器 JSON 格式的系統通訊時,以原始形式對映 String 頭值非常有用。

從 2.2.5 版本開始,你可以指定某些字串值的頭不應使用 JSON 進行對映,而是使用原始的 byte[] 進行對映。AbstractKafkaHeaderMapper 有新的屬性;當 mapAllStringsOut 設定為 true 時,所有字串值的頭將使用 charset 屬性(預設為 UTF-8)轉換為 byte[]。此外,還有一個屬性 rawMappedHeaders,它是一個 header name : boolean 的對映;如果對映中包含一個頭名稱,並且該頭包含一個 String 值,它將使用字元集被對映為原始的 byte[]。此對映也用於將原始的入站 byte[] 頭使用字元集對映到 String,當且僅當對映中的布林值為 true 時。如果布林值為 false,或者頭名稱不在對映中且值為 true,則入站頭將簡單地被對映為原始的未對映頭。

以下測試用例說明了此機制。

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

預設情況下,兩個頭對映器都對映所有入站頭。從 2.8.8 版本開始,模式也可以應用於入站對映。要建立用於入站對映的對映器,請使用相應對映器上的靜態方法之一

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

例如

DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

這將排除所有以 abc 開頭的頭,幷包含所有其他頭。

預設情況下,只要 Jackson 在類路徑上,DefaultKafkaHeaderMapper 就會在 MessagingMessageConverterBatchMessagingMessageConverter 中使用。

對於批處理轉換器,轉換後的頭在 KafkaHeaders.BATCH_CONVERTED_HEADERS 中以 List<Map<String, Object>> 的形式提供,其中列表中某個位置的對映對應於有效載荷中的資料位置。

如果沒有轉換器(可能是因為沒有 Jackson,或者它被明確設定為 null),則消費者記錄中的頭會在 KafkaHeaders.NATIVE_HEADERS 頭中以未轉換的形式提供。此頭是一個 Headers 物件(對於批處理轉換器來說是 List<Headers>),其中列表中某個位置對應於有效載荷中的資料位置。

某些型別不適合進行 JSON 序列化,對於這些型別,可能更傾向於簡單的 toString() 序列化。DefaultKafkaHeaderMapper 有一個名為 addToStringClasses() 的方法,允許你提供在出站對映時應以這種方式處理的類名。在入站對映期間,它們被對映為 String。預設情況下,只有 org.springframework.util.MimeTypeorg.springframework.http.MediaType 是以這種方式對映的。
從 2.3 版本開始,對字串值頭的處理得到簡化。預設情況下,此類頭不再進行 JSON 編碼(即不再新增封閉的 "...")。型別仍然新增到 JSON_TYPES 頭中,以便接收系統可以轉換回 String(從 byte[])。對映器可以處理(解碼)由舊版本產生的頭(它檢查是否有前導的 ");這樣,使用 2.3 的應用程式可以消費舊版本產生的記錄。
為了與早期版本相容,如果使用 2.3 版本生成的記錄可能被使用早期版本的應用程式消費,請將 encodeStrings 設定為 true。當所有應用程式都使用 2.3 或更高版本時,可以將此屬性保持其預設值 false
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用 Spring Boot,它將自動將此轉換器 bean 配置到自動配置的 KafkaTemplate 中;否則,你應該手動將此轉換器新增到模板中。