特性

大多數特性對於 `@RetryableTopic` 註解和 `RetryTopicConfiguration` bean 都可用。

BackOff 配置

BackOff 配置依賴於 Spring Retry 專案中的 `BackOffPolicy` 介面。

包括

  • 固定間隔 Back Off

  • 指數間隔 Back Off

  • 隨機指數間隔 Back Off

  • 均勻隨機間隔 Back Off

  • 無 Back Off

  • 自定義 Back Off

@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(4)
            .create(template);
}

你還可以提供 Spring Retry 的 `SleepingBackOffPolicy` 介面的自定義實現

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackoff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
預設的 back off 策略是 `FixedBackOffPolicy`,最多重試 3 次,間隔 1000ms。
`ExponentialBackOffPolicy` 的預設最大延遲為 30 秒。如果你的 back off 策略需要大於此值的延遲,請相應調整 `maxDelay` 屬性。
第一次嘗試會計入 `maxAttempts`,因此如果你提供 `maxAttempts` 值為 4,則會有原始嘗試加上 3 次重試。

全域性超時

你可以為重試過程設定全域性超時時間。如果達到該時間,消費者下次丟擲異常時,訊息將直接傳送到 DLT,或者如果沒有 DLT 可用,則直接結束處理。

@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
預設情況下不設定超時,這也可以透過提供 -1 作為超時值來實現。

異常分類器

你可以指定哪些異常需要重試,哪些不需要。你還可以設定遍歷原因以查詢巢狀異常。

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
預設行為是對所有異常進行重試,且不遍歷原因。

自 2.8.3 版本起,有一個全域性致命異常列表,這些異常會導致記錄直接傳送到 DLT 而不進行任何重試。請參閱 DefaultErrorHandler 檢視預設的致命異常列表。你可以透過在繼承 `RetryTopicConfigurationSupport` 的 `@Configuration` 類中覆蓋 `configureNonBlockingRetries` 方法來向此列表新增或從中移除異常。更多資訊請參閱 Configuring Global Settings and Features

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要停用致命異常分類,只需清空提供的列表即可。

包含和排除主題

你可以透過 `.includeTopic(String topic)`、`.includeTopics(Collection<String> topics)`、`.excludeTopic(String topic)` 和 `.excludeTopics(Collection<String> topics)` 方法決定哪些主題將被 `RetryTopicConfiguration` bean 處理,哪些不被處理。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
預設行為是包含所有主題。

主題自動建立

除非另有說明,否則框架將使用 `KafkaAdmin` bean 消費的 `NewTopic` bean 自動建立所需的主題。你可以指定建立主題時的分割槽數和副本因子,並且可以關閉此功能。從 3.0 版本開始,預設副本因子為 -1,表示使用 broker 的預設設定。如果你的 broker 版本低於 2.4,則需要設定一個明確的值。

請注意,如果你不使用 Spring Boot,則必須提供一個 `KafkaAdmin` bean 才能使用此功能。
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
預設情況下,主題會自動建立,分割槽數為 1,副本因子為 -1(表示使用 broker 的預設設定)。如果你的 broker 版本低於 2.4,則需要設定一個明確的值。

失敗頭部管理

在考慮如何管理失敗頭部(原始頭部和異常頭部)時,框架委託給 `DeadLetterPublishingRecoverer` 來決定是附加還是替換頭部。

預設情況下,它明確將 `appendOriginalHeaders` 設定為 `false`,並讓 `stripPreviousExceptionHeaders` 使用 `DeadLetterPublishingRecover` 的預設設定。

這意味著在預設配置下,只保留第一個“原始”頭部和最後一個異常頭部。這是為了避免在涉及多次重試時建立過大的訊息(例如由於堆疊跟蹤頭部)。

更多資訊請參閱 管理死信記錄頭部

要重新配置框架以便對這些屬性使用不同的設定,請透過在繼承 `RetryTopicConfigurationSupport` 的 `@Configuration` 類中覆蓋 `configureCustomizers` 方法來配置 `DeadLetterPublishingRecoverer` 定製器。更多詳細資訊請參閱 配置全域性設定和特性

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

從 2.8.4 版本開始,如果你希望新增自定義頭部(除了工廠新增的重試資訊頭部),你可以向工廠新增一個 `headersFunction` - `factory.setHeadersFunction((rec, ex) -> { ... })`。

預設情況下,新增的任何頭部都是累積的 - Kafka 頭部可以包含多個值。從 2.9.5 版本開始,如果函式返回的 Headers 包含型別為 `DeadLetterPublishingRecoverer.SingleRecordHeader` 的頭部,則該頭部任何現有值都將被移除,只保留新的單個值。

自定義 DeadLetterPublishingRecoverer

失敗頭部管理 中所示,可以定製框架建立的預設 `DeadLetterPublishingRecoverer` 例項。然而,對於某些用例,需要繼承 `DeadLetterPublishingRecoverer`,例如覆蓋 `createProducerRecord()` 方法來修改傳送到重試(或死信)主題的內容。從 3.0.9 版本開始,你可以覆蓋 `RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()` 方法來提供一個 `DeadLetterPublisherCreator` 例項,例如

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}

建議你在構建自定義例項時使用提供的解析器。

基於丟擲的異常將訊息路由到自定義 DLT

從 3.2.0 版本開始,可以根據處理訊息過程中丟擲的異常型別將訊息路由到自定義 DLT。為此,需要指定路由。路由定製包括額外目標的規範。目標又包括兩個設定:`suffix` 和 `exceptions`。當丟擲 `exceptions` 中指定的異常型別時,包含 `suffix` 的 DLT 將被視為訊息的目標主題,優先順序高於通用 DLT。以下是使用註解或 `RetryTopicConfiguration` bean 進行配置的示例

@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(template);
}

在自定義 DLT 名稱中,`suffix` 出現在通用的 `dltTopicSuffix` 之前。考慮到提供的示例,導致 `DeserializationException` 的訊息將被路由到 `my-annotated-topic-deserialization-dlt`,而不是 `my-annotated-topic-dlt`。自定義 DLT 的建立將遵循與 主題自動建立 中所述相同的規則。