訊息頭
0.11.0.0 客戶端引入了訊息頭支援。從 2.0 版本開始,Spring for Apache Kafka 現在支援將這些頭對映到 spring-messaging
的 MessageHeaders
,反之亦然。
之前的版本將 ConsumerRecord 和 ProducerRecord 對映到 spring-messaging 的 Message<?> ,其中 value 屬性對映到 payload ,而其他屬性(如 topic 、partition 等)則對映到頭。這種情況仍然存在,但現在可以對映額外的(任意)頭。 |
Apache Kafka 的頭具有簡單的 API,如以下介面定義所示
public interface Header {
String key();
byte[] value();
}
提供了 KafkaHeaderMapper
策略,用於在 Kafka 的 Headers
和 MessageHeaders
之間對映頭條目。其介面定義如下
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
SimpleKafkaHeaderMapper
將原始頭對映為 byte[]
,並提供配置選項以轉換為 String
值。
DefaultKafkaHeaderMapper
將鍵對映到 MessageHeaders
的頭名稱,並且為了支援出站訊息的豐富頭型別,會執行 JSON 轉換。一個“特殊”的頭(鍵為 spring_json_header_types
)包含一個 <key>:<type>
的 JSON 對映。此頭在入站側用於將每個頭值適當轉換回原始型別。
在入站側,所有 Kafka 的 Header
例項都對映到 MessageHeaders
。在出站側,預設情況下,除了 id
、timestamp
以及對映到 ConsumerRecord
屬性的頭之外,所有 MessageHeaders
都被對映。
你可以透過向對映器提供模式來指定要對映哪些出站訊息的頭。以下列表顯示了一些示例對映
public DefaultKafkaHeaderMapper() { (1)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public DefaultKafkaHeaderMapper(String... patterns) { (3)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
1 | 使用預設的 Jackson ObjectMapper ,並對映大多數頭,如示例之前所述。 |
2 | 使用提供的 Jackson ObjectMapper ,並對映大多數頭,如示例之前所述。 |
3 | 使用預設的 Jackson ObjectMapper ,並根據提供的模式對映頭。 |
4 | 使用提供的 Jackson ObjectMapper ,並根據提供的模式對映頭。 |
模式相當簡單,可以包含前導萬用字元 (*
)、尾部萬用字元或兩者都有(例如 *.cat.*
)。你可以使用前導 !
來否定模式。第一個匹配到頭名稱的模式(無論是正向還是負向)將生效。
當你提供自己的模式時,我們建議包含 !id
和 !timestamp
,因為這些頭在入站側是隻讀的。
預設情況下,對映器只反序列化 java.lang 和 java.util 中的類。你可以透過 addTrustedPackages 方法新增受信任的包來信任其他(或所有)包。如果你從不受信任的源接收訊息,你可能只想新增你信任的那些包。要信任所有包,可以使用 mapper.addTrustedPackages("*") 。 |
當與不瞭解對映器 JSON 格式的系統通訊時,以原始形式對映 String 頭值非常有用。 |
從 2.2.5 版本開始,你可以指定某些字串值的頭不應使用 JSON 進行對映,而是使用原始的 byte[]
進行對映。AbstractKafkaHeaderMapper
有新的屬性;當 mapAllStringsOut
設定為 true 時,所有字串值的頭將使用 charset
屬性(預設為 UTF-8
)轉換為 byte[]
。此外,還有一個屬性 rawMappedHeaders
,它是一個 header name : boolean
的對映;如果對映中包含一個頭名稱,並且該頭包含一個 String
值,它將使用字元集被對映為原始的 byte[]
。此對映也用於將原始的入站 byte[]
頭使用字元集對映到 String
,當且僅當對映中的布林值為 true
時。如果布林值為 false
,或者頭名稱不在對映中且值為 true
,則入站頭將簡單地被對映為原始的未對映頭。
以下測試用例說明了此機制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
預設情況下,兩個頭對映器都對映所有入站頭。從 2.8.8 版本開始,模式也可以應用於入站對映。要建立用於入站對映的對映器,請使用相應對映器上的靜態方法之一
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
這將排除所有以 abc
開頭的頭,幷包含所有其他頭。
預設情況下,只要 Jackson 在類路徑上,DefaultKafkaHeaderMapper
就會在 MessagingMessageConverter
和 BatchMessagingMessageConverter
中使用。
對於批處理轉換器,轉換後的頭在 KafkaHeaders.BATCH_CONVERTED_HEADERS
中以 List<Map<String, Object>>
的形式提供,其中列表中某個位置的對映對應於有效載荷中的資料位置。
如果沒有轉換器(可能是因為沒有 Jackson,或者它被明確設定為 null
),則消費者記錄中的頭會在 KafkaHeaders.NATIVE_HEADERS
頭中以未轉換的形式提供。此頭是一個 Headers
物件(對於批處理轉換器來說是 List<Headers>
),其中列表中某個位置對應於有效載荷中的資料位置。
某些型別不適合進行 JSON 序列化,對於這些型別,可能更傾向於簡單的 toString() 序列化。DefaultKafkaHeaderMapper 有一個名為 addToStringClasses() 的方法,允許你提供在出站對映時應以這種方式處理的類名。在入站對映期間,它們被對映為 String 。預設情況下,只有 org.springframework.util.MimeType 和 org.springframework.http.MediaType 是以這種方式對映的。 |
從 2.3 版本開始,對字串值頭的處理得到簡化。預設情況下,此類頭不再進行 JSON 編碼(即不再新增封閉的 "..." )。型別仍然新增到 JSON_TYPES 頭中,以便接收系統可以轉換回 String(從 byte[] )。對映器可以處理(解碼)由舊版本產生的頭(它檢查是否有前導的 " );這樣,使用 2.3 的應用程式可以消費舊版本產生的記錄。 |
為了與早期版本相容,如果使用 2.3 版本生成的記錄可能被使用早期版本的應用程式消費,請將 encodeStrings 設定為 true 。當所有應用程式都使用 2.3 或更高版本時,可以將此屬性保持其預設值 false 。 |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它將自動將此轉換器 bean 配置到自動配置的 KafkaTemplate
中;否則,你應該手動將此轉換器新增到模板中。