變更歷史

3.3 版本相比 3.2 版本的新功能

本節涵蓋從 3.2 版到 3.3 版的更改。有關早期版本的更改,請參閱變更歷史

DLT 主題命名約定

DLT 主題的命名約定已標準化,統一使用“-dlt”字尾。此更改可確保相容性,並避免在不同重試解決方案之間切換時發生衝突。希望保留“.DLT”字尾行為的使用者需要透過設定相應的 DLT 名稱屬性明確選擇加入。

增強的消費者組查詢操作

ConsumerSeekCallback 介面中添加了一個新方法 getGroupId()。此方法允許透過僅定位所需的消費者組來進行更具選擇性的查詢操作。AbstractConsumerSeekAware 現在還可以在多組監聽器場景中註冊、檢索和刪除每個主題分割槽的回撥,而不會遺漏任何回撥。有關更多詳細資訊,請參閱新的 API (getSeekCallbacksFor(TopicPartition topicPartition)getTopicsAndCallbacks())。有關更多詳細資訊,請參閱查詢 API 文件

使用 RecordFilterStrategy 配置 Kafka 監聽器中空批次的處​​理

RecordFilterStrategy 現在支援忽略過濾產生的空批次。這可以透過重寫預設方法 ignoreEmptyBatch() 進行配置,該方法預設為 false,確保即使所有 ConsumerRecords 都被過濾掉,KafkaListener 也會被呼叫。有關更多詳細資訊,請參閱訊息接收過濾文件

ConcurrentContainerStoppedEvent

當所有子容器都停止時,ConcurentContainerMessageListenerContainer 現在會發出 ConcurrentContainerStoppedEvent。有關更多詳細資訊,請參閱應用程式事件ConcurrentContainerStoppedEvent Javadoc。

回覆中的原始記錄鍵

使用 ReplyingKafkaTemplate 時,如果請求中的原始記錄包含鍵,則該鍵也將成為回覆的一部分。有關更多詳細資訊,請參閱參考文件的傳送訊息部分。

自定義 DeadLetterPublishingRecovererFactory 中的日誌記錄

使用 DeadLetterPublishingRecovererFactory 時,使用者應用程式可以重寫 maybeLogListenerException 方法來自定義日誌記錄行為。

自定義 KafkaAdmin 中的 Admin 客戶端

擴充套件 KafkaAdmin 時,使用者應用程式可以重寫 createAdmin 方法來自定義 Admin 客戶端的建立。

自定義 Kafka Streams 的實現

使用 KafkaStreamsCustomizer 時,現在可以透過重寫 initKafkaStreams 方法返回 KafkaStreams 物件的自定義實現。

批處理監聽器的 KafkaHeaders.DELIVERY_ATTEMPT

使用 BatchListener 時,ConsumerRecord 的頭欄位中可以包含 KafkaHeaders.DELIVERY_ATTMPT 頭。如果 DeliveryAttemptAwareRetryListener 設定為錯誤處理程式作為重試監聽器,則每個 ConsumerRecord 都具有傳遞嘗試頭。有關更多詳細資訊,請參閱批處理監聽器的 Kafka 頭

Kafka Metrics Listeners 和 TaskScheduler

MicrometerProducerListenerMicrometerConsumerListenerKafkaStreamsMicrometerListener 現在可以使用 TaskScheduler 進行配置。有關更多資訊,請參閱 KafkaMetricsSupport Javadoc 和Micrometer 支援

3.2 版本相比 3.1 版本的新功能

本節涵蓋從 3.1 版到 3.2 版的更改。有關早期版本的更改,請參閱變更歷史

Kafka 客戶端版本

此版本需要 3.7.0 的 kafka-clients。Kafka 客戶端的 3.7.0 版本引入了新的消費者組協議。有關更多詳細資訊及其限制,請參閱KIP-848。新的消費者組協議是一個早期訪問版本,不適用於生產環境。在此版本中,建議僅用於測試目的。因此,Spring for Apache Kafka 僅在 kafka-client 本身提供的測試級別支援範圍內支援此新的消費者組協議。預設情況下,Spring for Apache Kafka 使用經典的消費者組協議,在測試新的消費者組協議時,需要透過消費者上的 group.protocol 屬性選擇加入。

測試支援更改

EmbeddedKafka 中的 kraft 模式預設停用,希望使用 kraft 模式的使用者必須啟用它。這是由於在使用 EmbeddedKafkakraft 模式時觀察到某些不穩定情況,尤其是在測試新的消費者組協議時。新的消費者組協議僅在 kraft 模式下受支援,因此,在測試新協議時,需要針對真實的 Kafka 叢集而不是基於 KafkaClusterTestKit 的叢集進行測試,EmbeddedKafka 就是基於此的。此外,在使用 EmbeddedKafkakraft 模式執行多個 KafkaListener 方法時,還觀察到一些其他競爭條件。在這些問題解決之前,EmbeddedKafka 上的 kraft 預設值將保持為 false

Kafka Streams 互動式查詢支援

一個新的 API KafkaStreamsInteractiveQuerySupport 用於訪問 Kafka Streams 互動式查詢中使用的可查詢儲存。有關更多詳細資訊,請參閱Kafka Streams 互動式支援

TransactionIdSuffixStrategy

引入了一個新的 TransactionIdSuffixStrategy 介面來管理 transactional.id 字尾。預設實現是 DefaultTransactionIdSuffixStrategy,當設定 maxCache 大於零時,可以在特定範圍內重用 transactional.id,否則字尾將透過遞增計數器動態生成。有關更多資訊,請參閱固定 TransactionIdSuffix

非同步 @KafkaListener 返回

@KafkaListener(和 @KafkaHandler)方法現在可以返回非同步返回型別,包括 CompletableFuture<?>Mono<?> 和 Kotlin suspend 函式。有關更多資訊,請參閱非同步返回

根據丟擲的異常將訊息路由到自定義 DLT

現在可以根據訊息處理過程中丟擲的異常型別將訊息重定向到自定義 DLT。重定向規則可以透過 RetryableTopic.exceptionBasedDltRoutingRetryTopicConfigurationBuilder.dltRoutingRules 設定。自定義 DLT 以及其他重試和死信主題會自動建立。有關更多資訊,請參閱根據丟擲的異常將訊息路由到自定義 DLT

棄用 ContainerProperties transactionManager 屬性

棄用 ContainerProperties 中的 transactionManager 屬性,轉而使用 KafkaAwareTransactionManager,後者是相對於通用 PlatformTransactionManager 的更窄型別。請參閱ContainerProperties事務同步

回滾後處理

提供了新的 AfterRollbackProcessor API processBatch。有關更多資訊,請參閱回滾後處理器

更改 @RetryableTopic SameIntervalTopicReuseStrategy 預設值

@RetryableTopic 屬性 SameIntervalTopicReuseStrategy 預設值更改為 SINGLE_TOPIC。請參閱最大間隔指數延遲的單個主題

非阻塞重試支援類級別 @KafkaListener

非阻塞重試支援類上的 @KafkaListener。請參閱非阻塞重試

在 RetryTopicConfigurationProvider 中支援處理類上的 @RetryableTopic。

提供一個新的公共 API 來查詢 RetryTopicConfiguration。請參閱查詢 RetryTopicConfiguration

RetryTopicConfigurer 支援處理 MultiMethodKafkaListenerEndpoint。

RetryTopicConfigurer 支援處理和註冊 MultiMethodKafkaListenerEndpointMultiMethodKafkaListenerEndpoint 為屬性 defaultMethodmethods 提供 getter/setter。修改嚴格用於 MethodKafkaListenerEndpoint 型別的 EndpointCustomizerEndpointHandlerMethod 添加了新的建構函式,用於為提供的 bean 構造例項。提供新的類 EndpointHandlerMultiMethod 來處理重試端點的多方法。

根據使用者提供的函式查詢偏移量的新 API 方法

ConsumerCallback 提供了一個新的 API,可以根據使用者定義的函式查詢偏移量,該函式將消費者中的當前偏移量作為引數。有關更多詳細資訊,請參閱查詢 API 文件

@PartitionOffset 支援 SeekPosition

@PartitionOffset 新增 seekPosition 屬性以支援 TopicPartitionOffset.SeekPosition。有關更多詳細資訊,請參閱手動分配

TopicPartitionOffset 中接受一個用於計算要查詢的偏移量的函式的新建構函式

TopicPartitionOffset 有一個新的建構函式,它接受一個使用者提供的函式來計算要查詢的偏移量。當使用此建構函式時,框架會呼叫該函式,並以當前消費者偏移量位置作為輸入引數。有關更多詳細資訊,請參閱查詢 API 文件

Spring Boot 應用程式名稱作為預設客戶端 ID 字首

對於定義了應用程式名稱的 Spring Boot 應用程式,此名稱現在用作某些客戶端型別自動生成的客戶端 ID 的預設字首。有關更多詳細資訊,請參閱預設客戶端 ID 字首

增強的 MessageListenerContainers 檢索

ListenerContainerRegistry 提供了兩個新的 API,用於動態查詢和過濾 MessageListenerContainer 例項。getListenerContainersMatching(Predicate<String> idMatcher) 按 ID 過濾,另一個是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) 按 ID 和容器屬性過濾。

有關更多資訊,請參閱@KafkaListener 生命週期管理的 API 文件

透過提供更多跟蹤標籤增強觀察

KafkaTemplateObservation 提供了更多跟蹤標籤(低基數)。KafkaListenerObservation 提供了新的 API 來查詢高基數鍵名和更多跟蹤標籤(高或低基數)。請參閱Micrometer 觀察

3.1 版本相比 3.0 版本的新功能

本節涵蓋從 3.0 版到 3.1 版的更改。有關早期版本的更改,請參閱變更歷史

Kafka 客戶端版本

此版本需要 3.6.0 的 kafka-clients

EmbeddedKafkaBroker

現在提供了一個額外的實現,以使用 Kraft 而不是 Zookeeper。有關更多資訊,請參閱嵌入式 Kafka 代理

JsonDeserializer

當發生反序列化異常時,SerializationException 訊息不再包含格式為 Can’t deserialize data [[123, 34, 98, 97, 122, …​ 的資料;每個資料位元組的數值陣列沒有用,對於大量資料可能會很冗長。當與 ErrorHandlingDeserializer 一起使用時,傳送給錯誤處理程式的 DeserializationException 包含 data 屬性,該屬性包含無法反序列化的原始資料。當不與 ErrorHandlingDeserializer 一起使用時,KafkaConsumer 將持續為同一記錄發出異常,顯示主題/分割槽/偏移量以及 Jackson 丟擲的原因。

ContainerPostProcessor

可以透過在 @KafkaListener 註解上指定 ContainerPostProcessor 的 bean 名稱來對監聽器容器應用後處理。這發生在容器建立之後以及在容器工廠上配置的任何 ContainerCustomizer 配置之後。有關更多資訊,請參閱容器工廠

ErrorHandlingDeserializer

現在可以將 Validator 新增到此反序列化器中;如果委託 Deserializer 成功反序列化物件,但該物件未能透過驗證,則會丟擲類似於發生反序列化異常的異常。這允許將原始原始資料傳遞給錯誤處理程式。有關更多資訊,請參閱使用 ErrorHandlingDeserializer

可重試主題

@RetryableTopic(backOff = @BackOff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) 時,將字尾 -retry-5000 更改為 -retry。如果您想保留後綴 -retry-5000,請使用 @RetryableTopic(backOff = @BackOff(delay = 5000), attempts = "2")。有關更多資訊,請參閱主題命名

監聽器容器更改

當手動分配分割槽,且消費者 group.idnull 時,AckMode 現在會自動強制轉換為 MANUAL。有關更多資訊,請參閱手動分配所有分割槽

3.0 版本相比 2.9 版本的新功能

Kafka 客戶端版本

此版本需要 3.3.1 的 kafka-clients

精確一次語義

不再支援 EOSMode.V1(又名 ALPHA)。

使用事務時,最小代理版本為 2.5。

有關更多資訊,請參閱精確一次語義KIP-447

觀察

現在支援使用 Micrometer 啟用計時器和跟蹤的觀察。有關更多資訊,請參閱觀察

原生映象

提供了建立原生映象的支援。有關更多資訊,請參閱原生映象

全域性單個嵌入式 Kafka

嵌入式 Kafka (EmbeddedKafkaBroker) 現在可以作為整個測試計劃的單個全域性例項啟動。有關更多資訊,請參閱為多個測試類使用相同的代理

可重試主題更改

此功能不再被視為實驗性功能(就其 API 而言),該功能本身自 2.7 版本以來就已受支援,但出現 API 破壞性更改的可能性高於正常情況。

此版本中非阻塞重試基礎設施 bean 的引導方式已更改,以避免在某些應用程式中出現應用程式初始化相關的時序問題。

您現在可以為重試容器設定不同的 concurrency;預設情況下,併發性與主容器相同。

@RetryableTopic 現在可以用作自定義註解上的元註解,包括支援 @AliasFor 屬性。

有關更多資訊,請參閱配置

重試主題的預設複製因子現在是 -1(使用代理預設值)。如果您的代理版本早於 2.4,則現在需要明確設定該屬性。

您現在可以在同一個應用程式上下文中為同一個主題配置多個 @RetryableTopic 監聽器。以前這是不可能的。有關更多資訊,請參閱多個監聽器,相同主題

RetryTopicConfigurationSupport 中存在破壞性 API 更改;具體來說,如果您重寫 destinationTopicResolverkafkaConsumerBackoffManager 和/或 retryTopicConfigurer 的 bean 定義方法;這些方法現在需要一個 ObjectProvider<RetryTopicComponentFactory> 引數。

監聽器容器更改

現在,容器會發布與消費者身份驗證和授權失敗相關的事件。有關更多資訊,請參閱應用程式事件

您現在可以自定義消費者執行緒使用的執行緒名稱。有關更多資訊,請參閱容器執行緒命名

已新增容器屬性 restartAfterAuthException。有關更多資訊,請參閱監聽器容器屬性

KafkaTemplate 更改

此類返回的 Future 現在是 CompletableFuture,而不是 ListenableFuture。請參閱使用 KafkaTemplate

ReplyingKafkaTemplate 更改

此類返回的 Future 現在是 CompletableFuture,而不是 ListenableFuture。請參閱使用 ReplyingKafkaTemplate使用 Message<?> 進行請求/回覆

@KafkaListener 更改

您現在可以使用自定義關聯頭,該頭將在任何回覆訊息中回顯。有關更多資訊,請參閱使用 ReplyingKafkaTemplate 末尾的註釋。

您現在可以在處理整個批次之前手動提交批次的部分內容。有關更多資訊,請參閱提交偏移量

KafkaHeaders 更改

KafkaHeaders 中在 2.9.x 中棄用的四個常量現已移除。

  • 使用 KEY 代替 MESSAGE_KEY

  • 使用 PARTITION 代替 PARTITION_ID

同樣,RECEIVED_KEY 替換 RECEIVED_MESSAGE_KEYRECEIVED_PARTITION 替換 RECEIVED_PARTITION_ID

測試更改

版本 3.0.7 引入了 MockConsumerFactoryMockProducerFactory。有關更多資訊,請參閱模擬消費者和生產者

從 3.0.10 版本開始,嵌入式 Kafka 代理預設將 Spring Boot 屬性 spring.kafka.bootstrap-servers 設定為嵌入式代理的地址。

2.9 版本相比 2.8 版本的新功能

Kafka 客戶端版本

此版本需要 3.2.0 的 kafka-clients

錯誤處理程式更改

DefaultErrorHandler 現在可以配置為暫停容器一次輪詢,並使用上一次輪詢的剩餘結果,而不是查詢剩餘記錄的偏移量。有關更多資訊,請參閱DefaultErrorHandler

DefaultErrorHandler 現在有一個 BackOffHandler 屬性。有關更多資訊,請參閱Back Off 處理程式

監聽器容器更改

interceptBeforeTx 現在適用於所有事務管理器(以前僅在使用 KafkaAwareTransactionManager 時應用)。請參閱[interceptBeforeTx]

提供了一個新的容器屬性 pauseImmediate,它允許容器在處理當前記錄後暫停消費者,而不是在處理完上一次輪詢中的所有記錄之後。請參閱[pauseImmediate]

與消費者身份驗證和授權相關的事件

Header Mapper 更改

您現在可以配置哪些入站頭應該被對映。在 2.8.8 或更高版本中也可用。有關更多資訊,請參閱訊息頭

KafkaTemplate 更改

在 3.0 版本中,此類返回的 Future 將是 CompletableFuture,而不是 ListenableFuture。有關在使用此版本時進行過渡的幫助,請參閱使用 KafkaTemplate

ReplyingKafkaTemplate 更改

該模板現在提供了一種在回覆容器上等待分配的方法,以避免在回覆容器初始化之前傳送請求時出現競爭條件。在 2.8.8 或更高版本中也可用。請參閱使用 ReplyingKafkaTemplate

在 3.0 版本中,此類返回的 Future 將是 CompletableFuture,而不是 ListenableFuture。有關在使用此版本時進行過渡的幫助,請參閱使用 ReplyingKafkaTemplate使用 Message<?> 進行請求/回覆

2.8 版本相比 2.7 版本的新功能

本節涵蓋從 2.7 版到 2.8 版的更改。有關早期版本的更改,請參閱變更歷史

Kafka 客戶端版本

此版本需要 3.0.0 的 kafka-clients

包更改

與型別對映相關的類和介面已從 ...support.converter 移動到 ...support.mapping

  • AbstractJavaTypeMapper

  • ClassMapper

  • DefaultJackson2JavaTypeMapper

  • Jackson2JavaTypeMapper

亂序手動提交

現在可以配置監聽器容器以接受亂序(通常是非同步)的手動偏移量提交。容器將推遲提交,直到確認缺失的偏移量。有關更多資訊,請參閱手動提交偏移量

@KafkaListener 更改

現在可以在方法本身上指定監聽器方法是否是批處理監聽器。這允許同一個容器工廠用於記錄監聽器和批處理監聽器。

有關更多資訊,請參閱[批處理監聽器]

批處理監聽器現在可以處理轉換異常。

有關更多資訊,請參閱使用批處理錯誤處理程式進行轉換錯誤

RecordFilterStrategy 在與批處理監聽器一起使用時,現在可以在一次呼叫中過濾整個批處理。有關更多資訊,請參閱[批處理監聽器]末尾的註釋。

@KafkaListener 註解現在具有 filter 屬性,用於僅為此監聽器覆蓋容器工廠的 RecordFilterStrategy

@KafkaListener 註解現在具有 info 屬性;這用於填充新的監聽器容器屬性 listenerInfo。然後,這用於在每個記錄中填充 KafkaHeaders.LISTENER_INFO 頭,該頭可以在 RecordInterceptorRecordFilterStrategy 或監聽器本身中使用。有關更多資訊,請參閱監聽器資訊頭AbstractMessageListenerContainer 屬性

KafkaTemplate 更改

現在,給定主題、分割槽和偏移量,您可以接收單個記錄。有關更多資訊,請參閱使用 KafkaTemplate 接收

新增 CommonErrorHandler

舊的 GenericErrorHandler 及其用於記錄和批處理監聽器的子介面層次結構已被新的單個介面 CommonErrorHandler 取代,其實現對應於 GenericErrorHandler 的大多數舊實現。有關更多資訊,請參閱容器錯誤處理程式將自定義舊版錯誤處理程式實現遷移到 CommonErrorHandler

監聽器容器更改

interceptBeforeTx 容器屬性現在預設為 true

authorizationExceptionRetryInterval 屬性已重新命名為 authExceptionRetryInterval,現在除了以前的 AuthorizationException 之外,還適用於 AuthenticationException。這兩個異常都被認為是致命的,容器預設會停止,除非設定此屬性。

有關更多資訊,請參閱使用 KafkaMessageListenerContainer監聽器容器屬性

序列化器/反序列化器更改

現在提供了 DelegatingByTopicSerializerDelegatingByTopicDeserializer。有關更多資訊,請參閱委託序列化器和反序列化器

DeadLetterPublishingRecover 更改

屬性 stripPreviousExceptionHeaders 現在預設為 true

現在有幾種技術可以自定義新增到輸出記錄中的頭。

有關更多資訊,請參閱管理死信記錄頭

可重試主題更改

現在可以將同一個工廠用於可重試和不可重試的主題。有關更多資訊,請參閱指定 ListenerContainerFactory

現在有一個可管理的致命異常全域性列表,這些異常將使失敗的記錄直接進入 DLT。請參閱異常分類器瞭解如何管理它。

您現在可以結合使用阻塞和非阻塞重試。有關更多資訊,請參閱結合阻塞和非阻塞重試

現在,在使用可重試主題功能時丟擲的 KafkaBackOffException 會以 DEBUG 級別記錄。如果需要將日誌級別改回 WARN 或設定為任何其他級別,請參閱更改 KafkaBackOffException 日誌級別

2.6 與 2.7 之間的更改

Kafka 客戶端版本

此版本需要 2.7.0 的 kafka-clients。它也與 2.7.1 版以來的 2.8.0 客戶端相容;請參閱覆蓋 Spring Boot 依賴項

使用主題的非阻塞延遲重試

此版本中添加了這一重要新功能。當嚴格排序不重要時,失敗的傳遞可以傳送到另一個主題,以便稍後消費。可以配置一系列此類重試主題,並增加延遲。有關更多資訊,請參閱非阻塞重試

監聽器容器更改

onlyLogRecordMetadata 容器屬性現在預設為 true

現在提供了一個新的容器屬性 stopImmediate

有關更多資訊,請參閱監聽器容器屬性

在傳遞嘗試之間使用 BackOff 的錯誤處理程式(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor)現在會在容器停止後不久退出退避間隔,而不是延遲停止。

擴充套件 FailedRecordProcessor 的錯誤處理程式和回滾後處理器現在可以配置一個或多個 RetryListener 以接收有關重試和恢復進度的資訊。

RecordInterceptor 現在有在監聽器返回(正常或丟擲異常)後呼叫的額外方法。它還有一個子介面 ConsumerAwareRecordInterceptor。此外,現在還有一個用於批處理監聽器的 BatchInterceptor。有關更多資訊,請參閱訊息監聽器容器

@KafkaListener 更改

您現在可以驗證 @KafkaHandler 方法(類級別監聽器)的有效負載引數。有關更多資訊,請參閱@KafkaListener @Payload 驗證

您現在可以在 MessagingMessageConverterBatchMessagingMessageConverter 上設定 rawRecordHeader 屬性,這將導致原始 ConsumerRecord 新增到轉換後的 Message<?> 中。例如,如果您希望在監聽器錯誤處理程式中使用 DeadLetterPublishingRecoverer,這將很有用。有關更多資訊,請參閱監聽器錯誤處理程式

您現在可以在應用程式初始化期間修改 @KafkaListener 註解。有關更多資訊,請參閱@KafkaListener 屬性修改

DeadLetterPublishingRecover 更改

現在,如果鍵和值都反序列化失敗,原始值將釋出到 DLT。以前,值被填充,但鍵 DeserializationException 留在頭中。如果您子類化了恢復器並重寫了 createProducerRecord 方法,則存在一個破壞性 API 更改。

此外,恢復器在釋出到 DLT 之前會驗證目標解析器選擇的分割槽是否實際存在。

有關更多資訊,請參閱釋出死信記錄

ChainedKafkaTransactionManager 已棄用

有關更多資訊,請參閱事務

ReplyingKafkaTemplate 更改

現在有一個機制可以檢查回覆並在存在某些條件時使 future 異常失敗。

已新增對傳送和接收 spring-messaging Message<?> 的支援。

有關更多資訊,請參閱使用 ReplyingKafkaTemplate

Kafka Streams 更改

預設情況下,StreamsBuilderFactoryBean 現在配置為不清理本地狀態。有關更多資訊,請參閱配置

KafkaAdmin 更改

已新增新方法 createOrModifyTopicsdescribeTopics。已新增 KafkaAdmin.NewTopics 以方便在單個 bean 中配置多個主題。有關更多資訊,請參閱[配置主題]

MessageConverter 更改

現在可以將 spring-messaging SmartMessageConverter 新增到 MessagingMessageConverter 中,從而允許基於 contentType 頭進行內容協商。有關更多資訊,請參閱Spring Messaging 訊息轉換

排序 @KafkaListener

有關更多資訊,請參閱按順序啟動 @KafkaListener

ExponentialBackOffWithMaxRetries

提供了一種新的 BackOff 實現,使配置最大重試次數更加方便。有關更多資訊,請參閱ExponentialBackOffWithMaxRetries 實現

條件委託錯誤處理程式

這些新的錯誤處理程式可以配置為委託給不同的錯誤處理程式,具體取決於異常型別。有關更多資訊,請參閱委託錯誤處理程式

2.5 與 2.6 之間的更改

Kafka 客戶端版本

此版本需要 2.6.0 的 kafka-clients

偵聽器容器的更改

預設的 EOSMode 現在是 BETA。有關更多資訊,請參閱精確一次語義

各種錯誤處理程式(擴充套件 FailedRecordProcessor)和 DefaultAfterRollbackProcessor 現在會在恢復失敗時重置 BackOff。此外,您現在可以根據失敗的記錄和/或異常選擇要使用的 BackOff

您現在可以在容器屬性中配置 adviceChain。有關更多資訊,請參閱監聽器容器屬性

當容器配置為釋出 ListenerContainerIdleEvent 時,它現在會在釋出空閒事件後收到記錄時釋出 ListenerContainerNoLongerIdleEvent。有關更多資訊,請參閱應用程式事件檢測空閒和無響應的消費者

@KafkaListener 更改

當使用手動分割槽分配時,您現在可以指定一個萬用字元來確定哪些分割槽應該重置為初始偏移量。此外,如果監聽器實現了 ConsumerSeekAware,則 onPartitionsAssigned() 在手動分配後被呼叫。(在 2.5.5 版本中也已新增)。有關更多資訊,請參閱顯式分割槽分配

已向 AbstractConsumerSeekAware 添加了便利方法,以簡化查詢。有關更多資訊,請參閱[查詢]

ErrorHandler 更改

FailedRecordProcessor 的子類(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessorRecoveringBatchErrorHandler)現在可以配置為在異常型別與先前在此記錄中發生的異常型別不同時重置重試狀態。

生產者工廠更改

您現在可以設定生產者的最大生命週期,在此之後它們將被關閉並重新建立。有關更多資訊,請參閱事務

您現在可以在建立 DefaultKafkaProducerFactory 後更新配置對映。例如,如果您在憑據更改後必須更新 SSL 金鑰/信任儲存位置,這可能很有用。有關更多資訊,請參閱使用 DefaultKafkaProducerFactory

2.4 與 2.5 之間的更改

本節涵蓋從 2.4 版到 2.5 版的更改。有關早期版本的更改,請參閱變更歷史

消費者/生產者工廠更改

預設的消費者和生產者工廠現在可以在建立或關閉消費者或生產者時呼叫回撥。提供了本機 Micrometer 指標的實現。有關更多資訊,請參閱工廠監聽器

您現在可以在執行時更改引導伺服器屬性,從而實現故障轉移到另一個 Kafka 叢集。有關更多資訊,請參閱連線到 Kafka

StreamsBuilderFactoryBean 更改

工廠 bean 現在可以在建立或銷燬 KafkaStreams 時呼叫回撥。提供了本機 Micrometer 指標的實現。有關更多資訊,請參閱KafkaStreams Micrometer 支援

Kafka 客戶端版本

此版本需要 2.5.0 的 kafka-clients

類/包更改

SeekUtils 已從 o.s.k.support 包移動到 o.s.k.listener

傳遞嘗試頭

現在有一個選項,在使用某些錯誤處理程式和回滾後處理器時,新增一個跟蹤傳遞嘗試的頭。有關更多資訊,請參閱傳遞嘗試頭

@KafkaListener 更改

@KafkaListener 返回型別為 Message<?> 時,如果需要,現在會自動填充預設回覆頭。有關更多資訊,請參閱回覆型別 Message<?>

當傳入記錄的鍵為 null 時,KafkaHeaders.RECEIVED_MESSAGE_KEY 不再填充 null 值;該頭完全被省略。

@KafkaListener 方法現在可以指定 ConsumerRecordMetadata 引數,而不是使用單獨的頭來獲取元資料,例如主題、分割槽等。有關更多資訊,請參閱消費者記錄元資料

監聽器容器更改

assignmentCommitOption 容器屬性現在預設為 LATEST_ONLY_NO_TX。有關更多資訊,請參閱監聽器容器屬性

在使用事務時,subBatchPerPartition 容器屬性現在預設為 true。有關更多資訊,請參閱事務

現在提供了一個新的 RecoveringBatchErrorHandler

現在支援靜態組員資格。有關更多資訊,請參閱訊息監聽器容器

當配置增量/協作重新平衡時,如果偏移量提交因非致命的 RebalanceInProgressException 而失敗,容器將嘗試重新提交在重新平衡完成後仍分配給此例項的分割槽的偏移量。

現在,記錄監聽器的預設錯誤處理程式是 SeekToCurrentErrorHandler,批處理監聽器的預設錯誤處理程式是 RecoveringBatchErrorHandler。有關更多資訊,請參閱容器錯誤處理程式

您現在可以控制標準錯誤處理程式有意丟擲的異常的日誌級別。有關更多資訊,請參閱容器錯誤處理程式

已新增 getAssignmentsByClientId() 方法,使確定併發容器中哪些消費者分配到哪些分割槽變得更容易。有關更多資訊,請參閱監聽器容器屬性

您現在可以禁止在錯誤、除錯日誌等中記錄整個 ConsumerRecord。請參閱監聽器容器屬性中的 onlyLogRecordMetadata

KafkaTemplate 更改

KafkaTemplate 現在可以維護 Micrometer 計時器。有關更多資訊,請參閱監控

KafkaTemplate 現在可以配置 ProducerConfig 屬性,以覆蓋生產者工廠中的屬性。有關更多資訊,請參閱使用 KafkaTemplate

現在提供了一個 RoutingKafkaTemplate。有關更多資訊,請參閱使用 RoutingKafkaTemplate

您現在可以使用 KafkaSendCallback 而不是 ListenerFutureCallback 來獲取更窄的異常,從而更容易提取失敗的 ProducerRecord。有關更多資訊,請參閱使用 KafkaTemplate

Kafka 字串序列化器/反序列化器

現在提供了新的 ToStringSerializer/StringDeserializer 以及相關的 SerDe。有關更多資訊,請參閱字串序列化

JsonDeserializer

JsonDeserializer 現在具有更大的靈活性來確定反序列化型別。有關更多資訊,請參閱使用方法確定型別

委託序列化器/反序列化器

當出站記錄沒有頭時,DelegatingSerializer 現在可以處理“標準”型別。有關更多資訊,請參閱委託序列化器和反序列化器

測試更改

KafkaTestUtils.consumerProps() 輔助記錄現在預設將 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 設定為 earliest。有關更多資訊,請參閱JUnit

2.3 與 2.4 之間的更改

Kafka 客戶端版本

此版本需要 2.4.0 或更高版本的 kafka-clients,並支援新的增量再平衡功能。

ConsumerAwareRebalanceListener

ConsumerRebalanceListener 類似,此介面現在具有一個額外的方法 onPartitionsLost。有關更多資訊,請參閱 Apache Kafka 文件。

ConsumerRebalanceListener 不同,預設實現不呼叫 onPartitionsRevoked。相反,監聽器容器將在呼叫 onPartitionsLost 後呼叫該方法;因此,在實現 ConsumerAwareRebalanceListener 時,您不應該做同樣的事情。

有關更多資訊,請參閱再平衡監聽器末尾的 IMPORTANT 註釋。

GenericErrorHandler

isAckAfterHandle() 預設實現現在預設返回 true。

KafkaTemplate

KafkaTemplate 現在支援非事務性發布與事務性發布。有關更多資訊,請參閱KafkaTemplate 事務性和非事務性發布

AggregatingReplyingKafkaTemplate

releaseStrategy 現在是一個 BiConsumer。它現在在超時後(以及記錄到達時)呼叫;第二個引數在超時後呼叫時為 true

有關更多資訊,請參閱聚合多個回覆

監聽器容器

ContainerProperties 提供了一個 authorizationExceptionRetryInterval 選項,允許監聽器容器在 KafkaConsumer 丟擲任何 AuthorizationException 後重試。有關更多資訊,請參閱其 Javadoc 和使用 KafkaMessageListenerContainer

@KafkaListener

@KafkaListener 註解有一個新屬性 splitIterables;預設為 true。當回覆監聽器返回 Iterable 時,此屬性控制返回結果是作為單個記錄傳送還是為每個元素髮送一個記錄。有關更多資訊,請參閱使用 @SendTo 轉發監聽器結果

批處理監聽器現在可以配置 BatchToRecordAdapter;例如,這允許在事務中處理批處理,而監聽器一次獲取一條記錄。使用預設實現,ConsumerRecordRecoverer 可用於處理批處理中的錯誤,而無需停止整個批處理的程序 - 這在使用事務時可能很有用。有關更多資訊,請參閱使用批處理監聽器的事務

Kafka Streams

StreamsBuilderFactoryBean 接受一個新的屬性 KafkaStreamsInfrastructureCustomizer。這允許在建立流之前配置構建器和/或拓撲。有關更多資訊,請參閱Spring 管理

2.2 與 2.3 之間的更改

本節涵蓋從 2.2 版到 2.3 版的更改。

提示、技巧和示例

已新增新章節提示、技巧和示例。請提交 GitHub 問題和/或拉取請求以新增更多條目到該章節。

Kafka 客戶端版本

此版本需要 2.3.0 或更高版本的 kafka-clients

類/包更改

TopicPartitionInitialOffset 已棄用,取而代之的是 TopicPartitionOffset

配置更改

從 2.3.4 版本開始,missingTopicsFatal 容器屬性預設值為 false。當此屬性為 true 時,如果代理關閉,應用程式將無法啟動;許多使用者受到此更改的影響;鑑於 Kafka 是一個高可用平臺,我們沒有預料到在沒有活動代理的情況下啟動應用程式會是一個常見用例。

生產者和消費者工廠更改

DefaultKafkaProducerFactory 現在可以配置為為每個執行緒建立一個生產者。您還可以在建構函式中提供 Supplier<Serializer> 例項,作為配置類(需要無參建構函式)或使用 Serializer 例項(然後由所有生產者共享)的替代方案。有關更多資訊,請參閱使用 DefaultKafkaProducerFactory

DefaultKafkaConsumerFactory 中提供了相同的選項,使用 Supplier<Deserializer> 例項。有關更多資訊,請參閱使用 KafkaMessageListenerContainer

監聽器容器更改

以前,當使用監聽器介面卡(例如 @KafkaListener)呼叫監聽器時,錯誤處理程式會收到 ListenerExecutionFailedException(實際監聽器異常作為 cause)。由原生 GenericMessageListener 丟擲的異常會原封不動地傳遞給錯誤處理程式。現在 ListenerExecutionFailedException 始終是引數(實際監聽器異常作為 cause),它提供了對容器 group.id 屬性的訪問。

因為監聽器容器有自己的提交偏移量機制,所以它傾向於 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGfalse。現在它會自動將其設定為 false,除非在消費者工廠或容器的消費者屬性覆蓋中明確設定。

ackOnError 屬性現在預設為 false

現在可以在監聽器方法中獲取消費者的 group.id 屬性。有關更多資訊,請參閱獲取消費者 group.id

容器有一個新屬性 recordInterceptor,允許在呼叫監聽器之前檢查或修改記錄。還提供了一個 CompositeRecordInterceptor,以防您需要呼叫多個攔截器。有關更多資訊,請參閱訊息監聽器容器

ConsumerSeekAware 具有新的方法,允許您相對於開始、結束或當前位置執行查詢,並查詢大於或等於時間戳的第一個偏移量。有關更多資訊,請參閱[查詢]

現在提供了一個方便的類 AbstractConsumerSeekAware 來簡化查詢。有關更多資訊,請參閱[查詢]

ContainerProperties 提供了一個 idleBetweenPolls 選項,允許監聽器容器中的主迴圈在 KafkaConsumer.poll() 呼叫之間休眠。有關更多資訊,請參閱其 Javadoc 和使用 KafkaMessageListenerContainer

當使用 AckMode.MANUAL(或 MANUAL_IMMEDIATE)時,您現在可以透過在 Acknowledgment 上呼叫 nack 來導致重新傳遞。有關更多資訊,請參閱提交偏移量

現在可以使用 Micrometer Timer 監控監聽器效能。有關更多資訊,請參閱監控

容器現在釋出與啟動相關的額外消費者生命週期事件。有關更多資訊,請參閱應用程式事件

事務批處理監聽器現在可以支援殭屍圍欄。有關更多資訊,請參閱事務

監聽器容器工廠現在可以配置 ContainerCustomizer,以便在建立和配置每個容器後進一步配置它們。有關更多資訊,請參閱容器工廠

ErrorHandler 更改

SeekToCurrentErrorHandler 現在將某些異常視為致命異常,並停用它們的重試,在第一次失敗時呼叫恢復器。

SeekToCurrentErrorHandlerSeekToCurrentBatchErrorHandler 現在可以配置為在傳遞嘗試之間應用 BackOff(執行緒睡眠)。

從 2.3.2 版本開始,當錯誤處理程式在恢復失敗記錄後返回時,恢復的記錄的偏移量將被提交。

DeadLetterPublishingRecoverer 在與 ErrorHandlingDeserializer 結合使用時,現在將傳送到死信主題的訊息有效負載設定為無法反序列化的原始值。以前,它是 null,使用者程式碼需要從訊息頭中提取 DeserializationException。有關更多資訊,請參閱釋出死信記錄

TopicBuilder

提供了一個新類 TopicBuilder,用於更方便地建立 NewTopic @Bean,用於自動主題配置。有關更多資訊,請參閱[配置主題]

Kafka Streams 更改

您現在可以對 @EnableKafkaStreams 建立的 StreamsBuilderFactoryBean 進行額外配置。有關更多資訊,請參閱流配置

現在提供了 RecoveringDeserializationExceptionHandler,它允許恢復具有反序列化錯誤的記錄。它可以與 DeadLetterPublishingRecoverer 結合使用,將這些記錄傳送到死信主題。有關更多資訊,請參閱從反序列化異常中恢復

已提供 HeaderEnricher 轉換器,使用 SpEL 生成頭值。有關更多資訊,請參閱Header Enricher

已提供 MessagingTransformer。這允許 Kafka Streams 拓撲與 Spring Messaging 元件(例如 Spring Integration 流)進行互動。有關更多資訊,請參閱MessagingProcessorKStream 呼叫 Spring Integration 流

JSON 元件更改

現在所有 JSON 感知元件都預設配置了一個由 JacksonUtils.enhancedObjectMapper() 生成的 Jackson ObjectMapperJsonDeserializer 現在提供了基於 TypeReference 的建構函式,以便更好地處理目標泛型容器型別。此外,還引入了一個 JacksonMimeTypeModule,用於將 org.springframework.util.MimeType 序列化為純字串。有關更多資訊,請參閱其 JavaDocs 和序列化、反序列化和訊息轉換

已提供 ByteArrayJsonMessageConverter 以及所有 Json 轉換器的新超類 JsonMessageConverter。此外,現在還提供了 StringOrBytesSerializer;它可以序列化 ProducerRecord 中的 byte[]BytesString 值。有關更多資訊,請參閱Spring Messaging 訊息轉換

JsonSerializerJsonDeserializerJsonSerde 現在具有流式 API,可簡化程式設計配置。有關更多資訊,請參閱 javadocs、序列化、反序列化和訊息轉換以及流 JSON 序列化和反序列化

ReplyingKafkaTemplate

當回覆超時時,future 會以 KafkaReplyTimeoutException 而不是 KafkaException 異常完成。

此外,現在提供了一個過載的 sendAndReceive 方法,允許根據每個訊息指定回覆超時。

AggregatingReplyingKafkaTemplate

透過聚合來自多個接收器的回覆來擴充套件 ReplyingKafkaTemplate。有關更多資訊,請參閱聚合多個回覆

事務更改

您現在可以在 KafkaTemplateKafkaTransactionManager 上覆蓋生產者工廠的 transactionIdPrefix。有關更多資訊,請參閱transactionIdPrefix

新委託序列化器/反序列化器

該框架現在提供了一個委託 SerializerDeserializer,利用標頭檔案來啟用生產和消費具有多種鍵/值型別的記錄。有關更多資訊,請參閱委託序列化器和反序列化器

新重試反序列化器

該框架現在提供了一個委託 RetryingDeserializer,用於在可能發生網路問題等瞬時錯誤時重試序列化。有關更多資訊,請參閱重試反序列化器

2.1 與 2.2 之間的更改

Kafka 客戶端版本

此版本需要 2.0.0 或更高版本的 kafka-clients

類和包更改

ContainerProperties 類已從 org.springframework.kafka.listener.config 移動到 org.springframework.kafka.listener

AckMode 列舉已從 AbstractMessageListenerContainer 移動到 ContainerProperties

setBatchErrorHandler()setErrorHandler() 方法已從 ContainerProperties 移動到 AbstractMessageListenerContainerAbstractKafkaListenerContainerFactory

回滾後處理

提供了一個新的 AfterRollbackProcessor 策略。有關更多資訊,請參閱回滾後處理器

ConcurrentKafkaListenerContainerFactory 更改

您現在可以使用 ConcurrentKafkaListenerContainerFactory 建立和配置任何 ConcurrentMessageListenerContainer,而不僅僅是用於 @KafkaListener 註解的容器。有關更多資訊,請參閱容器工廠

監聽器容器更改

已新增一個新的容器屬性 (missingTopicsFatal)。有關更多資訊,請參閱使用 KafkaMessageListenerContainer

當消費者停止時,現在會發出 ConsumerStoppedEvent。有關更多資訊,請參閱執行緒安全

批處理監聽器可以選擇接收完整的 ConsumerRecords<?, ?> 物件,而不是 List<ConsumerRecord<?, ?>。有關更多資訊,請參閱[批處理監聽器]

DefaultAfterRollbackProcessorSeekToCurrentErrorHandler 現在可以恢復(跳過)持續失敗的記錄,並且預設在 10 次失敗後執行此操作。它們可以配置為將失敗的記錄釋出到死信主題。

從 2.2.4 版本開始,消費者組 ID 可以在選擇死信主題名稱時使用。

已新增 ConsumerStoppingEvent。有關更多資訊,請參閱應用程式事件

當容器配置為 AckMode.MANUAL_IMMEDIATE 時,SeekToCurrentErrorHandler 現在可以配置為提交恢復的記錄的偏移量(從 2.2.4 開始)。

@KafkaListener 更改

您現在可以透過在註解上設定屬性來覆蓋監聽器容器工廠的 concurrencyautoStartup 屬性。您現在可以新增配置以確定哪些頭(如果有)被複制到回覆訊息中。有關更多資訊,請參閱@KafkaListener 註解

您現在可以在自己的註解上將 @KafkaListener 用作元註解。有關更多資訊,請參閱@KafkaListener 作為元註解

現在更容易配置用於 @Payload 驗證的 Validator。有關更多資訊,請參閱@KafkaListener @Payload 驗證

您現在可以直接在註解上指定 kafka 消費者屬性;這些屬性將覆蓋消費者工廠中定義的同名屬性(從 2.2.4 版本開始)。有關更多資訊,請參閱註解屬性

頭對映更改

型別為 MimeTypeMediaType 的頭現在在 RecordHeader 值中對映為簡單字串。以前,它們被對映為 JSON,並且只有 MimeType 被解碼。MediaType 無法解碼。它們現在是簡單的字串,以實現互操作性。

此外,JsonKafkaHeaderMapper 有一個新的 addToStringClasses 方法,允許指定應使用 toString() 而不是 JSON 對映的型別。有關更多資訊,請參閱訊息頭

嵌入式 Kafka 更改

KafkaEmbedded 類及其 KafkaRule 介面已棄用,取而代之的是 EmbeddedKafkaBroker 及其 JUnit 4 EmbeddedKafkaRule 包裝器。@EmbeddedKafka 註解現在填充一個 EmbeddedKafkaBroker bean,而不是已棄用的 KafkaEmbedded。此更改允許在 JUnit 5 測試中使用 @EmbeddedKafka@EmbeddedKafka 註解現在具有 ports 屬性,用於指定填充 EmbeddedKafkaBroker 的埠。有關更多資訊,請參閱測試應用程式

JsonSerializer/Deserializer 增強功能

您現在可以透過使用生產者和消費者屬性來提供型別對映資訊。

反序列化器上提供了新的建構函式,允許使用提供的目標型別覆蓋型別頭資訊。

JsonDeserializer 現在預設刪除任何型別資訊頭。

您現在可以配置 JsonDeserializer 以透過使用 Kafka 屬性忽略型別資訊頭(從 2.2.3 開始)。

有關更多資訊,請參閱序列化、反序列化和訊息轉換

Kafka Streams 更改

流配置 bean 現在必須是 KafkaStreamsConfiguration 物件,而不是 StreamsConfig 物件。

StreamsBuilderFactoryBean 已從包 ...core 移動到 ...config

引入了 KafkaStreamBrancher,以改善在 KStream 例項之上構建條件分支時的使用者體驗。

有關更多資訊,請參閱Apache Kafka Streams 支援配置

事務 ID

當監聽器容器啟動事務時,transactional.id 現在是 transactionIdPrefix 附加 <group.id>.<topic>.<partition>。此更改允許正確隔離殭屍程序,如此處所述

2.0 與 2.1 之間的更改

Kafka 客戶端版本

此版本需要 1.0.0 或更高版本的 kafka-clients

1.1.x 客戶端在 2.2 版本中原生支援。

JSON 改進

StringJsonMessageConverterJsonSerializer 現在在 Headers 中新增型別資訊,允許轉換器和 JsonDeserializer 在接收時建立特定型別,基於訊息本身而不是固定的配置型別。有關更多資訊,請參閱序列化、反序列化和訊息轉換

容器停止錯誤處理程式

現在為記錄和批處理監聽器提供了容器錯誤處理程式,它們將監聽器丟擲的任何異常視為致命錯誤,並停止容器。有關更多資訊,請參閱處理異常

暫停和恢復容器

監聽器容器現在具有 pause()resume() 方法(從 2.1.3 版本開始)。有關更多資訊,請參閱暫停和恢復監聽器容器

有狀態重試

從 2.1.3 版本開始,您可以配置有狀態重試。有關更多資訊,請參閱有狀態重試

客戶端 ID

從版本 2.1.1 開始,您現在可以在 @KafkaListener 上設定 client.id 字首。以前,為了自定義客戶端 ID,每個監聽器都需要單獨的消費者工廠(和容器工廠)。當您使用併發時,字首會以 -n 作為字尾,以提供唯一的客戶端 ID。

日誌記錄偏移量提交

預設情況下,主題偏移量提交的日誌記錄以 DEBUG 日誌級別執行。從版本 2.1.2 開始,ContainerProperties 中的一個新屬性 commitLogLevel 允許您指定這些訊息的日誌級別。有關更多資訊,請參閱使用 KafkaMessageListenerContainer

預設 @KafkaHandler

從版本 2.1.3 開始,您可以將類級別 @KafkaListener 上的一個 @KafkaHandler 註解指定為預設註解。有關更多資訊,請參閱類上的 @KafkaListener

ReplyingKafkaTemplate

從版本 2.1.3 開始,提供了一個 KafkaTemplate 的子類來支援請求/回覆語義。有關更多資訊,請參閱使用 ReplyingKafkaTemplate

ChainedKafkaTransactionManager

版本 2.1.3 引入了 ChainedKafkaTransactionManager。(現已棄用)。

從 2.0 遷移指南

請參閱2.0 到 2.1 遷移指南。

1.3 和 2.0 之間的變化

Spring Framework 和 Java 版本

Spring for Apache Kafka 專案現在需要 Spring Framework 5.0 和 Java 8。

@KafkaListener 變更

您現在可以使用 @SendTo 註解 @KafkaListener 方法(以及類和 @KafkaHandler 方法)。如果方法返回結果,它將被轉發到指定的主題。有關更多資訊,請參閱使用 @SendTo 轉發監聽器結果

訊息監聽器

訊息監聽器現在可以感知 Consumer 物件。有關更多資訊,請參閱[訊息監聽器]

使用 ConsumerAwareRebalanceListener

重新平衡監聽器現在可以在重新平衡通知期間訪問 Consumer 物件。有關更多資訊,請參閱重新平衡監聽器

1.2 和 1.3 之間的變化

事務支援

0.11.0.0 客戶端庫增加了對事務的支援。KafkaTransactionManager 和其他事務支援已新增。有關更多資訊,請參閱事務

Header 支援

0.11.0.0 客戶端庫增加了對訊息 header 的支援。這些現在可以對映到 spring-messaging MessageHeaders 和從 spring-messaging MessageHeaders 對映。有關更多資訊,請參閱訊息 Header

建立主題

0.11.0.0 客戶端庫提供了一個 AdminClient,您可以使用它來建立主題。KafkaAdmin 使用此客戶端自動新增定義為 @Bean 例項的主題。

Kafka 時間戳支援

KafkaTemplate 現在支援一個 API 來新增帶有時間戳的記錄。已引入與 timestamp 支援相關的新 KafkaHeaders。此外,還添加了新的 KafkaConditions.timestamp()KafkaMatchers.hasTimestamp() 測試實用程式。有關更多詳細資訊,請參閱使用 KafkaTemplate@KafkaListener 註解測試應用程式

@KafkaListener 變更

您現在可以配置 KafkaListenerErrorHandler 來處理異常。有關更多資訊,請參閱處理異常

預設情況下,@KafkaListenerid 屬性現在用作 group.id 屬性,覆蓋消費者工廠中配置的屬性(如果存在)。此外,您可以在註解上顯式配置 groupId。以前,您需要一個單獨的容器工廠(和消費者工廠)才能為監聽器使用不同的 group.id 值。要恢復使用工廠配置的 group.id 的先前行為,請將註解上的 idIsGroup 屬性設定為 false

@EmbeddedKafka 註解

為方便起見,提供了一個測試類級別 @EmbeddedKafka 註解,用於將 KafkaEmbedded 註冊為 bean。有關更多資訊,請參閱測試應用程式

Kerberos 配置

現在提供 Kerberos 配置支援。有關更多資訊,請參閱JAAS 和 Kerberos

1.1 和 1.2 之間的變化

此版本使用 0.10.2.x 客戶端。

1.0 和 1.1 之間的變化

Kafka 客戶端

此版本使用 Apache Kafka 0.10.x.x 客戶端。

批處理監聽器

監聽器可以配置為接收 consumer.poll() 操作返回的整個訊息批次,而不是一次接收一條。

空有效載荷

當您使用日誌壓縮時,空有效載荷用於“刪除”鍵。

初始偏移量

當顯式分配分割槽時,您現在可以配置相對於消費者組當前位置的初始偏移量,而不是絕對或相對於當前末尾的偏移量。

查詢

您現在可以查詢每個主題或分割槽的位置。當使用組管理並且 Kafka 分配分割槽時,您可以使用它在初始化期間設定初始位置。當檢測到空閒容器時,或者在應用程式執行的任何任意點,您也可以查詢。有關更多資訊,請參閱[查詢]

© . This site is unofficial and not affiliated with VMware.