功能

大多數功能都適用於 @RetryableTopic 註解和 RetryTopicConfiguration bean。

退避配置

退避配置依賴於 Spring Retry 專案中的 BackOffPolicy 介面。

它包括

  • 固定退避

  • 指數退避

  • 隨機指數退避

  • 均勻隨機退避

  • 無退避

  • 自定義退避

@RetryableTopic(attempts = 5,
    backOff = @BackOff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(4)
            .create(template);
}

您還可以提供 Spring Retry 的 SleepingBackOffPolicy 介面的自定義實現。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackoff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
預設的退避策略是 FixedBackOffPolicy,最大嘗試次數為 3 次,間隔為 1000 毫秒。
ExponentialBackOffPolicy 的預設最大延遲為 30 秒。如果您的退避策略需要大於此值的延遲,請相應地調整 maxDelay 屬性。
第一次嘗試計入 maxAttempts,因此如果您提供 maxAttempts 值為 4,則將有原始嘗試加 3 次重試。

全域性超時

您可以設定重試過程的全域性超時。如果達到該時間,下一次消費者丟擲異常時,訊息將直接進入 DLT,如果 DLT 不可用,則僅結束處理。

@RetryableTopic(backOff = @BackOff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
預設情況下未設定超時,也可以透過提供 -1 作為超時值來實現。

異常分類器

您可以指定要重試哪些異常,不重試哪些異常。您還可以將其設定為遍歷原因以查詢巢狀異常。

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
預設行為是重試所有異常,而不遍歷原因。

自 2.8.3 版本起,有一個全域性致命異常列表,它將導致記錄被髮送到 DLT 而不進行任何重試。有關致命異常的預設列表,請參閱 DefaultErrorHandler。您可以透過覆蓋擴充套件 RetryTopicConfigurationSupport@Configuration 類中的 configureNonBlockingRetries 方法來向此列表新增或從中刪除異常。有關更多資訊,請參閱 配置全域性設定和功能

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要停用致命異常分類,只需清除提供的列表。

包含和排除主題

您可以透過 .includeTopic(String topic)、.includeTopics(Collection<String> topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法決定哪些主題將由 RetryTopicConfiguration bean 處理,哪些不處理。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
預設行為是包含所有主題。

主題自動建立

除非另有說明,否則框架將使用由 KafkaAdmin bean 消費的 NewTopic bean 自動建立所需的主題。您可以指定建立主題的分割槽數和複製因子,並且可以關閉此功能。從 3.0 版本開始,預設複製因子為 -1,表示使用 broker 預設值。如果您的 broker 版本早於 2.4,則需要設定一個明確的值。

請注意,如果您不使用 Spring Boot,則必須提供一個 KafkaAdmin bean 才能使用此功能。
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
預設情況下,主題以一個分割槽和 -1 的複製因子(表示使用 broker 預設值)自動建立。如果您的 broker 版本早於 2.4,則需要設定一個明確的值。

故障頭管理

在考慮如何管理故障頭(原始頭和異常頭)時,框架委託給 DeadLetterPublishingRecoverer 來決定是追加還是替換頭。

預設情況下,它顯式將 appendOriginalHeaders 設定為 false,並將 stripPreviousExceptionHeaders 留給 DeadLetterPublishingRecover 使用的預設值。

這意味著在預設配置下,只保留第一個“原始”和最後一個異常頭。這是為了避免在涉及許多重試步驟時建立過大的訊息(例如,由於堆疊跟蹤頭)。

有關更多資訊,請參閱 管理死信記錄頭

要重新配置框架以將不同設定用於這些屬性,請透過覆蓋擴充套件 RetryTopicConfigurationSupport@Configuration 類中的 configureCustomizers 方法來配置 DeadLetterPublishingRecoverer 自定義器。有關更多詳細資訊,請參閱 配置全域性設定和功能

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

從 2.8.4 版本開始,如果您希望新增自定義頭(除了工廠新增的重試資訊頭),您可以向工廠新增一個 headersFunction - factory.setHeadersFunction((rec, ex) -> { ... })

預設情況下,新增的任何頭都將是累積的 - Kafka 頭可以包含多個值。從 2.9.5 版本開始,如果函式返回的 Headers 包含型別為 DeadLetterPublishingRecoverer.SingleRecordHeader 的頭,則該頭的任何現有值將被刪除,並且只保留新的單個值。

自定義 DeadLetterPublishingRecoverer

故障頭管理 中所述,可以自定義框架建立的預設 DeadLetterPublishingRecoverer 例項。但是,對於某些用例,有必要子類化 DeadLetterPublishingRecoverer,例如覆蓋 createProducerRecord() 以修改傳送到重試(或死信)主題的內容。從 3.0.9 版本開始,您可以覆蓋 RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory() 方法以提供 DeadLetterPublisherCreator 例項,例如

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}

建議您在構造自定義例項時使用提供的解析器。

基於丟擲異常的訊息路由到自定義 DLT

從 3.2.0 版本開始,可以根據在處理訊息期間丟擲的異常型別將訊息路由到自定義 DLT。為此,需要指定路由。路由自定義包括附加目標的規範。目標反過來又包括兩個設定:suffixexceptions。當丟擲 exceptions 中指定的異常型別時,包含 suffix 的 DLT 將被視為訊息的目標主題,然後才考慮通用 DLT。使用註解或 RetryTopicConfiguration bean 進行配置的示例

@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(template);
}

suffix 在自定義 DLT 名稱中位於通用 dltTopicSuffix 之前。考慮所提供的示例,導致 DeserializationException 的訊息將被路由到 my-annotated-topic-deserialization-dlt 而不是 my-annotated-topic-dlt。自定義 DLT 將按照 主題自動建立 中所述的相同規則建立。

© . This site is unofficial and not affiliated with VMware.