變更歷史

3.2 版本相對於 3.1 版本的新特性

本節介紹從 3.1 版本到 3.2 版本的變化。有關更早版本中的變化,請參見變更歷史

Kafka 客戶端版本

此版本需要 3.7.0 kafka-clients。3.7.0 版本的 Kafka 客戶端引入了新的消費組協議。有關更多詳細資訊及其限制,請參見 KIP-848。新的消費組協議是早期訪問版本,不建議在生產環境中使用。此版本僅建議用於測試目的。因此,Spring for Apache Kafka 對此新的消費組協議的支援僅限於 kafka-client 本身提供的測試級別支援。預設情況下,Spring for Apache Kafka 使用經典消費組協議,在測試新的消費組協議時,需要透過 consumer 上的 group.protocol 屬性啟用。

測試支援的變化

EmbeddedKafka 中的 kraft 模式預設停用,需要使用 kraft 模式的使用者必須啟用它。這是由於在使用 EmbeddedKafkakraft 模式時觀察到某些不穩定,尤其是在測試新的消費組協議時。新的消費組協議僅在 kraft 模式下支援,因此在測試新協議時,需要在真實的 Kafka 叢集上進行,而不是基於 KafkaClusterTestKitEmbeddedKafka 基於此)的叢集。此外,在使用 EmbeddedKafkakraft 模式執行多個 KafkaListener 方法時,還觀察到其他一些競態條件。在這些問題解決之前,EmbeddedKafkakraft 的預設值將保持為 false

Kafka Streams 互動式查詢支援

一個用於訪問 Kafka Streams 互動式查詢中所用可查詢儲存的新 API KafkaStreamsInteractiveQuerySupport 已引入。更多詳細資訊請參見Kafka Streams 互動式支援

TransactionIdSuffixStrategy

引入了一個新的介面 TransactionIdSuffixStrategy,用於管理 transactional.id 字尾。當設定 maxCache 大於零時,預設實現 DefaultTransactionIdSuffixStrategy 可以在特定範圍內重用 transactional.id,否則將透過遞增計數器動態生成字尾。更多資訊請參見固定事務 ID 字尾

非同步 @KafkaListener 返回

@KafkaListener(和 @KafkaHandler)方法現在可以返回非同步返回型別,包括 CompletableFuture<?>Mono<?> 和 Kotlin 的 suspend 函式。更多資訊請參見非同步返回

基於丟擲異常將訊息路由到自定義 DLT

現在可以基於在訊息處理期間丟擲的異常型別,將訊息重定向到自定義 DLT。重定向規則可以透過 RetryableTopic.exceptionBasedDltRoutingRetryTopicConfigurationBuilder.dltRoutingRules 設定。自定義 DLT 以及其他重試和死信主題都會自動建立。更多資訊請參見基於丟擲異常將訊息路由到自定義 DLT

ContainerProperties 的 transactionManager 屬性已棄用

棄用 ContainerProperties 中的 transactionManager 屬性,轉而使用 KafkaAwareTransactionManager,相較於通用的 PlatformTransactionManager,這是一個更窄的型別。參見ContainerProperties事務同步

回滾後處理

提供了一個新的 AfterRollbackProcessor API processBatch。更多資訊請參見回滾後處理器

改變 @RetryableTopic SameIntervalTopicReuseStrategy 預設值

@RetryableTopicSameIntervalTopicReuseStrategy 屬性預設值改為 SINGLE_TOPIC。參見最大間隔指數延遲的單一主題

非阻塞重試支援類級別的 @KafkaListener

非阻塞重試支援類級別的 @KafkaListener。參見非阻塞重試

RetryTopicConfigurationProvider 中支援處理類級別的 @RetryableTopic。

提供了一個新的公共 API,用於查詢 RetryTopicConfiguration。參見查詢 RetryTopicConfiguration

RetryTopicConfigurer 支援處理 MultiMethodKafkaListenerEndpoint。

RetryTopicConfigurer 支援處理和註冊 MultiMethodKafkaListenerEndpointMultiMethodKafkaListenerEndpoint 為屬性 defaultMethodmethods 提供 getter/setter 方法。修改嚴格針對 MethodKafkaListenerEndpoint 型別的 EndpointCustomizerEndpointHandlerMethod 添加了新的建構函式,用於為提供的 bean 構建例項。提供新的類 EndpointHandlerMultiMethod,用於處理重試端點的多方法。

基於使用者提供的函式跳轉到偏移量的新 API 方法

ConsumerCallback 提供了一個新的 API,用於基於使用者定義的函式跳轉到偏移量,該函式以 consumer 中的當前偏移量作為引數。更多詳細資訊請參見Seek API 文件

@PartitionOffset 支援 SeekPosition

@PartitionOffset 新增 seekPosition 屬性以支援 TopicPartitionOffset.SeekPosition。更多詳細資訊請參見手動分配

TopicPartitionOffset 中接受計算跳轉偏移量函式的新的建構函式

TopicPartitionOffset 有一個新的建構函式,它接受一個使用者提供的函式,用於計算跳轉到哪個偏移量。使用此建構函式時,框架會呼叫該函式,並將當前 consumer 偏移量位置作為輸入引數。更多詳細資訊請參見Seek API 文件

Spring Boot 應用名稱作為預設客戶端 ID 字首

對於定義了應用名稱的 Spring Boot 應用,該名稱現在被用作某些客戶端型別的自動生成的客戶端 ID 的預設字首。更多詳細資訊請參見預設客戶端 ID 字首

增強的 MessageListenerContainers 獲取

ListenerContainerRegistry 提供了兩個新的 API,用於動態查詢和過濾 MessageListenerContainer 例項。getListenerContainersMatching(Predicate<String> idMatcher) 用於按 ID 過濾,另一個是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) 用於按 ID 和容器屬性過濾。

透過提供更多跟蹤標籤增強可觀測性

KafkaTemplateObservation 提供更多跟蹤標籤(低基數)。KafkaListenerObservation 提供了一個新的 API,用於查詢高基數鍵名以及更多跟蹤標籤(高基數或低基數)。參見Micrometer 可觀測性

3.1 版本相對於 3.0 版本的新特性

本節介紹從 3.0 版本到 3.1 版本的變化。有關更早版本中的變化,請參見變更歷史

Kafka 客戶端版本

此版本需要 3.6.0 kafka-clients

EmbeddedKafkaBroker

現在提供了一個額外的實現,用於使用 Kraft 而不是 Zookeeper。更多資訊請參見嵌入式 Kafka Broker

JsonDeserializer

發生反序列化異常時,SerializationException 訊息不再包含形式為 Can’t deserialize data [[123, 34, 98, 97, 122, …​ 的資料;每資料位元組的數值陣列沒有用處,並且對於大量資料來說會非常冗長。與 ErrorHandlingDeserializer 一起使用時,傳送到錯誤處理器的 DeserializationException 包含 data 屬性,該屬性包含無法反序列化的原始資料。不與 ErrorHandlingDeserializer 一起使用時,KafkaConsumer 會不斷為同一條記錄發出異常,顯示主題/分割槽/偏移量以及 Jackson 丟擲的原因。

ContainerPostProcessor

可以透過在 @KafkaListener 註解上指定 ContainerPostProcessor 的 bean 名稱,對監聽器容器應用後處理。這發生在容器建立之後,以及在容器工廠上配置的任何 ContainerCustomizer 之後。更多資訊請參見容器工廠

ErrorHandlingDeserializer

現在可以將一個 Validator 新增到此反序列化器中;如果委託的 Deserializer 成功反序列化物件,但該物件驗證失敗,則會丟擲一個類似於發生反序列化異常的異常。這允許將原始原始資料傳遞給錯誤處理器。更多資訊請參見使用 ErrorHandlingDeserializer

可重試主題

當使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) 時,將字尾 -retry-5000 改為 -retry。如果想保留後綴 -retry-5000,請使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")。更多資訊請參見主題命名

監聽器容器變化

手動分配分割槽時,如果 consumer 的 group.idnull,則 AckMode 現在會自動強制設定為 MANUAL。更多資訊請參見手動分配所有分割槽

3.0 版本相對於 2.9 版本的新特性

Kafka 客戶端版本

此版本需要 3.3.1 kafka-clients

精確一次語義

EOSMode.V1 (即 ALPHA) 不再支援。

使用事務時,最低 broker 版本為 2.5。

更多資訊請參見精確一次語義KIP-447

可觀測性

現在支援使用 Micrometer 啟用計時器和跟蹤的可觀測性。更多資訊請參見可觀測性

原生映象

提供了建立原生映象的支援。更多資訊請參見原生映象

全域性單一嵌入式 Kafka

嵌入式 Kafka(EmbeddedKafkaBroker)現在可以作為整個測試計劃的單一全域性例項啟動。更多資訊請參見在多個測試類中使用相同的 Broker(s)

可重試主題變化

此功能不再被視為實驗性功能(就其 API 而言),此功能本身自 2.7 版本以來就已支援,但存在高於正常水平的破壞性 API 變更的可能性。

此版本更改了非阻塞重試基礎設施 bean 的引導方式,以避免某些應用中與應用初始化有關的時序問題。

現在可以為重試容器設定不同的 concurrency;預設情況下,concurrency 與主容器相同。

@RetryableTopic 現在可以用作自定義註解上的元註解,包括支援 @AliasFor 屬性。

更多資訊請參見配置

重試主題的預設複製因子現在是 -1(使用 broker 預設值)。如果您的 broker 版本早於 2.4,您現在需要顯式設定此屬性。

現在可以在同一個應用上下文中為同一個主題配置多個 @RetryableTopic 監聽器。之前,這是不可能的。更多資訊請參見多個監聽器監聽同一主題

RetryTopicConfigurationSupport 中存在破壞性 API 變更;具體來說,如果您覆蓋了 destinationTopicResolverkafkaConsumerBackoffManager 和/或 retryTopicConfigurer 的 bean 定義方法,這些方法現在需要一個 ObjectProvider<RetryTopicComponentFactory> 引數。

監聽器容器變化

與 consumer 認證和授權失敗相關的事件現在由容器釋出。更多資訊請參見應用事件

現在可以自定義 consumer 執行緒使用的執行緒名稱。更多資訊請參見容器執行緒命名

容器屬性 restartAfterAuthException 已新增。更多資訊請參見監聽器容器屬性

KafkaTemplate 變化

此類返回的 future 現在是 CompletableFuture s 而不是 ListenableFuture s。關於使用此版本進行遷移的幫助,請參見使用 KafkaTemplate

ReplyingKafkaTemplate 變化

此類返回的 future 現在是 CompletableFuture s 而不是 ListenableFuture s。關於使用此版本進行遷移的幫助,請參見使用 ReplyingKafkaTemplate使用 Message<?> 進行請求/回覆

@KafkaListener 變化

現在可以使用自定義關聯頭,該關聯頭將在任何回覆訊息中回顯。更多資訊請參見使用 ReplyingKafkaTemplate 末尾的說明。

現在可以手動提交批處理的一部分,在整個批處理完成處理之前。更多資訊請參見提交偏移量

KafkaHeaders 變化

在 2.9.x 中已棄用的 KafkaHeaders 中的四個常量現已移除。

  • 取代 MESSAGE_KEY,請使用 KEY

  • 取代 PARTITION_ID,請使用 PARTITION

類似地,RECEIVED_MESSAGE_KEYRECEIVED_KEY 取代,RECEIVED_PARTITION_IDRECEIVED_PARTITION 取代。

測試變化

3.0.7 版本引入了 MockConsumerFactoryMockProducerFactory。更多資訊請參見Mock Consumer 和 Producer

從 3.0.10 版本開始,嵌入式 Kafka broker 預設將 Spring Boot 屬性 spring.kafka.bootstrap-servers 設定為嵌入式 broker 的地址。

2.9 版本相對於 2.8 版本的新特性

Kafka 客戶端版本

此版本需要 3.2.0 kafka-clients

錯誤處理器變化

現在可以將 DefaultErrorHandler 配置為暫停容器一個 poll 並使用前一個 poll 的剩餘結果,而不是跳轉到剩餘記錄的偏移量。更多資訊請參見DefaultErrorHandler

DefaultErrorHandler 現在有一個 BackOffHandler 屬性。更多資訊請參見Back Off Handler

監聽器容器變化

interceptBeforeTx 現在適用於所有事務管理器(之前僅在使用 KafkaAwareTransactionManager 時應用)。參見[interceptBeforeTx]

提供了一個新的容器屬性 pauseImmediate,這允許容器在當前記錄處理後暫停 consumer,而不是在前一個 poll 的所有記錄處理後。參見[pauseImmediate]

與 consumer 認證和授權相關的事件

Header Mapper 變化

現在可以配置應該對映哪些入站頭。在 2.8.8 或更高版本中也可用。更多資訊請參見訊息頭

KafkaTemplate 變化

在 3.0 版本中,此類返回的 future 將是 CompletableFuture s 而不是 ListenableFuture s。關於使用此版本進行遷移的幫助,請參見使用 KafkaTemplate

ReplyingKafkaTemplate 變化

該模板現在提供了一個方法,用於等待回覆容器上的分配,以避免在回覆容器初始化之前傳送請求時出現的競態條件。在 2.8.8 或更高版本中也可用。參見使用 ReplyingKafkaTemplate

在 3.0 版本中,此類返回的 future 將是 CompletableFuture s 而不是 ListenableFuture s。關於使用此版本進行遷移的幫助,請參見使用 ReplyingKafkaTemplate使用 Message<?> 進行請求/回覆

2.8 版本相對於 2.7 版本的新特性

本節介紹從 2.7 版本到 2.8 版本的變化。有關更早版本中的變化,請參見變更歷史

Kafka 客戶端版本

此版本需要 3.0.0 kafka-clients

包變化

與型別對映相關的類和介面已從 …​support.converter 移動到 …​support.mapping

  • AbstractJavaTypeMapper

  • ClassMapper

  • DefaultJackson2JavaTypeMapper

  • Jackson2JavaTypeMapper

無序手動提交

監聽器容器現在可以配置為接受無序的手動偏移量提交(通常是非同步的)。容器將延遲提交,直到缺失的偏移量被確認。更多資訊請參見手動提交偏移量

@KafkaListener 變化

現在可以在方法本身上指定監聽器方法是否為批處理監聽器。這使得同一個容器工廠可以用於記錄和批處理監聽器。

更多資訊請參見[batch-listeners]

批處理監聽器現在可以處理轉換異常。

RecordFilterStrategy 與批處理監聽器一起使用時,現在可以在一次呼叫中過濾整個批處理。更多資訊請參見[batch-listeners]末尾的說明。

@KafkaListener 註解現在具有 filter 屬性,用於覆蓋容器工廠在此監聽器上的 RecordFilterStrategy

@KafkaListener 註解現在具有 info 屬性;這用於填充新的監聽器容器屬性 listenerInfo。然後將其用於在每條記錄中填充一個 KafkaHeaders.LISTENER_INFO 頭,可以在 RecordInterceptorRecordFilterStrategy 或監聽器本身中使用。更多資訊請參見監聽器資訊頭AbstractMessageListenerContainer 屬性

KafkaTemplate 變化

現在可以接收單條記錄,給定主題、分割槽和偏移量。更多資訊請參見使用 KafkaTemplate 接收

CommonErrorHandler 新增

舊版的 GenericErrorHandler 及其用於記錄和批處理監聽器的子介面層次結構已被新的單一介面 CommonErrorHandler 取代,新介面的實現對應於大多數舊版 GenericErrorHandler 的實現。更多資訊請參見 容器錯誤處理器將自定義舊版錯誤處理器實現遷移到 CommonErrorHandler

監聽器容器變更

interceptBeforeTx 容器屬性現在預設為 true

authorizationExceptionRetryInterval 屬性已重新命名為 authExceptionRetryInterval,現在除了之前應用於 AuthorizationException 外,也適用於 AuthenticationException。預設情況下,這兩種異常都被視為致命異常,容器將停止,除非設定了此屬性。

序列化器/反序列化器變更

現在提供了 DelegatingByTopicSerializerDelegatingByTopicDeserializer。更多資訊請參見 委派序列化器和反序列化器

DeadLetterPublishingRecover 變更

屬性 stripPreviousExceptionHeaders 現在預設為 true

現在有幾種技術可以自定義哪些頭部會被新增到輸出記錄中。

更多資訊請參見 管理死信記錄頭部

可重試主題變更

現在,您可以對可重試主題和不可重試主題使用相同的工廠。更多資訊請參見 指定 ListenerContainerFactory

現在有一個可管理的全域性致命異常列表,這些異常將使失敗的記錄直接進入 DLT。請參考 異常分類器 檢視如何管理它。

您現在可以結合使用阻塞和非阻塞重試。更多資訊請參見 結合使用阻塞和非阻塞重試

使用可重試主題功能時丟擲的 KafkaBackOffException 現在記錄在 DEBUG 級別。如果您需要將日誌級別改回 WARN 或設定為任何其他級別,請參見 更改 KafkaBackOffException 日誌級別

2.6 與 2.7 之間的變更

Kafka 客戶端版本

此版本需要 2.7.0 版本的 kafka-clients。自版本 2.7.1 起,它也與 2.8.0 客戶端相容;請參見 覆蓋 Spring Boot 依賴

使用主題實現非阻塞延遲重試

此版本中增加了這一重要的新功能。當嚴格排序不重要時,失敗的投遞可以傳送到另一個主題以便稍後消費。可以配置一系列此類重試主題,並增加延遲時間。更多資訊請參見 非阻塞重試

監聽器容器變更

onlyLogRecordMetadata 容器屬性現在預設為 true

現在提供了一個新的容器屬性 stopImmediate

更多資訊請參見 監聽器容器屬性

在投遞嘗試之間使用 BackOff 的錯誤處理器(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor)現在會在容器停止後立即退出退避間隔,而不是延遲停止。

繼承 FailedRecordProcessor 的錯誤處理器和回滾後處理器現在可以配置一個或多個 RetryListener,以接收有關重試和恢復進度的資訊。

RecordInterceptor 現在增加了在監聽器返回(正常或透過丟擲異常)後呼叫的方法。它還有一個子介面 ConsumerAwareRecordInterceptor。此外,現在還有一個用於批處理監聽器的 BatchInterceptor。更多資訊請參見 訊息監聽器容器

@KafkaListener 變更

您現在可以驗證 @KafkaHandler 方法(類級別監聽器)的載荷引數。更多資訊請參見 @KafkaListener @Payload 驗證

您現在可以在 MessagingMessageConverterBatchMessagingMessageConverter 上設定 rawRecordHeader 屬性,這使得原始 ConsumerRecord 會被新增到轉換後的 Message<?> 中。這對於例如您希望在監聽器錯誤處理器中使用 DeadLetterPublishingRecoverer 的場景非常有用。更多資訊請參見 監聽器錯誤處理器

您現在可以在應用程式初始化期間修改 @KafkaListener 註解。更多資訊請參見 @KafkaListener 屬性修改

DeadLetterPublishingRecover 變更

現在,如果鍵和值都反序列化失敗,原始值會發布到 DLT。以前,值會被填充,但鍵的 DeserializationException 會留在頭部中。如果您子類化了恢復器並重寫了 createProducerRecord 方法,這是一個破壞性的 API 變更。

此外,恢復器在釋出到目標分割槽之前,會驗證目標解析器選擇的分割槽確實存在。

更多資訊請參見 釋出死信記錄

ChainedKafkaTransactionManager 已棄用

更多資訊請參見 事務

ReplyingKafkaTemplate 變更

現在有一種機制可以在檢查回覆時,如果存在某些條件,則異常地終止 future。

已新增對傳送和接收 spring-messaging Message<?> 的支援。

更多資訊請參見 使用 ReplyingKafkaTemplate

Kafka Streams 變更

預設情況下,StreamsBuilderFactoryBean 現在配置為不清理本地狀態。更多資訊請參見 配置

KafkaAdmin 變更

已新增新的方法 createOrModifyTopicsdescribeTopics。已新增 KafkaAdmin.NewTopics 以方便在單個 bean 中配置多個主題。更多資訊請參見 [configuring-topics]

MessageConverter 變更

現在可以將 spring-messaging SmartMessageConverter 新增到 MessagingMessageConverter 中,從而允許根據 contentType 頭部進行內容協商。更多資訊請參見 Spring Messaging 訊息轉換

按順序啟動 @KafkaListener

更多資訊請參見 按順序啟動 @KafkaListener

ExponentialBackOffWithMaxRetries

提供了一種新的 BackOff 實現,使得配置最大重試次數更加方便。更多資訊請參見 ExponentialBackOffWithMaxRetries 實現

條件委派錯誤處理器

這些新的錯誤處理器可以配置為根據異常型別委派給不同的錯誤處理器。更多資訊請參見 委派錯誤處理器

2.5 與 2.6 之間的變更

Kafka 客戶端版本

此版本需要 2.6.0 版本的 kafka-clients

監聽器容器變更

預設的 EOSMode 現在是 BETA。更多資訊請參見 精確一次語義

各種錯誤處理器(繼承 FailedRecordProcessor)和 DefaultAfterRollbackProcessor 現在在恢復失敗時會重置 BackOff。此外,您現在可以根據失敗的記錄和/或異常來選擇使用的 BackOff

您現在可以在容器屬性中配置 adviceChain。更多資訊請參見 監聽器容器屬性

當容器配置為釋出 ListenerContainerIdleEvent 時,在釋出空閒事件後接收到記錄時,它現在會發布 ListenerContainerNoLongerIdleEvent。更多資訊請參見 應用事件檢測空閒和無響應的消費者

@KafkaListener 變更

使用手動分割槽分配時,您現在可以指定一個萬用字元來確定哪些分割槽應重置為初始偏移量。此外,如果監聽器實現了 ConsumerSeekAware,則在手動分配後會呼叫 onPartitionsAssigned() 方法。(也在版本 2.5.5 中新增)。更多資訊請參見 顯式分割槽分配

AbstractConsumerSeekAware 中添加了便捷方法,使得 seek 操作更容易。更多資訊請參見 [seek]

ErrorHandler 變更

FailedRecordProcessor 的子類(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessorRecoveringBatchErrorHandler)現在可以配置為,如果異常型別與此記錄之前發生的異常型別不同,則重置重試狀態。

生產者工廠變更

您現在可以為生產者設定最大存活時間,超過此時間後它們將被關閉並重新建立。更多資訊請參見 事務

您現在可以在建立 DefaultKafkaProducerFactory 後更新配置對映。這可能很有用,例如,如果您在憑證更改後必須更新 SSL key/trust store 位置。更多資訊請參見 使用 DefaultKafkaProducerFactory

2.4 與 2.5 之間的變更

本節介紹從 2.4 版到 2.5 版的變更。對於早期版本的變更,請參見 變更歷史

消費者/生產者工廠變更

預設的消費者和生產者工廠現在可以在建立或關閉消費者或生產者時呼叫回撥。提供了原生 Micrometer 指標的實現。更多資訊請參見 工廠監聽器

您現在可以在執行時更改 bootstrap server 屬性,從而實現故障轉移到另一個 Kafka 叢集。更多資訊請參見 連線到 Kafka

StreamsBuilderFactoryBean 變更

工廠 bean 現在可以在建立或銷燬 KafkaStreams 時呼叫回撥。提供了原生 Micrometer 指標的實現。更多資訊請參見 KafkaStreams Micrometer 支援

Kafka 客戶端版本

此版本需要 2.5.0 版本的 kafka-clients

類/包變更

SeekUtils 已從 o.s.k.support 包移動到 o.s.k.listener 包。

投遞嘗試頭部

現在有一個選項可以在使用某些錯誤處理器和回滾後處理器時新增一個用於跟蹤投遞嘗試次數的頭部。更多資訊請參見 投遞嘗試頭部

@KafkaListener 變更

@KafkaListener 返回型別為 Message<?> 時,如果需要,預設回覆頭部現在會自動填充。更多資訊請參見 回覆型別 Message<?>

當傳入記錄的鍵為 null 時,KafkaHeaders.RECEIVED_MESSAGE_KEY 不再填充 null 值;而是完全省略該頭部。

@KafkaListener 方法現在可以指定 ConsumerRecordMetadata 引數,而不是使用離散的頭部來獲取元資料,例如主題、分割槽等。更多資訊請參見 消費者記錄元資料

監聽器容器變更

assignmentCommitOption 容器屬性現在預設為 LATEST_ONLY_NO_TX。更多資訊請參見 監聽器容器屬性

使用事務時,subBatchPerPartition 容器屬性現在預設為 true。更多資訊請參見 事務

現在提供了一個新的 RecoveringBatchErrorHandler

現在支援靜態組成員資格。更多資訊請參見 訊息監聽器容器

配置了增量/協同再平衡時,如果偏移量提交失敗且異常為非致命的 RebalanceInProgressException,容器將嘗試在再平衡完成後重新提交仍分配給此例項的分割槽的偏移量。

現在,記錄監聽器的預設錯誤處理器是 SeekToCurrentErrorHandler,批處理監聽器是 RecoveringBatchErrorHandler。更多資訊請參見 容器錯誤處理器

您現在可以控制標準錯誤處理器有意丟擲的異常的日誌級別。更多資訊請參見 容器錯誤處理器

已新增 getAssignmentsByClientId() 方法,使得更容易確定併發容器中的哪些消費者被分配了哪些分割槽。更多資訊請參見 監聽器容器屬性

您現在可以抑制在錯誤、除錯日誌等中記錄完整的 ConsumerRecord。請參見 監聽器容器屬性 中的 onlyLogRecordMetadata

KafkaTemplate 變更

KafkaTemplate 現在可以維護 micrometer 計時器。更多資訊請參見 監控

KafkaTemplate 現在可以使用 ProducerConfig 屬性進行配置,以覆蓋生產者工廠中的配置。更多資訊請參見 使用 KafkaTemplate

現在提供了 RoutingKafkaTemplate。更多資訊請參見 使用 RoutingKafkaTemplate

您現在可以使用 KafkaSendCallback 代替 ListenerFutureCallback 來獲取更窄範圍的異常,從而更容易提取失敗的 ProducerRecord。更多資訊請參見 使用 KafkaTemplate

Kafka 字串序列化器/反序列化器

現在提供了新的 ToStringSerializer/StringDeserializer 以及關聯的 SerDe。更多資訊請參見 字串序列化

JsonDeserializer

JsonDeserializer 現在在確定反序列化型別方面更靈活。更多資訊請參見 使用方法確定型別

委派序列化器/反序列化器

當出站記錄沒有頭部時,DelegatingSerializer 現在可以處理“標準”型別。更多資訊請參見 委派序列化器和反序列化器

測試變更

KafkaTestUtils.consumerProps() 輔助記錄現在預設將 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 設定為 earliest。更多資訊請參見 JUnit

2.3 與 2.4 之間的變更

Kafka 客戶端版本

此版本需要 2.4.0 或更高版本的 kafka-clients,並支援新的增量再平衡功能。

ConsumerAwareRebalanceListener

ConsumerRebalanceListener 類似,此介面現在增加了一個方法 onPartitionsLost。更多資訊請參考 Apache Kafka 文件。

ConsumerRebalanceListener 不同,預設實現**不**呼叫 onPartitionsRevoked。相反,監聽器容器在呼叫 onPartitionsLost 之後會呼叫該方法;因此,在實現 ConsumerAwareRebalanceListener 時,您不應執行相同的操作。

更多資訊請參見 再平衡監聽器 末尾的 IMPORTANT 注意事項。

GenericErrorHandler

isAckAfterHandle() 的預設實現現在預設為 true。

KafkaTemplate

KafkaTemplate 現在支援非事務性發布與事務性發布並行。更多資訊請參見 KafkaTemplate 事務性與非事務性發布

AggregatingReplyingKafkaTemplate

releaseStrategy 現在是一個 BiConsumer。它現在在超時後(以及記錄到達時)被呼叫;在超時後呼叫時,第二個引數為 true

更多資訊請參見 聚合多個回覆

監聽器容器

ContainerProperties 提供了一個 authorizationExceptionRetryInterval 選項,允許監聽器容器在 KafkaConsumer 丟擲任何 AuthorizationException 後重試。請參見其 JavaDocs 和 使用 KafkaMessageListenerContainer 瞭解更多資訊。

@KafkaListener

@KafkaListener 註解增加了一個新屬性 splitIterables;預設為 true。當回覆監聽器返回 Iterable 時,此屬性控制返回結果是作為單個記錄傳送,還是為每個元素髮送一個記錄。更多資訊請參見 使用 @SendTo 轉發監聽器結果

批處理監聽器現在可以使用 BatchToRecordAdapter 進行配置;例如,這允許在事務中處理批處理,而監聽器每次獲取一條記錄。使用預設實現時,可以使用 ConsumerRecordRecoverer 來處理批處理中的錯誤,而不會停止整個批處理的程序——這在使用事務時可能很有用。更多資訊請參見 批處理監聽器事務

Kafka Streams

StreamsBuilderFactoryBean 接受一個新的屬性 KafkaStreamsInfrastructureCustomizer。這允許在流建立之前配置 builder 和/或 topology。更多資訊請參見 Spring 管理

2.2 與 2.3 之間的變更

本節介紹從 2.2 版到 2.3 版的變更。

技巧、竅門和示例

已新增新的章節 技巧、竅門和示例。請提交 GitHub issue 和/或 pull request,以便在該章節中新增更多條目。

Kafka 客戶端版本

此版本需要 2.3.0 或更高版本的 kafka-clients

類/包變更

TopicPartitionInitialOffset 已棄用,推薦使用 TopicPartitionOffset

配置變更

從 2.3.4 版本開始,missingTopicsFatal 容器屬性預設為 false。當此屬性為 true 時,如果 broker 關閉,應用程式將無法啟動;許多使用者受到了此變更的影響;考慮到 Kafka 是一個高可用平臺,我們並未預期在沒有活動 broker 的情況下啟動應用程式是一個常見的用例。

生產者和消費者工廠變更

DefaultKafkaProducerFactory 現在可以配置為每個執行緒建立一個生產者。您還可以在建構函式中提供 Supplier<Serializer> 例項,作為配置類(需要無引數建構函式)或使用 Serializer 例項(然後在所有生產者之間共享)的替代方案。更多資訊請參見 使用 DefaultKafkaProducerFactory

DefaultKafkaConsumerFactory 中也提供了使用 Supplier<Deserializer> 例項的相同選項。更多資訊請參見 使用 KafkaMessageListenerContainer

監聽器容器變更

以前,當使用監聽器介面卡(例如 @KafkaListener)呼叫監聽器時,錯誤處理器會接收到 ListenerExecutionFailedException(其中實際的監聽器異常作為 cause)。由原生 GenericMessageListener 丟擲的異常會原樣傳遞給錯誤處理器。現在,引數總是 ListenerExecutionFailedException(其中實際的監聽器異常作為 cause),它提供了訪問容器的 group.id 屬性的途徑。

由於監聽器容器有自己的提交偏移量的機制,它偏好將 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 設定為 false。現在,除非在消費者工廠或容器的消費者屬性覆蓋中明確設定,否則它會自動將其設定為 false。

ackOnError 屬性現在預設為 false

現在可以在監聽器方法中獲取消費者的 group.id 屬性。更多資訊請參見 獲取消費者 group.id

容器增加了一個新的屬性 recordInterceptor,允許在呼叫監聽器之前檢查或修改記錄。如果需要呼叫多個攔截器,還提供了 CompositeRecordInterceptor。更多資訊請參見 訊息監聽器容器

ConsumerSeekAware 增加了新方法,允許您相對於開頭、結尾或當前位置執行 seek 操作,以及 seek 到大於或等於時間戳的第一個偏移量。更多資訊請參見 [seek]

現在提供了一個便捷類 AbstractConsumerSeekAware,以簡化 seek 操作。更多資訊請參見 [seek]

ContainerProperties 提供了一個 idleBetweenPolls 選項,允許監聽器容器的主迴圈在 KafkaConsumer.poll() 呼叫之間休眠。請參見其 JavaDocs 和 使用 KafkaMessageListenerContainer 瞭解更多資訊。

使用 AckMode.MANUAL(或 MANUAL_IMMEDIATE)時,您現在可以透過在 Acknowledgment 上呼叫 nack 來觸發重新投遞。更多資訊請參見 提交偏移量

現在可以使用 Micrometer Timer 監控監聽器效能。更多資訊請參見 監控

容器現在釋出更多與啟動相關的消費者生命週期事件。更多資訊請參見 應用事件

事務性批處理監聽器現在支援殭屍防護(zombie fencing)。更多資訊請參見 事務

監聽器容器工廠現在可以使用 ContainerCustomizer 進行配置,以便在每個容器建立和配置後進一步進行配置。更多資訊請參見 容器工廠

ErrorHandler 變更

SeekToCurrentErrorHandler 現在將某些異常視為致命異常,並停用這些異常的重試,在首次失敗時即呼叫恢復器。

SeekToCurrentErrorHandlerSeekToCurrentBatchErrorHandler 現在可以配置為在投遞嘗試之間應用 BackOff(執行緒休眠)。

從 2.3.2 版本開始,在錯誤處理器恢復失敗記錄後返回時,恢復記錄的偏移量將被提交。

DeadLetterPublishingRecoverer 在與 ErrorHandlingDeserializer 結合使用時,現在將傳送到死信主題的訊息的載荷設定為無法反序列化的原始值。以前,它為 null,使用者程式碼需要從訊息頭部提取 DeserializationException。更多資訊請參見 釋出死信記錄

TopicBuilder

提供了一個新類 TopicBuilder,用於更方便地建立用於自動主題配置的 NewTopic @Bean。更多資訊請參見 [configuring-topics]

Kafka Streams 變更

您現在可以對由 @EnableKafkaStreams 建立的 StreamsBuilderFactoryBean 執行額外配置。更多資訊請參見 Streams 配置

現在提供了一個 RecoveringDeserializationExceptionHandler,它允許恢復帶有反序列化錯誤的記錄。它可以與 DeadLetterPublishingRecoverer 結合使用,將這些記錄傳送到死信主題。更多資訊請參見 從反序列化異常中恢復

提供了 HeaderEnricher 轉換器,它使用 SpEL 生成頭部值。更多資訊請參見 頭部豐富器

提供了 MessagingTransformer。這使得 Kafka Streams topology 可以與 spring-messaging 元件(例如 Spring Integration 流)進行互動。更多資訊請參見 MessagingProcessor[從 KStream 呼叫 Spring Integration 流]

JSON 元件變更

現在,所有支援 JSON 的元件預設都配置了由 JacksonUtils.enhancedObjectMapper() 產生的 Jackson ObjectMapperJsonDeserializer 現在提供了基於 TypeReference 的建構函式,以更好地處理目標泛型容器型別。此外,還引入了 JacksonMimeTypeModule,用於將 org.springframework.util.MimeType 序列化為純字串。更多資訊請參見其 JavaDocs 和 序列化、反序列化和訊息轉換

提供了 ByteArrayJsonMessageConverter 以及所有 JSON 轉換器的新超類 JsonMessageConverter。此外,現在還提供了 StringOrBytesSerializer;它可以序列化 ProducerRecord 中的 byte[]BytesString 值。更多資訊請參見 Spring Messaging 訊息轉換

JsonSerializerJsonDeserializerJsonSerde 現在具有流暢的 API,使得程式設計配置更加簡單。請參見 javadoc、序列化、反序列化和訊息轉換 以及 Streams JSON 序列化和反序列化 瞭解更多資訊。

ReplyingKafkaTemplate

當回覆超時時,future 將以 KafkaReplyTimeoutException 異常完成,而不是 KafkaException

此外,現在提供了一個過載的 sendAndReceive 方法,允許按訊息指定回覆超時時間。

AggregatingReplyingKafkaTemplate

透過聚合來自多個接收者的回覆來擴充套件 ReplyingKafkaTemplate。更多資訊請參見 聚合多個回覆

事務變更

您現在可以在 KafkaTemplateKafkaTransactionManager 上覆蓋生產者工廠的 transactionIdPrefix。更多資訊請參見 transactionIdPrefix

新的委派序列化器/反序列化器

框架現在提供了一個委派 SerializerDeserializer,它利用頭部來支援生成和消費具有多種鍵/值型別的記錄。更多資訊請參見 委派序列化器和反序列化器

新的重試反序列化器

框架現在提供了一個委派的 RetryingDeserializer,用於在發生瞬時錯誤(例如網路問題)時重試序列化。更多資訊請參見 重試反序列化器

2.1 與 2.2 之間的變更

Kafka 客戶端版本

此版本需要 2.0.0 或更高版本的 kafka-clients

類和包變更

ContainerProperties 類已從 org.springframework.kafka.listener.config 包移動到 org.springframework.kafka.listener 包。

AckMode 列舉已從 AbstractMessageListenerContainer 移動到 ContainerProperties

setBatchErrorHandler()setErrorHandler() 方法已從 ContainerProperties 移動到 AbstractMessageListenerContainerAbstractKafkaListenerContainerFactory

回滾後處理

提供了一個新的 AfterRollbackProcessor 策略。更多資訊請參見 回滾後處理器

ConcurrentKafkaListenerContainerFactory 變更

您現在可以使用 ConcurrentKafkaListenerContainerFactory 建立和配置任何 ConcurrentMessageListenerContainer,而不僅僅是用於 @KafkaListener 註解的容器。更多資訊請參見 容器工廠

監聽器容器變更

已新增新的容器屬性(missingTopicsFatal)。更多資訊請參見 使用 KafkaMessageListenerContainer

消費者停止時,現在會發出 ConsumerStoppedEvent。更多資訊請參見 執行緒安全

批處理監聽器可以選擇接收完整的 ConsumerRecords<?, ?> 物件,而不是 List<ConsumerRecord<?, ?>。更多資訊請參見 [batch-listeners]

DefaultAfterRollbackProcessorSeekToCurrentErrorHandler 現在可以恢復(跳過)持續失敗的記錄,並且預設在失敗 10 次後執行此操作。它們可以配置為將失敗的記錄釋出到死信主題。

從 2.2.4 版本開始,在選擇死信主題名稱時可以使用消費者的 group ID。

已新增 ConsumerStoppingEvent。更多資訊請參見 應用事件

當容器配置為 AckMode.MANUAL_IMMEDIATE 時(自 2.2.4 版起),SeekToCurrentErrorHandler 現在可以配置為提交已恢復記錄的偏移量。

@KafkaListener 變更

您現在可以透過在註解上設定屬性來覆蓋監聽器容器工廠的 concurrencyautoStartup 屬性。您現在可以新增配置來決定將哪些頭資訊(如果有)複製到回覆訊息中。更多資訊請參閱 @KafkaListener 註解

您現在可以將 @KafkaListener 作為元註解用於您自己的註解。更多資訊請參閱 @KafkaListener 作為元註解

現在更容易為 @Payload 驗證配置一個 Validator。更多資訊請參閱 @KafkaListener @Payload 驗證

您現在可以直接在註解上指定 Kafka 消費者屬性;這些屬性將覆蓋消費者工廠中定義的同名屬性(自版本 2.2.4 起)。更多資訊請參閱 註解屬性

頭資訊對映變更

型別為 MimeTypeMediaType 的頭資訊現在在 RecordHeader 值中對映為簡單的字串。以前,它們被對映為 JSON,並且只有 MimeType 被解碼。MediaType 無法解碼。現在為了互操作性,它們是簡單的字串。

此外,DefaultKafkaHeaderMapper 有一個新的 addToStringClasses 方法,允許指定應使用 toString() 而非 JSON 對映的型別。更多資訊請參閱 訊息頭資訊

嵌入式 Kafka 變更

KafkaEmbedded 類及其 KafkaRule 介面已被棄用,轉而使用 EmbeddedKafkaBroker 及其 JUnit 4 包裝器 EmbeddedKafkaRule@EmbeddedKafka 註解現在填充一個 EmbeddedKafkaBroker bean,而不是已棄用的 KafkaEmbedded。此變更允許在 JUnit 5 測試中使用 @EmbeddedKafka@EmbeddedKafka 註解現在具有 ports 屬性,用於指定填充 EmbeddedKafkaBroker 的埠。更多資訊請參閱 測試應用程式

JsonSerializer/Deserializer 增強

您現在可以使用生產者和消費者屬性提供型別對映資訊。

反序列化器上提供了新的建構函式,允許使用提供的目標型別覆蓋型別頭資訊。

JsonDeserializer 現在預設刪除任何型別資訊頭資訊。

您現在可以使用 Kafka 屬性將 JsonDeserializer 配置為忽略型別資訊頭資訊(自版本 2.2.3 起)。

更多資訊請參閱 序列化、反序列化和訊息轉換

Kafka Streams 變更

streams 配置 bean 現在必須是一個 KafkaStreamsConfiguration 物件,而不是一個 StreamsConfig 物件。

StreamsBuilderFactoryBean 已從包 …​core 移至 …​config

引入了 KafkaStreamBrancher,以便在基於 KStream 例項構建條件分支時提供更好的使用者體驗。

更多資訊請參閱 Apache Kafka Streams 支援配置

事務 ID

當事務由監聽器容器啟動時,transactional.id 現在是 transactionIdPrefix 附加 <group.id>.<topic>.<partition>。此變更允許正確地隔離殭屍程序,如此處所述

2.0 和 2.1 之間的變更

Kafka 客戶端版本

此版本要求 1.0.0 或更高版本的 kafka-clients

版本 2.2 原生支援 1.1.x 客戶端。

JSON 改進

StringJsonMessageConverterJsonSerializer 現在在 Headers 中新增型別資訊,允許轉換器和 JsonDeserializer 在接收時基於訊息本身而不是固定的配置型別建立特定型別。更多資訊請參閱 序列化、反序列化和訊息轉換

停止容器的錯誤處理器

現在為記錄和批次監聽器提供了容器錯誤處理器,它們將監聽器丟擲的任何異常視為致命錯誤,並停止容器。更多資訊請參閱 處理異常

暫停和恢復容器

監聽器容器現在具有 pause()resume() 方法(自版本 2.1.3 起)。更多資訊請參閱 暫停和恢復監聽器容器

有狀態重試

自版本 2.1.3 起,您可以配置有狀態重試。更多資訊請參閱 有狀態重試

客戶端 ID

自版本 2.1.1 起,您現在可以在 @KafkaListener 上設定 client.id 字首。以前,要自定義客戶端 ID,您需要為每個監聽器配置單獨的消費者工廠(和容器工廠)。當您使用併發時,字首後會新增 -n 以提供唯一的客戶端 ID。

記錄偏移量提交日誌

預設情況下,使用 DEBUG 日誌級別記錄主題偏移量提交日誌。自版本 2.1.2 起,ContainerProperties 中的新屬性 commitLogLevel 允許您指定這些訊息的日誌級別。更多資訊請參閱 使用 KafkaMessageListenerContainer

預設 @KafkaHandler

自版本 2.1.3 起,您可以將類級別 @KafkaListener 上的一個 @KafkaHandler 註解指定為預設註解。更多資訊請參閱 類級別 @KafkaListener

ReplyingKafkaTemplate

自版本 2.1.3 起,提供了一個 KafkaTemplate 的子類來支援請求/回覆語義。更多資訊請參閱 使用 ReplyingKafkaTemplate

ChainedKafkaTransactionManager

版本 2.1.3 引入了 ChainedKafkaTransactionManager。(現已棄用)。

從 2.0 遷移指南

請參閱 2.0 到 2.1 遷移 指南。

1.3 和 2.0 之間的變更

Spring Framework 和 Java 版本

Spring for Apache Kafka 專案現在需要 Spring Framework 5.0 和 Java 8。

@KafkaListener 變更

您現在可以使用 @SendTo 註解 @KafkaListener 方法(以及類和 @KafkaHandler 方法)。如果方法返回結果,則會轉發到指定的主題。更多資訊請參閱 使用 @SendTo 轉發監聽器結果

訊息監聽器

訊息監聽器現在可以感知 Consumer 物件。更多資訊請參閱 [message-listeners]

使用 ConsumerAwareRebalanceListener

重平衡監聽器現在可以在重平衡通知期間訪問 Consumer 物件。更多資訊請參閱 重平衡監聽器

1.2 和 1.3 之間的變更

事務支援

0.11.0.0 客戶端庫增加了對事務的支援。已新增 KafkaTransactionManager 及其他事務支援。更多資訊請參閱 事務

頭資訊支援

0.11.0.0 客戶端庫增加了對訊息頭資訊的支援。現在可以將它們對映到 spring-messaging MessageHeaders 並從中對映。更多資訊請參閱 訊息頭資訊

建立主題

0.11.0.0 客戶端庫提供了一個 AdminClient,您可以使用它來建立主題。KafkaAdmin 使用此客戶端自動新增定義為 @Bean 例項的主題。

Kafka 時間戳支援

KafkaTemplate 現在支援新增帶有時間戳的記錄的 API。已引入新的 KafkaHeaders 以支援 timestamp。此外,還添加了新的 KafkaConditions.timestamp()KafkaMatchers.hasTimestamp() 測試工具。更多詳情請參閱 使用 KafkaTemplate@KafkaListener 註解測試應用程式

@KafkaListener 變更

您現在可以配置 KafkaListenerErrorHandler 來處理異常。更多資訊請參閱 處理異常

預設情況下,@KafkaListenerid 屬性現在用作 group.id 屬性,覆蓋消費者工廠中配置的屬性(如果存在)。此外,您可以在註解上顯式配置 groupId。以前,您需要為每個監聽器使用單獨的容器工廠(和消費者工廠)來使用不同的 group.id 值。要恢復使用工廠配置的 group.id 的先前行為,請將註解上的 idIsGroup 屬性設定為 false

@EmbeddedKafka 註解

為了方便起見,提供了類級別的測試註解 @EmbeddedKafka,用於將 KafkaEmbedded 註冊為 bean。更多資訊請參閱 測試應用程式

Kerberos 配置

現在提供了配置 Kerberos 的支援。更多資訊請參閱 JAAS 和 Kerberos

1.1 和 1.2 之間的變更

此版本使用 0.10.2.x 客戶端。

1.0 和 1.1 之間的變更

Kafka 客戶端

此版本使用 Apache Kafka 0.10.x.x 客戶端。

批次監聽器

可以將監聽器配置為接收 consumer.poll() 操作返回的整個訊息批次,而不是逐個接收。

空載荷

當使用日誌壓縮時,空載荷用於“刪除”鍵。

初始偏移量

當顯式分配分割槽時,您現在可以配置相對於消費者組當前位置的初始偏移量,而不是相對於絕對位置或當前結束位置。

Seek

您現在可以尋找每個主題或分割槽的偏移位置。當使用組管理且 Kafka 分配分割槽時,您可以使用此功能在初始化期間設定初始位置。當檢測到空閒容器或在應用程式執行的任何任意點時,您也可以進行尋求。更多資訊請參閱 [seek]