訊息頭

0.11.0.0 客戶端引入了對訊息頭部的支援。從 2.0 版本開始,Spring for Apache Kafka 現在支援將這些頭部對映到 spring-messaging MessageHeaders 以及從其對映。

以前的版本將 ConsumerRecordProducerRecord 對映到 spring-messaging Message<?>,其中值屬性被對映到 payload 以及從其對映,其他屬性(topicpartition 等)被對映到頭部。這種情況仍然存在,但現在可以對映額外的(任意)頭部。

Apache Kafka 頭部有一個簡單的 API,如下面的介面定義所示:

public interface Header {

    String key();

    byte[] value();

}

KafkaHeaderMapper 策略用於在 Kafka HeadersMessageHeaders 之間對映頭部條目。其介面定義如下:

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

SimpleKafkaHeaderMapper 將原始頭部對映為 byte[],並提供轉換為 String 值的配置選項。

JsonKafkaHeaderMapper 將鍵對映到 MessageHeaders 頭部名稱,為了支援出站訊息的富頭部型別,會執行 JSON 轉換。一個“特殊”頭部(鍵為 spring_json_header_types)包含一個 <key>:<type> 的 JSON 對映。此頭部在入站端用於提供每個頭部值到原始型別的適當轉換。

在入站端,所有 Kafka Header 例項都對映到 MessageHeaders。在出站端,預設情況下,除了 idtimestamp 以及對映到 ConsumerRecord 屬性的頭部之外,所有 MessageHeaders 都被對映。

您可以透過向對映器提供模式來指定哪些頭部要用於出站訊息的對映。以下列表顯示了一些示例對映:

public JsonKafkaHeaderMapper() { (1)
    ...
}

public JsonKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public JsonKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public JsonKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 使用預設的 Jackson ObjectMapper 並對映大多數頭部,如示例之前所討論。
2 使用提供的 Jackson ObjectMapper 並對映大多數頭部,如示例之前所討論。
3 使用預設的 Jackson ObjectMapper 並根據提供的模式對映頭部。
4 使用提供的 Jackson ObjectMapper 並根據提供的模式對映頭部。

模式相當簡單,可以包含前導萬用字元(*)、尾隨萬用字元或兩者(例如,*.cat.*)。您可以使用前導 ! 來否定模式。第一個匹配頭部名稱的模式(無論是正向還是負向)獲勝。

當您提供自己的模式時,我們建議包含 !id!timestamp,因為這些頭部在入站端是隻讀的。

預設情況下,對映器僅反序列化 java.langjava.util 中的類。您可以透過 addTrustedPackages 方法新增受信任的包來信任其他(或所有)包。如果您從不受信任的源接收訊息,您可能只希望新增您信任的那些包。要信任所有包,可以使用 mapper.addTrustedPackages("*")
以原始形式對映 String 頭部值在與不瞭解對映器 JSON 格式的系統通訊時非常有用。

從 2.2.5 版本開始,您可以指定某些字串值頭部不應使用 JSON 對映,而應對映為/從原始 byte[]AbstractKafkaHeaderMapper 具有新屬性;當 mapAllStringsOut 設定為 true 時,所有字串值頭部都將使用 charset 屬性(預設為 UTF-8)轉換為 byte[]。此外,還有一個屬性 rawMappedHeaders,它是一個 頭部名稱 : 布林值 的對映;如果對映包含一個頭部名稱,並且該頭部包含一個 String 值,它將使用字元集對映為原始 byte[]。此對映也用於將原始入站 byte[] 頭部對映為 String,僅當對映值中的布林值為 true 時才使用字元集。如果布林值為 false,或者頭部名稱不在具有 true 值的對映中,則入站頭部將簡單地對映為原始未對映頭部。

以下測試用例說明了此機制。

@Test
public void testSpecificStringConvert() {
    JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

預設情況下,兩個頭部對映器都對映所有入站頭部。從 2.8.8 版本開始,模式也可以應用於入站對映。要建立用於入站對映的對映器,請使用相應對映器上的靜態方法之一:

public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

例如:

JsonKafkaHeaderMapper inboundMapper = JsonKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

這將排除所有以 abc 開頭的頭部,幷包含所有其他頭部。

預設情況下,只要 Jackson 位於類路徑上,JsonKafkaHeaderMapper 就會在 MessagingMessageConverterBatchMessagingMessageConverter 中使用。

對於批處理轉換器,轉換後的頭部可在 KafkaHeaders.BATCH_CONVERTED_HEADERS 中作為 List<Map<String, Object>> 提供,其中列表中某個位置的對映對應於有效負載中的資料位置。

如果沒有轉換器(因為 Jackson 不存在或顯式設定為 null),則來自消費者記錄的頭部將以未轉換的形式在 KafkaHeaders.NATIVE_HEADERS 頭部中提供。此頭部是一個 Headers 物件(或者在批處理轉換器的情況下是 List<Headers>),其中列表中的位置對應於有效負載中的資料位置。

某些型別不適合 JSON 序列化,對於這些型別,可能更喜歡簡單的 toString() 序列化。JsonKafkaHeaderMapper 有一個名為 addToStringClasses() 的方法,允許您提供應該以這種方式處理出站對映的類名。在入站對映期間,它們被對映為 String。預設情況下,只有 org.springframework.util.MimeTypeorg.springframework.http.MediaType 以這種方式對映。
從 2.3 版本開始,字串值頭部的處理得到簡化。預設情況下,此類頭部不再進行 JSON 編碼(即,它們不再新增封閉的 "...")。型別仍然新增到 JSON_TYPES 頭部,以便接收系統可以轉換回字串(從 byte[])。對映器可以處理(解碼)由舊版本生成的頭部(它檢查前導 ");透過這種方式,使用 2.3 的應用程式可以消費來自舊版本的記錄。
為了與早期版本相容,如果使用 2.3 版本的應用程式生成的記錄可能被使用早期版本的應用程式消費,請將 encodeStrings 設定為 true。當所有應用程式都使用 2.3 或更高版本時,您可以將該屬性保留為預設值 false
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用 Spring Boot,它會自動將此轉換器 bean 配置到自動配置的 KafkaTemplate 中;否則,您應該將此轉換器新增到模板中。

支援多值頭部對映

從 4.0 版本開始,支援多值頭部對映,其中同一邏輯頭部鍵在 Kafka 記錄中出現多次。

預設情況下,HeaderMapper 不會建立多個同名 Kafka 頭部。相反,當它遇到集合值(例如,List<byte[]>)時,它會將整個集合序列化為一個 Kafka 頭部,其值為 JSON 陣列。

  • 生產者端: JsonKafkaHeaderMapper 寫入 JSON 位元組,而 SimpleKafkaHeaderMapper 忽略它。

  • 消費者端: 對映器將頭部作為單個值暴露——最後一次出現獲勝;較早的重複項會被靜默丟棄。

保留每個單獨的頭部需要顯式註冊將頭部指定為多值模式。

JsonKafkaHeaderMapper#setMultiValueHeaderPatterns(String…​ patterns) 接受一個模式列表,可以是萬用字元表示式或精確的頭部名稱。

JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();

// Explicit header names
mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");

// Wildcard patterns for test-multi-value1, test-multi-value2
mapper.setMultiValueHeaderPatterns("test-multi-*");

任何名稱與所提供的模式之一匹配的頭部

  • 生產者端: 作為單獨的 Kafka 頭部寫入,每個元素一個。

  • 消費者端: 收集到包含單個頭部值的 List<?> 中;每個元素在經過配置的 HeaderMapper 執行的常規反序列化或型別轉換後返回給應用程式。

不支援正則表示式;簡單模式中只允許使用 * 萬用字元——支援直接相等和諸如:xxx*、*xxx、*xxx*、xxx*yyy 等形式。

生產者端,當 JsonKafkaHeaderMapper 序列化多值頭部時,該集合中的每個元素必須是單一 Java 型別——例如,在一個頭部鍵下混合 Stringbyte[] 值將導致轉換錯誤。

© . This site is unofficial and not affiliated with VMware.