消費批次
從 3.0 版本開始,當 spring.cloud.stream.bindings.<name>.consumer.batch-mode 設定為 true 時,透過輪詢 Kafka Consumer 接收到的所有記錄都將作為 List<?> 呈現給監聽器方法。否則,該方法將一次接收一條記錄。批次的大小由 Kafka 消費者屬性 max.poll.records、fetch.min.bytes、fetch.max.wait.ms 控制;有關更多資訊,請參閱 Kafka 文件。
接收批次時,允許使用以下型別簽名:
List<Person>
Message<List<Person>>
在 List<Person> 的第一個選項中,監聽器將不會獲取任何訊息頭。如果使用第二個型別簽名 (Message<List<Person>>),則可以訪問訊息頭;但是,所有訊息頭仍將以 Collection 的形式存在。讓我們看以下示例。
假設 Message 包含一個包含十個 Person 物件的列表。Message 的 MessageHeaders 包含一個訊息頭對映,其中鍵是訊息頭名稱,值是一個列表。此列表包含該訊息頭的值,順序與有效載荷列表相同。因此,應用程式需要根據有效載荷列表的迭代,從 MessageHeaders 對映中正確訪問訊息頭。
請注意,在批處理模式下消費時,不允許使用 List<Message<Person>> 形式的型別簽名。
從 4.0.2 版本開始,該繫結器在批次消費模式下支援 DLQ 功能。請記住,當在批次模式下對消費者繫結使用 DLQ 時,從上一次輪詢接收到的所有記錄都將傳送到 DLQ 主題。
在批次模式下使用時,繫結器內的重試不受支援,因此 maxAttempts 將被覆蓋為 1。您可以配置 DefaultErrorHandler(使用 ListenerContainerCustomizer)以實現與繫結器中重試類似的功能。您還可以使用手動 AckMode 並呼叫 Ackowledgment.nack(index, sleep) 來提交部分批次的偏移量,並讓剩餘記錄重新傳遞。有關這些技術的更多資訊,請參閱 Spring for Apache Kafka 文件。 |
在批次模式下接收 KafkaNull 物件時,接收到的列表將包含對應 KafkaNull 物件的空元素。這對於 List<Person> 和 Message<List<Person>> 樣式型別簽名都適用。 |
批次消費模式下的可觀測性
在批次消費記錄時,不支援直接的觀察跟蹤傳播功能。這是因為 Kafka 繫結器使用的 Spring for Apache Kafka 庫不支援批處理監聽器的跟蹤;它只支援記錄監聽器。在批處理監聽器中,接收到的記錄可能來自多個主題/分割槽和多個生產者,其中新增跟蹤資訊是可選的。由於批處理中的記錄之間可能沒有任何關聯,框架無法對它們進行任何跟蹤假設,例如將它們作為單個跟蹤 ID 提供等。如果您使用 Message<List<String>> 的型別簽名,則可以獲得一個名為 kafka_batchConvertedHeaders 的訊息頭,其中包含一個與有效負載具有相同數量條目的列表。此列表包含一個包含跟蹤訊息頭的 Map。但是,應用程式需要正確迭代此列表並開始觀察。