主題命名

重試主題和DLT透過在主主題後附加提供的值或預設值進行命名,然後附加該主題的延遲或索引。

示例

"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …​, "my-topic-dlt"

"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …​, "my-topic-myDltSuffix"

預設行為是為每次嘗試建立單獨的重試主題,並附加一個索引值:retry-0, retry-1, …​, retry-n。因此,預設情況下,重試主題的數量是配置的maxAttempts減1。

重試主題和DLT字尾

您可以指定重試和DLT主題將使用的字尾。

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .retryTopicSuffix("-my-retry-suffix")
            .dltTopicSuffix("-my-dlt-suffix")
            .create(template);
}
預設字尾分別是重試主題的"-retry"和DLT的"-dlt"。

附加主題索引或延遲

您可以在後綴後面附加主題的索引或延遲值。

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
    }
預設行為是使用延遲值作為字尾,但對於具有多個主題的固定延遲配置,主題會以主題索引作為字尾。

固定延遲重試的單個主題

如果您使用固定延遲策略,例如FixedBackOffPolicyNoBackOffPolicy,您可以使用單個主題來實現非阻塞重試。此主題將以提供的或預設的字尾結尾,並且不會附加索引或延遲值。

以前的FixedDelayStrategy現在已棄用,可以替換為SameIntervalTopicReuseStrategy
@RetryableTopic(backOff = @BackOff(2_000), sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(5)
            .useSingleTopicForSameIntervals()
            .create(template);
}
預設行為是為每次嘗試建立單獨的重試主題,並附加其索引值:retry-0, retry-1, …​

maxInterval 指數延遲的單個主題

如果您使用指數退避策略 (ExponentialBackOffPolicy),您可以使用單個重試主題來實現延遲為配置的maxInterval的嘗試的非阻塞重試。

這個“最終”重試主題將以提供的或預設的字尾結尾,並且會附加索引或maxInterval值。

透過選擇對具有maxInterval延遲的重試使用單個主題,配置一個長時間重試的指數重試策略可能變得更加可行,因為在這種方法中您不需要大量的(重試)主題。

從3.2版本開始,預設行為是在使用指數退避時,重用相同間隔的重試主題,重試主題會以延遲值作為字尾,最後一個重試主題會重用相同間隔(對應於maxInterval延遲)。

例如,當配置指數退避策略,initialInterval=1_000multiplier=2,和maxInterval=16_000,為了持續重試一小時,需要將maxAttempts配置為229,預設情況下所需的重試主題將是

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000

當使用重試主題數量等於配置的maxAttempts減1的策略時,最後一個重試主題(對應於maxInterval延遲)將附加一個額外的索引,如下所示

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000-0

  • -retry-16000-1

  • -retry-16000-2

  • ...​

  • -retry-16000-224

如果需要多個主題,可以使用以下配置實現。

@RetryableTopic(attempts = 230,
    backOff = @BackOff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
    sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1_000, 2, 16_000)
            .maxAttempts(230)
            .useSingleTopicForSameIntervals()
            .create(template);
}

自定義命名策略

更復雜的命名策略可以透過註冊一個實現RetryTopicNamesProviderFactory的bean來實現。預設實現是SuffixingRetryTopicNamesProviderFactory,可以透過以下方式註冊不同的實現

@Override
protected RetryTopicComponentFactory createComponentFactory() {
    return new RetryTopicComponentFactory() {
        @Override
        public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
            return new CustomRetryTopicNamesProviderFactory();
        }
    };
}

例如,以下實現除了標準字尾外,還在重試/dlt主題名稱中添加了一個字首

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
                DestinationTopic.Properties properties) {

        if (properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {

                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }

            };
        }
    }

}
© . This site is unofficial and not affiliated with VMware.