功能
大多數功能都適用於 @RetryableTopic 註解和 RetryTopicConfiguration bean。
退避配置
退避配置依賴於 Spring Retry 專案中的 BackOffPolicy 介面。
它包括
-
固定退避
-
指數退避
-
隨機指數退避
-
均勻隨機退避
-
無退避
-
自定義退避
@RetryableTopic(attempts = 5,
backOff = @BackOff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
您還可以提供 Spring Retry 的 SleepingBackOffPolicy 介面的自定義實現。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
預設的退避策略是 FixedBackOffPolicy,最大嘗試次數為 3 次,間隔為 1000 毫秒。 |
ExponentialBackOffPolicy 的預設最大延遲為 30 秒。如果您的退避策略需要大於此值的延遲,請相應地調整 maxDelay 屬性。 |
第一次嘗試計入 maxAttempts,因此如果您提供 maxAttempts 值為 4,則將有原始嘗試加 3 次重試。 |
全域性超時
您可以設定重試過程的全域性超時。如果達到該時間,下一次消費者丟擲異常時,訊息將直接進入 DLT,如果 DLT 不可用,則僅結束處理。
@RetryableTopic(backOff = @BackOff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
| 預設情況下未設定超時,也可以透過提供 -1 作為超時值來實現。 |
異常分類器
您可以指定要重試哪些異常,不重試哪些異常。您還可以將其設定為遍歷原因以查詢巢狀異常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
| 預設行為是重試所有異常,而不遍歷原因。 |
自 2.8.3 版本起,有一個全域性致命異常列表,它將導致記錄被髮送到 DLT 而不進行任何重試。有關致命異常的預設列表,請參閱 DefaultErrorHandler。您可以透過覆蓋擴充套件 RetryTopicConfigurationSupport 的 @Configuration 類中的 configureNonBlockingRetries 方法來向此列表新增或從中刪除異常。有關更多資訊,請參閱 配置全域性設定和功能。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
| 要停用致命異常分類,只需清除提供的列表。 |
包含和排除主題
您可以透過 .includeTopic(String topic)、.includeTopics(Collection<String> topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法決定哪些主題將由 RetryTopicConfiguration bean 處理,哪些不處理。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
| 預設行為是包含所有主題。 |
主題自動建立
除非另有說明,否則框架將使用由 KafkaAdmin bean 消費的 NewTopic bean 自動建立所需的主題。您可以指定建立主題的分割槽數和複製因子,並且可以關閉此功能。從 3.0 版本開始,預設複製因子為 -1,表示使用 broker 預設值。如果您的 broker 版本早於 2.4,則需要設定一個明確的值。
| 請注意,如果您不使用 Spring Boot,則必須提供一個 KafkaAdmin bean 才能使用此功能。 |
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
| 預設情況下,主題以一個分割槽和 -1 的複製因子(表示使用 broker 預設值)自動建立。如果您的 broker 版本早於 2.4,則需要設定一個明確的值。 |
故障頭管理
在考慮如何管理故障頭(原始頭和異常頭)時,框架委託給 DeadLetterPublishingRecoverer 來決定是追加還是替換頭。
預設情況下,它顯式將 appendOriginalHeaders 設定為 false,並將 stripPreviousExceptionHeaders 留給 DeadLetterPublishingRecover 使用的預設值。
這意味著在預設配置下,只保留第一個“原始”和最後一個異常頭。這是為了避免在涉及許多重試步驟時建立過大的訊息(例如,由於堆疊跟蹤頭)。
有關更多資訊,請參閱 管理死信記錄頭。
要重新配置框架以將不同設定用於這些屬性,請透過覆蓋擴充套件 RetryTopicConfigurationSupport 的 @Configuration 類中的 configureCustomizers 方法來配置 DeadLetterPublishingRecoverer 自定義器。有關更多詳細資訊,請參閱 配置全域性設定和功能。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
從 2.8.4 版本開始,如果您希望新增自定義頭(除了工廠新增的重試資訊頭),您可以向工廠新增一個 headersFunction - factory.setHeadersFunction((rec, ex) -> { ... })。
預設情況下,新增的任何頭都將是累積的 - Kafka 頭可以包含多個值。從 2.9.5 版本開始,如果函式返回的 Headers 包含型別為 DeadLetterPublishingRecoverer.SingleRecordHeader 的頭,則該頭的任何現有值將被刪除,並且只保留新的單個值。
自定義 DeadLetterPublishingRecoverer
如 故障頭管理 中所述,可以自定義框架建立的預設 DeadLetterPublishingRecoverer 例項。但是,對於某些用例,有必要子類化 DeadLetterPublishingRecoverer,例如覆蓋 createProducerRecord() 以修改傳送到重試(或死信)主題的內容。從 3.0.9 版本開始,您可以覆蓋 RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory() 方法以提供 DeadLetterPublisherCreator 例項,例如
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建議您在構造自定義例項時使用提供的解析器。
基於丟擲異常的訊息路由到自定義 DLT
從 3.2.0 版本開始,可以根據在處理訊息期間丟擲的異常型別將訊息路由到自定義 DLT。為此,需要指定路由。路由自定義包括附加目標的規範。目標反過來又包括兩個設定:suffix 和 exceptions。當丟擲 exceptions 中指定的異常型別時,包含 suffix 的 DLT 將被視為訊息的目標主題,然後才考慮通用 DLT。使用註解或 RetryTopicConfiguration bean 進行配置的示例
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(template);
}
suffix 在自定義 DLT 名稱中位於通用 dltTopicSuffix 之前。考慮所提供的示例,導致 DeserializationException 的訊息將被路由到 my-annotated-topic-deserialization-dlt 而不是 my-annotated-topic-dlt。自定義 DLT 將按照 主題自動建立 中所述的相同規則建立。