最新動態?
4.0 版本相比 3.3 版本的新特性
本節涵蓋從 3.3 版本到 4.0 版本的變化。有關早期版本的更改,請參閱更改歷史記錄。
Apache Kafka 4.0 客戶端升級
Spring for Apache Kafka 已升級到使用 Apache Kafka 客戶端版本 4.0.0。此次升級帶來了一些重要的變化:
-
所有基於 ZooKeeper 的功能已被刪除,因為 Kafka 4.0 已完全轉換為 KRaft 模式
-
ZooKeeper 依賴項已從專案中移除
-
嵌入式 Kafka 測試框架現在完全使用 KRaft 模式
-
EmbeddedKafkaZKBroker類已被刪除,所有功能現在由EmbeddedKafkaKraftBroker處理
嵌入式 Kafka 測試框架更改
測試基礎架構已顯著更新
-
已刪除
EmbeddedKafkaRuleJUnit 4 規則 -
@EmbeddedKafka註解透過刪除與 ZooKeeper 相關的屬性而得到簡化 -
kraft屬性已被刪除,因為 KRaft 模式現在是唯一選項 -
與 ZooKeeper 相關的屬性,如
zookeeperPort、zkConnectionTimeout和zkSessionTimeout已被刪除 -
KafkaClusterTestKit 匯入現在使用 KRaft 模式的新包
-
一些測試已更新,以解決 KRaft 模式中靜態埠分配的限制
-
已調整測試中的複製因子,以適應 KRaft 要求
Producer 介面更新
已實現 Kafka Producer 介面中的新方法
-
registerMetricForSubscription -
unregisterMetricFromSubscription
已刪除的棄用功能
已刪除幾個棄用項
-
已從執行時提示中刪除已棄用的
partitioner類 -
已刪除使用
String consumerGroupId的已棄用sendOffsetsToTransaction方法
Kafka Streams API 更改
-
KafkaStreamBrancher已更新為使用新的split()和branch()方法,而不是已棄用的branch()方法 -
DeserializationExceptionHandler已更新為使用新的ErrorHandlerContext
與 Apache Kafka 4.0.0 相關的內部 API 更新
-
BrokerAddress類現在使用org.apache.kafka.server.network.BrokerEndPoint而不是已棄用的kafka.cluster.BrokerEndPoint -
GlobalEmbeddedKafkaTestExecutionListener已更新為僅在 KRaft 模式下工作
新的消費者再平衡協議
Spring for Apache Kafka 4.0 支援 Kafka 4.0 的新消費者再平衡協議 - KIP-848。有關詳細資訊,請參閱新消費者再平衡協議文件。
支援多值頭部
JsonKafkaHeaderMapper 和 SimpleKafkaHeaderMapper 支援 Kafka 記錄的多值頭部對映。更多詳細資訊可在支援多值頭部對映中找到。
配置附加的 RecordInterceptor
監聽器容器現在透過 getRecordInterceptor() 支援攔截器自定義。有關詳細資訊,請參閱訊息監聽器容器部分。
批處理監聽器中的每條記錄可觀察性
現在,在使用批處理監聽器時,可以獲取每條記錄的觀察。有關更多資訊,請參閱批處理監聽器的可觀察性。
Kafka 佇列(共享消費者)支援
Spring for Apache Kafka 現在透過共享消費者提供對 Kafka 佇列的早期訪問支援,共享消費者是 Apache Kafka 4.0.0 的一部分並實現了 KIP-932。這使得協作消費成為可能,多個消費者可以同時從相同的分割槽消費,與傳統消費者組相比提供更好的負載分配。有關更多資訊,請參閱Kafka 佇列(共享消費者)。
Jackson 3 支援
Spring for Apache Kafka 現在全面支援 Jackson 3,並與現有的 Jackson 2 支援並行。Jackson 3 在可用時會自動檢測並優先使用,提供增強的效能和現代 JSON 處理能力。
所有 Jackson 2 類現在都有 Jackson 3 對應類,具有一致的命名和改進的型別安全
-
JsonKafkaHeaderMapper替換DefaultKafkaHeaderMapper -
JacksonJsonSerializer/Deserializer替換JsonSerializer/Deserializer -
JacksonJsonSerde替換JsonSerde -
JacksonJsonMessageConverter家族替換JsonMessageConverter家族 -
JacksonProjectingMessageConverter替換ProjectingMessageConverter -
DefaultJacksonJavaTypeMapper替換DefaultJackson2JavaTypeMapper
新的 Jackson 3 類使用 JsonMapper 而不是通用的 ObjectMapper,以增強型別安全,並利用 Jackson 3 改進的模組系統和效能最佳化。
遷移路徑:現有應用程式可與 Jackson 2 保持不變地繼續工作。要遷移到 Jackson 3,只需將 Jackson 3 新增到您的類路徑中,並更新類引用以使用新的 Jackson 3 等效項。當兩個版本都存在時,框架會自動檢測並優先使用 Jackson 3。
向後相容性:所有 Jackson 2 類都已棄用,但仍完全可用。它們將在未來的主要版本中移除。
有關配置示例,請參閱序列化、反序列化和訊息轉換。
Spring Retry 依賴項移除
Spring for Apache Kafka 已移除對 Spring Retry 的依賴,轉而使用 Spring Framework 7 中引入的核心重試支援。這是一個重大更改,影響整個框架的重試配置和 API。
BackOffValuesGenerator,用於預先生成所需的 BackOff 值,現在直接與 Spring Framework 的 BackOff 介面配合使用,而不是 BackOffPolicy。這些值隨後由監聽器基礎設施管理,Spring Retry 不再參與。
從配置角度來看,Spring Kafka 嚴重依賴 Spring Retry 的 @Backoff 註解。由於 Spring Framework 中沒有等效項,該註解已作為 @BackOff 移至 Spring Kafka,並進行了以下改進:
-
統一命名:使用
@BackOff而不是@Backoff以保持一致性 -
表示式評估:所有字串屬性都支援 SpEL 表示式和屬性佔位符
-
持續時間格式支援:字串屬性接受
java.util.Duration格式(例如,“2s”、“500ms”) -
增強文件:改進了 Javadoc,提供了更清晰的解釋
遷移示例
// Before
@RetryableTopic(backoff = @Backoff(delay = 2000, maxDelay = 10000, multiplier = 2))
// After
@RetryableTopic(backOff = @BackOff(delay = 2000, maxDelay = 10000, multiplier = 2))
// With new duration format support
@RetryableTopic(backOff = @BackOff(delayString = "2s", maxDelayString = "10s", multiplier = 2))
// With property placeholders
@RetryableTopic(backOff = @BackOff(delayString = "${retry.delay}", multiplierString = "${retry.multiplier}"))
RetryingDeserializer 不再提供 RecoveryCallback,但提供了一個接受 RetryException 作為輸入的等效函式。這包含丟擲的異常以及重試次數
// Before
retryingDeserializer.setRecoveryCallback(context -> {
return fallbackValue;
});
// After
retryingDeserializer.setRecoveryCallback(retryException -> {
return fallbackValue;
});
BinaryExceptionClassifier 的使用已被新引入的 ExceptionMatcher 取代,後者提供了一個更完善的 API。
其他更改包括:
-
DestinationTopicPropertiesFactory使用ExceptionMatcher而不是BinaryExceptionClassifier -
RetryTopicConfigurationBuilder中的uniformRandomBackoff方法已被棄用,轉而支援抖動 -
錯誤處理工具已更新,以適應新的異常匹配系統
-
Kafka Streams 重試模板現在使用 Spring Framework 的重試支援
應用程式必須更新其配置以使用新的 Spring Framework 重試 API,但重試行為和功能保持不變。