消費批次
從版本 3.0 開始,當 spring.cloud.stream.bindings.<name>.consumer.batch-mode
設定為 true
時,透過輪詢 Kafka Consumer
接收到的所有記錄將以 List<?>
的形式呈現給監聽器方法。否則,該方法將一次處理一條記錄。批次的大小由 Kafka 消費者屬性 max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
控制;有關更多資訊,請參閱 Kafka 文件。
接收批次時,允許使用以下型別簽名
List<Person>
Message<List<Person>>
在 List<Person>
的第一個選項中,監聽器將不會獲取任何訊息頭部。如果使用第二個型別簽名(Message<List<Person>>
),則可以訪問頭部;但是,所有頭部仍然是 Collection
的形式。讓我們看下面的例子。
假設 Message
包含一個包含十個 Person
物件的列表。Message
的 MessageHeaders
包含一個頭部對映,其中鍵是頭部名稱,值是列表。此列表包含該頭部的值,順序與載荷列表相同。因此,應用程式需要根據載荷列表的迭代,從 MessageHeaders
對映中正確訪問頭部。
請注意,在批處理模式下消費時,不允許使用 List<Message<Person>>
形式的型別簽名。
從版本 4.0.2
開始,Binder 在批處理模式下消費時支援 DLQ 功能。請記住,當在批處理模式下消費繫結上使用 DLQ 時,從前一次輪詢接收到的所有記錄都將傳送到 DLQ 主題。
在使用批處理模式時,Binder 內部不支援重試,因此 maxAttempts 將被覆蓋為 1。您可以透過配置 DefaultErrorHandler (使用 ListenerContainerCustomizer )來實現類似於 Binder 中重試的功能。您還可以使用手動 AckMode 並呼叫 Ackowledgment.nack(index, sleep) 來提交部分批次的 offset 並重新投遞剩餘的記錄。有關這些技術的更多資訊,請參閱 Spring for Apache Kafka 文件。 |
在批處理模式下接收 KafkaNull 物件時,接收到的列表中將包含對應於 KafkaNull 物件的 null 元素。這對於 List<Person> 和 Message<List<Person>> 風格的型別簽名都適用。 |
在批處理模式下消費時的可觀測性
在批次消費記錄時,不直接支援可觀測性追蹤傳播功能。這是因為 Kafka Binder 使用的 Spring for Apache Kafka 庫不支援對批次監聽器進行追蹤;它僅支援記錄監聽器。在批次監聽器中,接收到的記錄可能來自多個主題/分割槽,並且來自多個生產者,新增追蹤資訊是可選的。由於批次中的記錄之間可能沒有任何關聯,框架無法對它們進行追蹤做出任何假設,例如為它們提供一個單一的追蹤 ID 等。如果您使用 Message<List<String>>
的型別簽名,則可以獲取一個名為 kafka_batchConvertedHeaders
的頭部,它包含一個與您的載荷條目數量相同的列表。此列表包含一個包含追蹤頭部的 Map
。但是,應用程式需要正確地迭代此列表並啟動一個觀測。