變更歷史
3.2 版本相對於 3.1 版本的新特性
本節介紹從 3.1 版本到 3.2 版本的變化。有關更早版本中的變化,請參見變更歷史。
Kafka 客戶端版本
此版本需要 3.7.0 kafka-clients
。3.7.0 版本的 Kafka 客戶端引入了新的消費組協議。有關更多詳細資訊及其限制,請參見 KIP-848。新的消費組協議是早期訪問版本,不建議在生產環境中使用。此版本僅建議用於測試目的。因此,Spring for Apache Kafka 對此新的消費組協議的支援僅限於 kafka-client
本身提供的測試級別支援。預設情況下,Spring for Apache Kafka 使用經典消費組協議,在測試新的消費組協議時,需要透過 consumer 上的 group.protocol
屬性啟用。
測試支援的變化
EmbeddedKafka
中的 kraft
模式預設停用,需要使用 kraft
模式的使用者必須啟用它。這是由於在使用 EmbeddedKafka
的 kraft
模式時觀察到某些不穩定,尤其是在測試新的消費組協議時。新的消費組協議僅在 kraft
模式下支援,因此在測試新協議時,需要在真實的 Kafka 叢集上進行,而不是基於 KafkaClusterTestKit
(EmbeddedKafka
基於此)的叢集。此外,在使用 EmbeddedKafka
的 kraft
模式執行多個 KafkaListener
方法時,還觀察到其他一些競態條件。在這些問題解決之前,EmbeddedKafka
中 kraft
的預設值將保持為 false
。
Kafka Streams 互動式查詢支援
一個用於訪問 Kafka Streams 互動式查詢中所用可查詢儲存的新 API KafkaStreamsInteractiveQuerySupport
已引入。更多詳細資訊請參見Kafka Streams 互動式支援。
TransactionIdSuffixStrategy
引入了一個新的介面 TransactionIdSuffixStrategy
,用於管理 transactional.id
字尾。當設定 maxCache
大於零時,預設實現 DefaultTransactionIdSuffixStrategy
可以在特定範圍內重用 transactional.id
,否則將透過遞增計數器動態生成字尾。更多資訊請參見固定事務 ID 字尾。
非同步 @KafkaListener 返回
@KafkaListener
(和 @KafkaHandler
)方法現在可以返回非同步返回型別,包括 CompletableFuture<?>
、Mono<?>
和 Kotlin 的 suspend
函式。更多資訊請參見非同步返回。
基於丟擲異常將訊息路由到自定義 DLT
現在可以基於在訊息處理期間丟擲的異常型別,將訊息重定向到自定義 DLT。重定向規則可以透過 RetryableTopic.exceptionBasedDltRouting
或 RetryTopicConfigurationBuilder.dltRoutingRules
設定。自定義 DLT 以及其他重試和死信主題都會自動建立。更多資訊請參見基於丟擲異常將訊息路由到自定義 DLT。
ContainerProperties 的 transactionManager 屬性已棄用
棄用 ContainerProperties
中的 transactionManager
屬性,轉而使用 KafkaAwareTransactionManager
,相較於通用的 PlatformTransactionManager
,這是一個更窄的型別。參見ContainerProperties 和 事務同步。
回滾後處理
提供了一個新的 AfterRollbackProcessor
API processBatch
。更多資訊請參見回滾後處理器。
改變 @RetryableTopic SameIntervalTopicReuseStrategy 預設值
將 @RetryableTopic
的 SameIntervalTopicReuseStrategy
屬性預設值改為 SINGLE_TOPIC
。參見最大間隔指數延遲的單一主題。
非阻塞重試支援類級別的 @KafkaListener
非阻塞重試支援類級別的 @KafkaListener。參見非阻塞重試。
RetryTopicConfigurationProvider 中支援處理類級別的 @RetryableTopic。
提供了一個新的公共 API,用於查詢 RetryTopicConfiguration
。參見查詢 RetryTopicConfiguration
RetryTopicConfigurer 支援處理 MultiMethodKafkaListenerEndpoint。
RetryTopicConfigurer
支援處理和註冊 MultiMethodKafkaListenerEndpoint
。MultiMethodKafkaListenerEndpoint
為屬性 defaultMethod
和 methods
提供 getter/setter
方法。修改嚴格針對 MethodKafkaListenerEndpoint
型別的 EndpointCustomizer
。EndpointHandlerMethod
添加了新的建構函式,用於為提供的 bean 構建例項。提供新的類 EndpointHandlerMultiMethod
,用於處理重試端點的多方法。
基於使用者提供的函式跳轉到偏移量的新 API 方法
ConsumerCallback
提供了一個新的 API,用於基於使用者定義的函式跳轉到偏移量,該函式以 consumer 中的當前偏移量作為引數。更多詳細資訊請參見Seek API 文件。
@PartitionOffset 支援 SeekPosition
向 @PartitionOffset
新增 seekPosition
屬性以支援 TopicPartitionOffset.SeekPosition
。更多詳細資訊請參見手動分配。
TopicPartitionOffset 中接受計算跳轉偏移量函式的新的建構函式
TopicPartitionOffset
有一個新的建構函式,它接受一個使用者提供的函式,用於計算跳轉到哪個偏移量。使用此建構函式時,框架會呼叫該函式,並將當前 consumer 偏移量位置作為輸入引數。更多詳細資訊請參見Seek API 文件。
Spring Boot 應用名稱作為預設客戶端 ID 字首
對於定義了應用名稱的 Spring Boot 應用,該名稱現在被用作某些客戶端型別的自動生成的客戶端 ID 的預設字首。更多詳細資訊請參見預設客戶端 ID 字首。
增強的 MessageListenerContainers 獲取
ListenerContainerRegistry
提供了兩個新的 API,用於動態查詢和過濾 MessageListenerContainer
例項。getListenerContainersMatching(Predicate<String> idMatcher)
用於按 ID 過濾,另一個是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)
用於按 ID 和容器屬性過濾。
更多資訊請參見@KafkaListener
生命週期管理 API 文件。
透過提供更多跟蹤標籤增強可觀測性
KafkaTemplateObservation
提供更多跟蹤標籤(低基數)。KafkaListenerObservation
提供了一個新的 API,用於查詢高基數鍵名以及更多跟蹤標籤(高基數或低基數)。參見Micrometer 可觀測性
3.1 版本相對於 3.0 版本的新特性
本節介紹從 3.0 版本到 3.1 版本的變化。有關更早版本中的變化,請參見變更歷史。
EmbeddedKafkaBroker
現在提供了一個額外的實現,用於使用 Kraft
而不是 Zookeeper。更多資訊請參見嵌入式 Kafka Broker。
JsonDeserializer
發生反序列化異常時,SerializationException
訊息不再包含形式為 Can’t deserialize data [[123, 34, 98, 97, 122, …
的資料;每資料位元組的數值陣列沒有用處,並且對於大量資料來說會非常冗長。與 ErrorHandlingDeserializer
一起使用時,傳送到錯誤處理器的 DeserializationException
包含 data
屬性,該屬性包含無法反序列化的原始資料。不與 ErrorHandlingDeserializer
一起使用時,KafkaConsumer
會不斷為同一條記錄發出異常,顯示主題/分割槽/偏移量以及 Jackson 丟擲的原因。
ContainerPostProcessor
可以透過在 @KafkaListener
註解上指定 ContainerPostProcessor
的 bean 名稱,對監聽器容器應用後處理。這發生在容器建立之後,以及在容器工廠上配置的任何 ContainerCustomizer
之後。更多資訊請參見容器工廠。
ErrorHandlingDeserializer
現在可以將一個 Validator
新增到此反序列化器中;如果委託的 Deserializer
成功反序列化物件,但該物件驗證失敗,則會丟擲一個類似於發生反序列化異常的異常。這允許將原始原始資料傳遞給錯誤處理器。更多資訊請參見使用 ErrorHandlingDeserializer
。
可重試主題
當使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
時,將字尾 -retry-5000
改為 -retry
。如果想保留後綴 -retry-5000
,請使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")
。更多資訊請參見主題命名。
監聽器容器變化
手動分配分割槽時,如果 consumer 的 group.id
為 null
,則 AckMode
現在會自動強制設定為 MANUAL
。更多資訊請參見手動分配所有分割槽。
3.0 版本相對於 2.9 版本的新特性
可觀測性
現在支援使用 Micrometer 啟用計時器和跟蹤的可觀測性。更多資訊請參見可觀測性。
原生映象
提供了建立原生映象的支援。更多資訊請參見原生映象。
全域性單一嵌入式 Kafka
嵌入式 Kafka(EmbeddedKafkaBroker
)現在可以作為整個測試計劃的單一全域性例項啟動。更多資訊請參見在多個測試類中使用相同的 Broker(s)。
可重試主題變化
此功能不再被視為實驗性功能(就其 API 而言),此功能本身自 2.7 版本以來就已支援,但存在高於正常水平的破壞性 API 變更的可能性。
此版本更改了非阻塞重試基礎設施 bean 的引導方式,以避免某些應用中與應用初始化有關的時序問題。
現在可以為重試容器設定不同的 concurrency
;預設情況下,concurrency
與主容器相同。
@RetryableTopic
現在可以用作自定義註解上的元註解,包括支援 @AliasFor
屬性。
更多資訊請參見配置。
重試主題的預設複製因子現在是 -1
(使用 broker 預設值)。如果您的 broker 版本早於 2.4,您現在需要顯式設定此屬性。
現在可以在同一個應用上下文中為同一個主題配置多個 @RetryableTopic
監聽器。之前,這是不可能的。更多資訊請參見多個監聽器監聽同一主題。
RetryTopicConfigurationSupport
中存在破壞性 API 變更;具體來說,如果您覆蓋了 destinationTopicResolver
、kafkaConsumerBackoffManager
和/或 retryTopicConfigurer
的 bean 定義方法,這些方法現在需要一個 ObjectProvider<RetryTopicComponentFactory>
引數。
監聽器容器變化
與 consumer 認證和授權失敗相關的事件現在由容器釋出。更多資訊請參見應用事件。
現在可以自定義 consumer 執行緒使用的執行緒名稱。更多資訊請參見容器執行緒命名。
容器屬性 restartAfterAuthException
已新增。更多資訊請參見監聽器容器屬性。
KafkaTemplate
變化
此類返回的 future 現在是 CompletableFuture
s 而不是 ListenableFuture
s。關於使用此版本進行遷移的幫助,請參見使用 KafkaTemplate
。
ReplyingKafkaTemplate
變化
此類返回的 future 現在是 CompletableFuture
s 而不是 ListenableFuture
s。關於使用此版本進行遷移的幫助,請參見使用 ReplyingKafkaTemplate
和 使用 Message<?>
進行請求/回覆。
@KafkaListener
變化
現在可以使用自定義關聯頭,該關聯頭將在任何回覆訊息中回顯。更多資訊請參見使用 ReplyingKafkaTemplate
末尾的說明。
現在可以手動提交批處理的一部分,在整個批處理完成處理之前。更多資訊請參見提交偏移量。
KafkaHeaders
變化
在 2.9.x 中已棄用的 KafkaHeaders
中的四個常量現已移除。
-
取代
MESSAGE_KEY
,請使用KEY
。 -
取代
PARTITION_ID
,請使用PARTITION
類似地,RECEIVED_MESSAGE_KEY
被 RECEIVED_KEY
取代,RECEIVED_PARTITION_ID
被 RECEIVED_PARTITION
取代。
測試變化
3.0.7 版本引入了 MockConsumerFactory
和 MockProducerFactory
。更多資訊請參見Mock Consumer 和 Producer。
從 3.0.10 版本開始,嵌入式 Kafka broker 預設將 Spring Boot 屬性 spring.kafka.bootstrap-servers
設定為嵌入式 broker 的地址。
2.9 版本相對於 2.8 版本的新特性
錯誤處理器變化
現在可以將 DefaultErrorHandler
配置為暫停容器一個 poll 並使用前一個 poll 的剩餘結果,而不是跳轉到剩餘記錄的偏移量。更多資訊請參見DefaultErrorHandler。
DefaultErrorHandler
現在有一個 BackOffHandler
屬性。更多資訊請參見Back Off Handler。
監聽器容器變化
interceptBeforeTx
現在適用於所有事務管理器(之前僅在使用 KafkaAwareTransactionManager
時應用)。參見[interceptBeforeTx]。
提供了一個新的容器屬性 pauseImmediate
,這允許容器在當前記錄處理後暫停 consumer,而不是在前一個 poll 的所有記錄處理後。參見[pauseImmediate]。
與 consumer 認證和授權相關的事件
Header Mapper 變化
現在可以配置應該對映哪些入站頭。在 2.8.8 或更高版本中也可用。更多資訊請參見訊息頭。
KafkaTemplate
變化
在 3.0 版本中,此類返回的 future 將是 CompletableFuture
s 而不是 ListenableFuture
s。關於使用此版本進行遷移的幫助,請參見使用 KafkaTemplate
。
ReplyingKafkaTemplate
變化
該模板現在提供了一個方法,用於等待回覆容器上的分配,以避免在回覆容器初始化之前傳送請求時出現的競態條件。在 2.8.8 或更高版本中也可用。參見使用 ReplyingKafkaTemplate
。
在 3.0 版本中,此類返回的 future 將是 CompletableFuture
s 而不是 ListenableFuture
s。關於使用此版本進行遷移的幫助,請參見使用 ReplyingKafkaTemplate
和 使用 Message<?>
進行請求/回覆。
2.8 版本相對於 2.7 版本的新特性
本節介紹從 2.7 版本到 2.8 版本的變化。有關更早版本中的變化,請參見變更歷史。
包變化
與型別對映相關的類和介面已從 …support.converter
移動到 …support.mapping
。
-
AbstractJavaTypeMapper
-
ClassMapper
-
DefaultJackson2JavaTypeMapper
-
Jackson2JavaTypeMapper
無序手動提交
監聽器容器現在可以配置為接受無序的手動偏移量提交(通常是非同步的)。容器將延遲提交,直到缺失的偏移量被確認。更多資訊請參見手動提交偏移量。
@KafkaListener
變化
現在可以在方法本身上指定監聽器方法是否為批處理監聽器。這使得同一個容器工廠可以用於記錄和批處理監聽器。
更多資訊請參見[batch-listeners]。
批處理監聽器現在可以處理轉換異常。
更多資訊請參見批處理錯誤處理器中的轉換錯誤。
RecordFilterStrategy
與批處理監聽器一起使用時,現在可以在一次呼叫中過濾整個批處理。更多資訊請參見[batch-listeners]末尾的說明。
@KafkaListener
註解現在具有 filter
屬性,用於覆蓋容器工廠在此監聽器上的 RecordFilterStrategy
。
@KafkaListener
註解現在具有 info
屬性;這用於填充新的監聽器容器屬性 listenerInfo
。然後將其用於在每條記錄中填充一個 KafkaHeaders.LISTENER_INFO
頭,可以在 RecordInterceptor
、RecordFilterStrategy
或監聽器本身中使用。更多資訊請參見監聽器資訊頭 和 AbstractMessageListenerContainer 屬性。
KafkaTemplate
變化
現在可以接收單條記錄,給定主題、分割槽和偏移量。更多資訊請參見使用 KafkaTemplate
接收。
CommonErrorHandler
新增
舊版的 GenericErrorHandler
及其用於記錄和批處理監聽器的子介面層次結構已被新的單一介面 CommonErrorHandler
取代,新介面的實現對應於大多數舊版 GenericErrorHandler
的實現。更多資訊請參見 容器錯誤處理器 和 將自定義舊版錯誤處理器實現遷移到 CommonErrorHandler
。
監聽器容器變更
interceptBeforeTx
容器屬性現在預設為 true
。
authorizationExceptionRetryInterval
屬性已重新命名為 authExceptionRetryInterval
,現在除了之前應用於 AuthorizationException
外,也適用於 AuthenticationException
。預設情況下,這兩種異常都被視為致命異常,容器將停止,除非設定了此屬性。
更多資訊請參見 使用 KafkaMessageListenerContainer
和 監聽器容器屬性。
序列化器/反序列化器變更
現在提供了 DelegatingByTopicSerializer
和 DelegatingByTopicDeserializer
。更多資訊請參見 委派序列化器和反序列化器。
DeadLetterPublishingRecover
變更
屬性 stripPreviousExceptionHeaders
現在預設為 true
。
現在有幾種技術可以自定義哪些頭部會被新增到輸出記錄中。
更多資訊請參見 管理死信記錄頭部。
可重試主題變更
現在,您可以對可重試主題和不可重試主題使用相同的工廠。更多資訊請參見 指定 ListenerContainerFactory。
現在有一個可管理的全域性致命異常列表,這些異常將使失敗的記錄直接進入 DLT。請參考 異常分類器 檢視如何管理它。
您現在可以結合使用阻塞和非阻塞重試。更多資訊請參見 結合使用阻塞和非阻塞重試。
使用可重試主題功能時丟擲的 KafkaBackOffException 現在記錄在 DEBUG 級別。如果您需要將日誌級別改回 WARN 或設定為任何其他級別,請參見 更改 KafkaBackOffException 日誌級別。
2.6 與 2.7 之間的變更
Kafka 客戶端版本
此版本需要 2.7.0 版本的 kafka-clients
。自版本 2.7.1 起,它也與 2.8.0 客戶端相容;請參見 覆蓋 Spring Boot 依賴。
使用主題實現非阻塞延遲重試
此版本中增加了這一重要的新功能。當嚴格排序不重要時,失敗的投遞可以傳送到另一個主題以便稍後消費。可以配置一系列此類重試主題,並增加延遲時間。更多資訊請參見 非阻塞重試。
監聽器容器變更
onlyLogRecordMetadata
容器屬性現在預設為 true
。
現在提供了一個新的容器屬性 stopImmediate
。
更多資訊請參見 監聽器容器屬性。
在投遞嘗試之間使用 BackOff
的錯誤處理器(例如 SeekToCurrentErrorHandler
和 DefaultAfterRollbackProcessor
)現在會在容器停止後立即退出退避間隔,而不是延遲停止。
繼承 FailedRecordProcessor
的錯誤處理器和回滾後處理器現在可以配置一個或多個 RetryListener
,以接收有關重試和恢復進度的資訊。
RecordInterceptor
現在增加了在監聽器返回(正常或透過丟擲異常)後呼叫的方法。它還有一個子介面 ConsumerAwareRecordInterceptor
。此外,現在還有一個用於批處理監聽器的 BatchInterceptor
。更多資訊請參見 訊息監聽器容器。
@KafkaListener
變更
您現在可以驗證 @KafkaHandler
方法(類級別監聽器)的載荷引數。更多資訊請參見 @KafkaListener
@Payload
驗證。
您現在可以在 MessagingMessageConverter
和 BatchMessagingMessageConverter
上設定 rawRecordHeader
屬性,這使得原始 ConsumerRecord
會被新增到轉換後的 Message<?>
中。這對於例如您希望在監聽器錯誤處理器中使用 DeadLetterPublishingRecoverer
的場景非常有用。更多資訊請參見 監聽器錯誤處理器。
您現在可以在應用程式初始化期間修改 @KafkaListener
註解。更多資訊請參見 @KafkaListener
屬性修改。
DeadLetterPublishingRecover
變更
現在,如果鍵和值都反序列化失敗,原始值會發布到 DLT。以前,值會被填充,但鍵的 DeserializationException
會留在頭部中。如果您子類化了恢復器並重寫了 createProducerRecord
方法,這是一個破壞性的 API 變更。
此外,恢復器在釋出到目標分割槽之前,會驗證目標解析器選擇的分割槽確實存在。
更多資訊請參見 釋出死信記錄。
ChainedKafkaTransactionManager
已棄用
更多資訊請參見 事務。
ReplyingKafkaTemplate
變更
現在有一種機制可以在檢查回覆時,如果存在某些條件,則異常地終止 future。
已新增對傳送和接收 spring-messaging
Message<?>
的支援。
更多資訊請參見 使用 ReplyingKafkaTemplate
。
Kafka Streams 變更
預設情況下,StreamsBuilderFactoryBean
現在配置為不清理本地狀態。更多資訊請參見 配置。
KafkaAdmin
變更
已新增新的方法 createOrModifyTopics
和 describeTopics
。已新增 KafkaAdmin.NewTopics
以方便在單個 bean 中配置多個主題。更多資訊請參見 [configuring-topics]。
MessageConverter
變更
現在可以將 spring-messaging
SmartMessageConverter
新增到 MessagingMessageConverter
中,從而允許根據 contentType
頭部進行內容協商。更多資訊請參見 Spring Messaging 訊息轉換。
按順序啟動 @KafkaListener
更多資訊請參見 按順序啟動 @KafkaListener
。
ExponentialBackOffWithMaxRetries
提供了一種新的 BackOff
實現,使得配置最大重試次數更加方便。更多資訊請參見 ExponentialBackOffWithMaxRetries
實現。
條件委派錯誤處理器
這些新的錯誤處理器可以配置為根據異常型別委派給不同的錯誤處理器。更多資訊請參見 委派錯誤處理器。
2.5 與 2.6 之間的變更
監聽器容器變更
預設的 EOSMode
現在是 BETA
。更多資訊請參見 精確一次語義。
各種錯誤處理器(繼承 FailedRecordProcessor
)和 DefaultAfterRollbackProcessor
現在在恢復失敗時會重置 BackOff
。此外,您現在可以根據失敗的記錄和/或異常來選擇使用的 BackOff
。
您現在可以在容器屬性中配置 adviceChain
。更多資訊請參見 監聽器容器屬性。
當容器配置為釋出 ListenerContainerIdleEvent
時,在釋出空閒事件後接收到記錄時,它現在會發布 ListenerContainerNoLongerIdleEvent
。更多資訊請參見 應用事件 和 檢測空閒和無響應的消費者。
@KafkaListener 變更
使用手動分割槽分配時,您現在可以指定一個萬用字元來確定哪些分割槽應重置為初始偏移量。此外,如果監聽器實現了 ConsumerSeekAware
,則在手動分配後會呼叫 onPartitionsAssigned()
方法。(也在版本 2.5.5 中新增)。更多資訊請參見 顯式分割槽分配。
AbstractConsumerSeekAware
中添加了便捷方法,使得 seek 操作更容易。更多資訊請參見 [seek]。
ErrorHandler 變更
FailedRecordProcessor
的子類(例如 SeekToCurrentErrorHandler
、DefaultAfterRollbackProcessor
、RecoveringBatchErrorHandler
)現在可以配置為,如果異常型別與此記錄之前發生的異常型別不同,則重置重試狀態。
生產者工廠變更
您現在可以為生產者設定最大存活時間,超過此時間後它們將被關閉並重新建立。更多資訊請參見 事務。
您現在可以在建立 DefaultKafkaProducerFactory
後更新配置對映。這可能很有用,例如,如果您在憑證更改後必須更新 SSL key/trust store 位置。更多資訊請參見 使用 DefaultKafkaProducerFactory
。
2.4 與 2.5 之間的變更
本節介紹從 2.4 版到 2.5 版的變更。對於早期版本的變更,請參見 變更歷史。
消費者/生產者工廠變更
預設的消費者和生產者工廠現在可以在建立或關閉消費者或生產者時呼叫回撥。提供了原生 Micrometer 指標的實現。更多資訊請參見 工廠監聽器。
您現在可以在執行時更改 bootstrap server 屬性,從而實現故障轉移到另一個 Kafka 叢集。更多資訊請參見 連線到 Kafka。
StreamsBuilderFactoryBean
變更
工廠 bean 現在可以在建立或銷燬 KafkaStreams
時呼叫回撥。提供了原生 Micrometer 指標的實現。更多資訊請參見 KafkaStreams Micrometer 支援。
投遞嘗試頭部
現在有一個選項可以在使用某些錯誤處理器和回滾後處理器時新增一個用於跟蹤投遞嘗試次數的頭部。更多資訊請參見 投遞嘗試頭部。
@KafkaListener 變更
當 @KafkaListener
返回型別為 Message<?>
時,如果需要,預設回覆頭部現在會自動填充。更多資訊請參見 回覆型別 Message<?>。
當傳入記錄的鍵為 null
時,KafkaHeaders.RECEIVED_MESSAGE_KEY
不再填充 null
值;而是完全省略該頭部。
@KafkaListener
方法現在可以指定 ConsumerRecordMetadata
引數,而不是使用離散的頭部來獲取元資料,例如主題、分割槽等。更多資訊請參見 消費者記錄元資料。
監聽器容器變更
assignmentCommitOption
容器屬性現在預設為 LATEST_ONLY_NO_TX
。更多資訊請參見 監聽器容器屬性。
使用事務時,subBatchPerPartition
容器屬性現在預設為 true
。更多資訊請參見 事務。
現在提供了一個新的 RecoveringBatchErrorHandler
。
現在支援靜態組成員資格。更多資訊請參見 訊息監聽器容器。
配置了增量/協同再平衡時,如果偏移量提交失敗且異常為非致命的 RebalanceInProgressException
,容器將嘗試在再平衡完成後重新提交仍分配給此例項的分割槽的偏移量。
現在,記錄監聽器的預設錯誤處理器是 SeekToCurrentErrorHandler
,批處理監聽器是 RecoveringBatchErrorHandler
。更多資訊請參見 容器錯誤處理器。
您現在可以控制標準錯誤處理器有意丟擲的異常的日誌級別。更多資訊請參見 容器錯誤處理器。
已新增 getAssignmentsByClientId()
方法,使得更容易確定併發容器中的哪些消費者被分配了哪些分割槽。更多資訊請參見 監聽器容器屬性。
您現在可以抑制在錯誤、除錯日誌等中記錄完整的 ConsumerRecord
。請參見 監聽器容器屬性 中的 onlyLogRecordMetadata
。
KafkaTemplate 變更
KafkaTemplate
現在可以維護 micrometer 計時器。更多資訊請參見 監控。
KafkaTemplate
現在可以使用 ProducerConfig
屬性進行配置,以覆蓋生產者工廠中的配置。更多資訊請參見 使用 KafkaTemplate
。
現在提供了 RoutingKafkaTemplate
。更多資訊請參見 使用 RoutingKafkaTemplate
。
您現在可以使用 KafkaSendCallback
代替 ListenerFutureCallback
來獲取更窄範圍的異常,從而更容易提取失敗的 ProducerRecord
。更多資訊請參見 使用 KafkaTemplate
。
Kafka 字串序列化器/反序列化器
現在提供了新的 ToStringSerializer
/StringDeserializer
以及關聯的 SerDe
。更多資訊請參見 字串序列化。
JsonDeserializer
JsonDeserializer
現在在確定反序列化型別方面更靈活。更多資訊請參見 使用方法確定型別。
委派序列化器/反序列化器
當出站記錄沒有頭部時,DelegatingSerializer
現在可以處理“標準”型別。更多資訊請參見 委派序列化器和反序列化器。
測試變更
KafkaTestUtils.consumerProps()
輔助記錄現在預設將 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
設定為 earliest
。更多資訊請參見 JUnit。
2.3 與 2.4 之間的變更
ConsumerAwareRebalanceListener
與 ConsumerRebalanceListener
類似,此介面現在增加了一個方法 onPartitionsLost
。更多資訊請參考 Apache Kafka 文件。
與 ConsumerRebalanceListener
不同,預設實現**不**呼叫 onPartitionsRevoked
。相反,監聽器容器在呼叫 onPartitionsLost
之後會呼叫該方法;因此,在實現 ConsumerAwareRebalanceListener
時,您不應執行相同的操作。
更多資訊請參見 再平衡監聽器 末尾的 IMPORTANT 注意事項。
KafkaTemplate
KafkaTemplate
現在支援非事務性發布與事務性發布並行。更多資訊請參見 KafkaTemplate
事務性與非事務性發布。
AggregatingReplyingKafkaTemplate
releaseStrategy
現在是一個 BiConsumer
。它現在在超時後(以及記錄到達時)被呼叫;在超時後呼叫時,第二個引數為 true
。
更多資訊請參見 聚合多個回覆。
監聽器容器
ContainerProperties
提供了一個 authorizationExceptionRetryInterval
選項,允許監聽器容器在 KafkaConsumer
丟擲任何 AuthorizationException
後重試。請參見其 JavaDocs 和 使用 KafkaMessageListenerContainer
瞭解更多資訊。
@KafkaListener
@KafkaListener
註解增加了一個新屬性 splitIterables
;預設為 true。當回覆監聽器返回 Iterable
時,此屬性控制返回結果是作為單個記錄傳送,還是為每個元素髮送一個記錄。更多資訊請參見 使用 @SendTo
轉發監聽器結果
批處理監聽器現在可以使用 BatchToRecordAdapter
進行配置;例如,這允許在事務中處理批處理,而監聽器每次獲取一條記錄。使用預設實現時,可以使用 ConsumerRecordRecoverer
來處理批處理中的錯誤,而不會停止整個批處理的程序——這在使用事務時可能很有用。更多資訊請參見 批處理監聽器事務。
Kafka Streams
StreamsBuilderFactoryBean
接受一個新的屬性 KafkaStreamsInfrastructureCustomizer
。這允許在流建立之前配置 builder 和/或 topology。更多資訊請參見 Spring 管理。
2.2 與 2.3 之間的變更
本節介紹從 2.2 版到 2.3 版的變更。
技巧、竅門和示例
已新增新的章節 技巧、竅門和示例。請提交 GitHub issue 和/或 pull request,以便在該章節中新增更多條目。
配置變更
從 2.3.4 版本開始,missingTopicsFatal
容器屬性預設為 false。當此屬性為 true 時,如果 broker 關閉,應用程式將無法啟動;許多使用者受到了此變更的影響;考慮到 Kafka 是一個高可用平臺,我們並未預期在沒有活動 broker 的情況下啟動應用程式是一個常見的用例。
生產者和消費者工廠變更
DefaultKafkaProducerFactory
現在可以配置為每個執行緒建立一個生產者。您還可以在建構函式中提供 Supplier<Serializer>
例項,作為配置類(需要無引數建構函式)或使用 Serializer
例項(然後在所有生產者之間共享)的替代方案。更多資訊請參見 使用 DefaultKafkaProducerFactory
。
DefaultKafkaConsumerFactory
中也提供了使用 Supplier<Deserializer>
例項的相同選項。更多資訊請參見 使用 KafkaMessageListenerContainer
。
監聽器容器變更
以前,當使用監聽器介面卡(例如 @KafkaListener
)呼叫監聽器時,錯誤處理器會接收到 ListenerExecutionFailedException
(其中實際的監聽器異常作為 cause
)。由原生 GenericMessageListener
丟擲的異常會原樣傳遞給錯誤處理器。現在,引數總是 ListenerExecutionFailedException
(其中實際的監聽器異常作為 cause
),它提供了訪問容器的 group.id
屬性的途徑。
由於監聽器容器有自己的提交偏移量的機制,它偏好將 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
設定為 false
。現在,除非在消費者工廠或容器的消費者屬性覆蓋中明確設定,否則它會自動將其設定為 false。
ackOnError
屬性現在預設為 false
。
現在可以在監聽器方法中獲取消費者的 group.id
屬性。更多資訊請參見 獲取消費者 group.id
。
容器增加了一個新的屬性 recordInterceptor
,允許在呼叫監聽器之前檢查或修改記錄。如果需要呼叫多個攔截器,還提供了 CompositeRecordInterceptor
。更多資訊請參見 訊息監聽器容器。
ConsumerSeekAware
增加了新方法,允許您相對於開頭、結尾或當前位置執行 seek 操作,以及 seek 到大於或等於時間戳的第一個偏移量。更多資訊請參見 [seek]。
現在提供了一個便捷類 AbstractConsumerSeekAware
,以簡化 seek 操作。更多資訊請參見 [seek]。
ContainerProperties
提供了一個 idleBetweenPolls
選項,允許監聽器容器的主迴圈在 KafkaConsumer.poll()
呼叫之間休眠。請參見其 JavaDocs 和 使用 KafkaMessageListenerContainer
瞭解更多資訊。
使用 AckMode.MANUAL
(或 MANUAL_IMMEDIATE
)時,您現在可以透過在 Acknowledgment
上呼叫 nack
來觸發重新投遞。更多資訊請參見 提交偏移量。
現在可以使用 Micrometer Timer
監控監聽器效能。更多資訊請參見 監控。
容器現在釋出更多與啟動相關的消費者生命週期事件。更多資訊請參見 應用事件。
事務性批處理監聽器現在支援殭屍防護(zombie fencing)。更多資訊請參見 事務。
監聽器容器工廠現在可以使用 ContainerCustomizer
進行配置,以便在每個容器建立和配置後進一步進行配置。更多資訊請參見 容器工廠。
ErrorHandler 變更
SeekToCurrentErrorHandler
現在將某些異常視為致命異常,並停用這些異常的重試,在首次失敗時即呼叫恢復器。
SeekToCurrentErrorHandler
和 SeekToCurrentBatchErrorHandler
現在可以配置為在投遞嘗試之間應用 BackOff
(執行緒休眠)。
從 2.3.2 版本開始,在錯誤處理器恢復失敗記錄後返回時,恢復記錄的偏移量將被提交。
DeadLetterPublishingRecoverer
在與 ErrorHandlingDeserializer
結合使用時,現在將傳送到死信主題的訊息的載荷設定為無法反序列化的原始值。以前,它為 null
,使用者程式碼需要從訊息頭部提取 DeserializationException
。更多資訊請參見 釋出死信記錄。
TopicBuilder
提供了一個新類 TopicBuilder
,用於更方便地建立用於自動主題配置的 NewTopic
@Bean
。更多資訊請參見 [configuring-topics]。
Kafka Streams 變更
您現在可以對由 @EnableKafkaStreams
建立的 StreamsBuilderFactoryBean
執行額外配置。更多資訊請參見 Streams 配置。
現在提供了一個 RecoveringDeserializationExceptionHandler
,它允許恢復帶有反序列化錯誤的記錄。它可以與 DeadLetterPublishingRecoverer
結合使用,將這些記錄傳送到死信主題。更多資訊請參見 從反序列化異常中恢復。
提供了 HeaderEnricher
轉換器,它使用 SpEL 生成頭部值。更多資訊請參見 頭部豐富器。
提供了 MessagingTransformer
。這使得 Kafka Streams topology 可以與 spring-messaging 元件(例如 Spring Integration 流)進行互動。更多資訊請參見 MessagingProcessor
和 [從 KStream
呼叫 Spring Integration 流]。
JSON 元件變更
現在,所有支援 JSON 的元件預設都配置了由 JacksonUtils.enhancedObjectMapper()
產生的 Jackson ObjectMapper
。JsonDeserializer
現在提供了基於 TypeReference
的建構函式,以更好地處理目標泛型容器型別。此外,還引入了 JacksonMimeTypeModule
,用於將 org.springframework.util.MimeType
序列化為純字串。更多資訊請參見其 JavaDocs 和 序列化、反序列化和訊息轉換。
提供了 ByteArrayJsonMessageConverter
以及所有 JSON 轉換器的新超類 JsonMessageConverter
。此外,現在還提供了 StringOrBytesSerializer
;它可以序列化 ProducerRecord
中的 byte[]
、Bytes
和 String
值。更多資訊請參見 Spring Messaging 訊息轉換。
JsonSerializer
、JsonDeserializer
和 JsonSerde
現在具有流暢的 API,使得程式設計配置更加簡單。請參見 javadoc、序列化、反序列化和訊息轉換 以及 Streams JSON 序列化和反序列化 瞭解更多資訊。
ReplyingKafkaTemplate
當回覆超時時,future 將以 KafkaReplyTimeoutException
異常完成,而不是 KafkaException
。
此外,現在提供了一個過載的 sendAndReceive
方法,允許按訊息指定回覆超時時間。
AggregatingReplyingKafkaTemplate
透過聚合來自多個接收者的回覆來擴充套件 ReplyingKafkaTemplate
。更多資訊請參見 聚合多個回覆。
事務變更
您現在可以在 KafkaTemplate
和 KafkaTransactionManager
上覆蓋生產者工廠的 transactionIdPrefix
。更多資訊請參見 transactionIdPrefix
。
新的委派序列化器/反序列化器
框架現在提供了一個委派 Serializer
和 Deserializer
,它利用頭部來支援生成和消費具有多種鍵/值型別的記錄。更多資訊請參見 委派序列化器和反序列化器。
新的重試反序列化器
框架現在提供了一個委派的 RetryingDeserializer
,用於在發生瞬時錯誤(例如網路問題)時重試序列化。更多資訊請參見 重試反序列化器。
2.1 與 2.2 之間的變更
類和包變更
ContainerProperties
類已從 org.springframework.kafka.listener.config
包移動到 org.springframework.kafka.listener
包。
AckMode
列舉已從 AbstractMessageListenerContainer
移動到 ContainerProperties
。
setBatchErrorHandler()
和 setErrorHandler()
方法已從 ContainerProperties
移動到 AbstractMessageListenerContainer
和 AbstractKafkaListenerContainerFactory
。
回滾後處理
提供了一個新的 AfterRollbackProcessor
策略。更多資訊請參見 回滾後處理器。
ConcurrentKafkaListenerContainerFactory
變更
您現在可以使用 ConcurrentKafkaListenerContainerFactory
建立和配置任何 ConcurrentMessageListenerContainer
,而不僅僅是用於 @KafkaListener
註解的容器。更多資訊請參見 容器工廠。
監聽器容器變更
已新增新的容器屬性(missingTopicsFatal
)。更多資訊請參見 使用 KafkaMessageListenerContainer
。
消費者停止時,現在會發出 ConsumerStoppedEvent
。更多資訊請參見 執行緒安全。
批處理監聽器可以選擇接收完整的 ConsumerRecords<?, ?>
物件,而不是 List<ConsumerRecord<?, ?>
。更多資訊請參見 [batch-listeners]。
DefaultAfterRollbackProcessor
和 SeekToCurrentErrorHandler
現在可以恢復(跳過)持續失敗的記錄,並且預設在失敗 10 次後執行此操作。它們可以配置為將失敗的記錄釋出到死信主題。
從 2.2.4 版本開始,在選擇死信主題名稱時可以使用消費者的 group ID。
已新增 ConsumerStoppingEvent
。更多資訊請參見 應用事件。
當容器配置為 AckMode.MANUAL_IMMEDIATE
時(自 2.2.4 版起),SeekToCurrentErrorHandler
現在可以配置為提交已恢復記錄的偏移量。
@KafkaListener 變更
您現在可以透過在註解上設定屬性來覆蓋監聽器容器工廠的 concurrency
和 autoStartup
屬性。您現在可以新增配置來決定將哪些頭資訊(如果有)複製到回覆訊息中。更多資訊請參閱 @KafkaListener
註解。
您現在可以將 @KafkaListener
作為元註解用於您自己的註解。更多資訊請參閱 將 @KafkaListener
作為元註解。
現在更容易為 @Payload
驗證配置一個 Validator
。更多資訊請參閱 @KafkaListener
@Payload
驗證。
您現在可以直接在註解上指定 Kafka 消費者屬性;這些屬性將覆蓋消費者工廠中定義的同名屬性(自版本 2.2.4 起)。更多資訊請參閱 註解屬性。
頭資訊對映變更
型別為 MimeType
和 MediaType
的頭資訊現在在 RecordHeader
值中對映為簡單的字串。以前,它們被對映為 JSON,並且只有 MimeType
被解碼。MediaType
無法解碼。現在為了互操作性,它們是簡單的字串。
此外,DefaultKafkaHeaderMapper
有一個新的 addToStringClasses
方法,允許指定應使用 toString()
而非 JSON 對映的型別。更多資訊請參閱 訊息頭資訊。
嵌入式 Kafka 變更
KafkaEmbedded
類及其 KafkaRule
介面已被棄用,轉而使用 EmbeddedKafkaBroker
及其 JUnit 4 包裝器 EmbeddedKafkaRule
。@EmbeddedKafka
註解現在填充一個 EmbeddedKafkaBroker
bean,而不是已棄用的 KafkaEmbedded
。此變更允許在 JUnit 5 測試中使用 @EmbeddedKafka
。@EmbeddedKafka
註解現在具有 ports
屬性,用於指定填充 EmbeddedKafkaBroker
的埠。更多資訊請參閱 測試應用程式。
JsonSerializer/Deserializer 增強
您現在可以使用生產者和消費者屬性提供型別對映資訊。
反序列化器上提供了新的建構函式,允許使用提供的目標型別覆蓋型別頭資訊。
JsonDeserializer
現在預設刪除任何型別資訊頭資訊。
您現在可以使用 Kafka 屬性將 JsonDeserializer
配置為忽略型別資訊頭資訊(自版本 2.2.3 起)。
更多資訊請參閱 序列化、反序列化和訊息轉換。
Kafka Streams 變更
streams 配置 bean 現在必須是一個 KafkaStreamsConfiguration
物件,而不是一個 StreamsConfig
物件。
StreamsBuilderFactoryBean
已從包 …core
移至 …config
。
引入了 KafkaStreamBrancher
,以便在基於 KStream
例項構建條件分支時提供更好的使用者體驗。
更多資訊請參閱 Apache Kafka Streams 支援 和 配置。
事務 ID
當事務由監聽器容器啟動時,transactional.id
現在是 transactionIdPrefix
附加 <group.id>.<topic>.<partition>
。此變更允許正確地隔離殭屍程序,如此處所述。
2.0 和 2.1 之間的變更
JSON 改進
StringJsonMessageConverter
和 JsonSerializer
現在在 Headers
中新增型別資訊,允許轉換器和 JsonDeserializer
在接收時基於訊息本身而不是固定的配置型別建立特定型別。更多資訊請參閱 序列化、反序列化和訊息轉換。
停止容器的錯誤處理器
現在為記錄和批次監聽器提供了容器錯誤處理器,它們將監聽器丟擲的任何異常視為致命錯誤,並停止容器。更多資訊請參閱 處理異常。
暫停和恢復容器
監聽器容器現在具有 pause()
和 resume()
方法(自版本 2.1.3 起)。更多資訊請參閱 暫停和恢復監聽器容器。
有狀態重試
自版本 2.1.3 起,您可以配置有狀態重試。更多資訊請參閱 有狀態重試。
客戶端 ID
自版本 2.1.1 起,您現在可以在 @KafkaListener
上設定 client.id
字首。以前,要自定義客戶端 ID,您需要為每個監聽器配置單獨的消費者工廠(和容器工廠)。當您使用併發時,字首後會新增 -n
以提供唯一的客戶端 ID。
記錄偏移量提交日誌
預設情況下,使用 DEBUG
日誌級別記錄主題偏移量提交日誌。自版本 2.1.2 起,ContainerProperties
中的新屬性 commitLogLevel
允許您指定這些訊息的日誌級別。更多資訊請參閱 使用 KafkaMessageListenerContainer
。
預設 @KafkaHandler
自版本 2.1.3 起,您可以將類級別 @KafkaListener
上的一個 @KafkaHandler
註解指定為預設註解。更多資訊請參閱 類級別 @KafkaListener
。
ReplyingKafkaTemplate
自版本 2.1.3 起,提供了一個 KafkaTemplate
的子類來支援請求/回覆語義。更多資訊請參閱 使用 ReplyingKafkaTemplate
。
從 2.0 遷移指南
請參閱 2.0 到 2.1 遷移 指南。
1.3 和 2.0 之間的變更
@KafkaListener
變更
您現在可以使用 @SendTo
註解 @KafkaListener
方法(以及類和 @KafkaHandler
方法)。如果方法返回結果,則會轉發到指定的主題。更多資訊請參閱 使用 @SendTo
轉發監聽器結果。
訊息監聽器
訊息監聽器現在可以感知 Consumer
物件。更多資訊請參閱 [message-listeners]。
使用 ConsumerAwareRebalanceListener
重平衡監聽器現在可以在重平衡通知期間訪問 Consumer
物件。更多資訊請參閱 重平衡監聽器。
1.2 和 1.3 之間的變更
事務支援
0.11.0.0 客戶端庫增加了對事務的支援。已新增 KafkaTransactionManager
及其他事務支援。更多資訊請參閱 事務。
頭資訊支援
0.11.0.0 客戶端庫增加了對訊息頭資訊的支援。現在可以將它們對映到 spring-messaging
MessageHeaders
並從中對映。更多資訊請參閱 訊息頭資訊。
Kafka 時間戳支援
KafkaTemplate
現在支援新增帶有時間戳的記錄的 API。已引入新的 KafkaHeaders
以支援 timestamp
。此外,還添加了新的 KafkaConditions.timestamp()
和 KafkaMatchers.hasTimestamp()
測試工具。更多詳情請參閱 使用 KafkaTemplate
、@KafkaListener
註解 和 測試應用程式。
@KafkaListener
變更
您現在可以配置 KafkaListenerErrorHandler
來處理異常。更多資訊請參閱 處理異常。
預設情況下,@KafkaListener
的 id
屬性現在用作 group.id
屬性,覆蓋消費者工廠中配置的屬性(如果存在)。此外,您可以在註解上顯式配置 groupId
。以前,您需要為每個監聽器使用單獨的容器工廠(和消費者工廠)來使用不同的 group.id
值。要恢復使用工廠配置的 group.id
的先前行為,請將註解上的 idIsGroup
屬性設定為 false
。
@EmbeddedKafka
註解
為了方便起見,提供了類級別的測試註解 @EmbeddedKafka
,用於將 KafkaEmbedded
註冊為 bean。更多資訊請參閱 測試應用程式。
Kerberos 配置
現在提供了配置 Kerberos 的支援。更多資訊請參閱 JAAS 和 Kerberos。
1.0 和 1.1 之間的變更
Seek
您現在可以尋找每個主題或分割槽的偏移位置。當使用組管理且 Kafka 分配分割槽時,您可以使用此功能在初始化期間設定初始位置。當檢測到空閒容器或在應用程式執行的任何任意點時,您也可以進行尋求。更多資訊請參閱 [seek]。