主題命名
重試主題和 DLT 透過在主主題後附加提供或預設的值,然後根據該主題的延遲或索引進行命名。
示例
"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …, "my-topic-dlt"
"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …, "my-topic-myDltSuffix"
預設行為是為每次嘗試建立單獨的重試主題,並附加索引值:retry-0, retry-1, …, retry-n。因此,預設情況下,重試主題的數量是配置的 `maxAttempts` 減 1。 |
您可以配置字尾,選擇是附加嘗試次數索引還是延遲值,在使用固定回退時使用單個重試主題,以及在使用指數回退時為具有最大間隔的嘗試使用單個重試主題。
重試主題和 DLT 字尾
您可以指定重試主題和 DLT 將使用的字尾。
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
預設字尾分別是 "-retry" 和 "-dlt",用於重試主題和 dlt。 |
附加主題的索引或延遲值
您可以在後綴後附加主題的索引或延遲值。
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
預設行為是使用延遲值作為字尾,但對於具有多個主題的固定延遲配置除外,在這種情況下,主題以後綴加上主題的索引進行命名。 |
固定延遲重試的單個主題
如果您使用固定延遲策略,例如 `FixedBackOffPolicy` 或 `NoBackOffPolicy`,您可以使用單個主題來實現非阻塞重試。這個主題將附加提供或預設的字尾,並且不會附加索引或延遲值。
之前的 `FixedDelayStrategy` 已被棄用,可以使用 `SameIntervalTopicReuseStrategy` 替代。 |
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.create(template);
}
預設行為是為每次嘗試建立單獨的重試主題,並附加其索引值:retry-0, retry-1, … |
最大間隔指數延遲的單個主題
如果您使用指數回退策略(`ExponentialBackOffPolicy`),您可以使用單個重試主題來完成延遲是配置的 `maxInterval` 的嘗試的非阻塞重試。
這個“最終”重試主題將附加提供或預設的字尾,並會附加索引或 `maxInterval` 值。
透過選擇為具有 `maxInterval` 延遲的重試使用單個主題,配置一個長時間持續重試的指數重試策略可能會變得更可行,因為這種方法不需要大量的重試主題。 |
從 3.2 版本開始,預設行為是在使用指數回退時,對相同間隔重用重試主題;重試主題附加延遲值作為字尾,最後一個重試主題用於相同間隔(對應於 `maxInterval` 延遲)的重用。
例如,當配置指數回退,設定 `initialInterval=1_000`,`multiplier=2` 和 `maxInterval=16_000` 時,為了持續重試一小時,需要將 `maxAttempts` 配置為 229。預設情況下,所需的重試主題將是
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000
當使用重試主題數量等於配置的 `maxAttempts` 減 1 的策略時,最後一個重試主題(對應於 `maxInterval` 延遲)將附加額外的索引作為字尾,例如
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000-0
-
-retry-16000-1
-
-retry-16000-2
-
…
-
-retry-16000-224
如果需要多個主題,可以使用以下配置來完成。
@RetryableTopic(attempts = 230,
backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1_000, 2, 16_000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
自定義命名策略
透過註冊實現 `RetryTopicNamesProviderFactory` 介面的 Bean,可以實現更復雜的命名策略。預設實現是 `SuffixingRetryTopicNamesProviderFactory`,可以透過以下方式註冊不同的實現
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
例如,以下實現除了標準字尾外,還為重試/dlt 主題名稱添加了字首
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}