消費批次

從 3.0 版本開始,當 spring.cloud.stream.bindings.<name>.consumer.batch-mode 設定為 true 時,透過輪詢 Kafka Consumer 接收到的所有記錄都將作為 List<?> 呈現給監聽器方法。否則,該方法將一次接收一條記錄。批次的大小由 Kafka 消費者屬性 max.poll.recordsfetch.min.bytesfetch.max.wait.ms 控制;有關更多資訊,請參閱 Kafka 文件。

接收批次時,允許使用以下型別簽名:

List<Person>
Message<List<Person>>

List<Person> 的第一個選項中,監聽器將不會獲取任何訊息頭。如果使用第二個型別簽名 (Message<List<Person>>),則可以訪問訊息頭;但是,所有訊息頭仍將以 Collection 的形式存在。讓我們看以下示例。

假設 Message 包含一個包含十個 Person 物件的列表。MessageMessageHeaders 包含一個訊息頭對映,其中鍵是訊息頭名稱,值是一個列表。此列表包含該訊息頭的值,順序與有效載荷列表相同。因此,應用程式需要根據有效載荷列表的迭代,從 MessageHeaders 對映中正確訪問訊息頭。

請注意,在批處理模式下消費時,不允許使用 List<Message<Person>> 形式的型別簽名。

4.0.2 版本開始,該繫結器在批次消費模式下支援 DLQ 功能。請記住,當在批次模式下對消費者繫結使用 DLQ 時,從上一次輪詢接收到的所有記錄都將傳送到 DLQ 主題。

在批次模式下使用時,繫結器內的重試不受支援,因此 maxAttempts 將被覆蓋為 1。您可以配置 DefaultErrorHandler(使用 ListenerContainerCustomizer)以實現與繫結器中重試類似的功能。您還可以使用手動 AckMode 並呼叫 Ackowledgment.nack(index, sleep) 來提交部分批次的偏移量,並讓剩餘記錄重新傳遞。有關這些技術的更多資訊,請參閱 Spring for Apache Kafka 文件
在批次模式下接收 KafkaNull 物件時,接收到的列表將包含對應 KafkaNull 物件的空元素。這對於 List<Person>Message<List<Person>> 樣式型別簽名都適用。

批次消費模式下的可觀測性

在批次消費記錄時,不支援直接的觀察跟蹤傳播功能。這是因為 Kafka 繫結器使用的 Spring for Apache Kafka 庫不支援批處理監聽器的跟蹤;它只支援記錄監聽器。在批處理監聽器中,接收到的記錄可能來自多個主題/分割槽和多個生產者,其中新增跟蹤資訊是可選的。由於批處理中的記錄之間可能沒有任何關聯,框架無法對它們進行任何跟蹤假設,例如將它們作為單個跟蹤 ID 提供等。如果您使用 Message<List<String>> 的型別簽名,則可以獲得一個名為 kafka_batchConvertedHeaders 的訊息頭,其中包含一個與有效負載具有相同數量條目的列表。此列表包含一個包含跟蹤訊息頭的 Map。但是,應用程式需要正確迭代此列表並開始觀察。

© . This site is unofficial and not affiliated with VMware.