配置
從 2.9 版本開始,對於預設配置,應在 @Configuration
註解的類中使用 @EnableKafkaRetryTopic
註解。這使得該功能能夠正確啟動,並允許注入該功能的一些元件以便在執行時查詢。
如果您新增此註解,則無需再新增 @EnableKafka ,因為 @EnableKafkaRetryTopic 是透過元註解方式包含 @EnableKafka 的。 |
此外,從該版本開始,為了對該功能的元件和全域性特性進行更高階的配置,應在 @Configuration
類中繼承 RetryTopicConfigurationSupport
類,並重寫相應的方法。更多詳情請參考 配置全域性設定和特性。
預設情況下,重試 topic 的容器與主容器具有相同的併發度。從 3.0 版本開始,您可以為重試容器設定不同的 concurrency
(可以在註解上設定,也可以在 RetryTopicConfigurationBuilder
中設定)。
只能使用上述技術中的一種,並且只能有一個 @Configuration 類繼承 RetryTopicConfigurationSupport 。 |
使用 @RetryableTopic
註解
要為 @KafkaListener
註解的方法配置重試 topic 和 DLT,只需在該方法上新增 @RetryableTopic
註解,Spring for Apache Kafka 將會使用預設配置啟動所有必要的 topic 和消費者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
從 3.2 開始,對類上 @KafkaListener
的 @RetryableTopic
支援為
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
您可以在同一個類中指定一個方法來處理 DLT 訊息,只需使用 @DltHandler
註解標記該方法。如果沒有提供 DltHandler 方法,則會建立一個預設消費者,該消費者僅記錄消費情況。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果您未指定 kafkaTemplate 名稱,將查詢名稱為 defaultRetryTopicKafkaTemplate 的 bean。如果未找到 bean,則會丟擲異常。 |
從 3.0 版本開始,@RetryableTopic
註解可以用作自定義註解的元註解;例如
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
使用 RetryTopicConfiguration
bean
您還可以透過在 @Configuration
註解的類中建立 RetryTopicConfiguration
bean 來配置非阻塞重試支援。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
這將為所有使用 @KafkaListener
註解的方法中的 topic 建立重試 topic 和一個 DLT,以及相應的消費者,並使用預設配置。KafkaTemplate
例項是訊息轉發所必需的。
為了更精細地控制如何處理每個 topic 的非阻塞重試,可以提供多個 RetryTopicConfiguration
bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics(List.of("my-topic", "my-other-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics(List.of("my-topic", "my-other-topic"))
.retryOn(MyException.class)
.create(template);
}
重試 topic 和 DLT 的消費者將被分配到一個消費者組,其 group id 是您在 @KafkaListener 註解的 groupId 引數中提供的值與 topic 字尾的組合。如果您未提供任何值,它們將都屬於同一個組,並且重試 topic 上的重新平衡將導致主 topic 上不必要的重新平衡。 |
如果消費者配置了 ErrorHandlingDeserializer 來處理反序列化異常,那麼配置 KafkaTemplate 及其生產者使用一個能夠處理普通物件以及由反序列化異常產生的原始 byte[] 值的序列化器是很重要的。模板的泛型值型別應該是 Object 。一種技術是使用 DelegatingByTypeSerializer ;示例如下 |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
可以在同一個 topic 上使用多個 @KafkaListener 註解,無論是否手動分配分割槽,並且可以結合非阻塞重試,但對於給定的 topic 只會使用一個配置。最好使用單個 RetryTopicConfiguration bean 來配置此類 topic;如果在同一個 topic 上使用了多個 @RetryableTopic 註解,它們都應該具有相同的值,否則其中一個將被應用於該 topic 的所有監聽器,而其他註解的值將被忽略。 |
配置全域性設定和特性
自 2.9 起,已移除了之前透過 bean 覆蓋來配置元件的方法(由於前述 API 的實驗性,此更改未進行廢棄提示)。這不會改變使用 RetryTopicConfiguration
bean 的方法,僅影響基礎設施元件的配置。現在,應在一個(單個)@Configuration
類中繼承 RetryTopicConfigurationSupport
類,並重寫相應的方法。示例如下
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
當使用這種配置方法時,不應使用 @EnableKafkaRetryTopic 註解,以防止因 bean 重複導致上下文啟動失敗。請改為使用簡單的 @EnableKafka 註解。 |
當 autoCreateTopics
為 true 時,主 topic 和重試 topic 將使用指定的分割槽數和副本因子建立。從 3.0 版本開始,預設的副本因子為 -1
,表示使用 broker 的預設設定。如果您的 broker 版本早於 2.4,則需要設定一個明確的值。要覆蓋特定 topic(例如主 topic 或 DLT)的這些值,只需新增一個帶有所需屬性的 NewTopic
@Bean
;這將覆蓋自動建立屬性。
預設情況下,記錄使用接收到的原始記錄的分割槽釋出到重試 topic。如果重試 topic 的分割槽少於主 topic,您應該適當地配置框架;示例如下。 |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
函式的引數是消費者記錄和下一個 topic 的名稱。您可以返回一個特定的分割槽號,或者返回 null
表示應由 KafkaProducer
決定分割槽。
預設情況下,當記錄透過重試 topic 轉換時,重試頭(嘗試次數、時間戳)的所有值都會被保留。從 2.9.6 版本開始,如果您只想保留這些頭的最後一個值,請使用上面所示的 configureDeadLetterPublishingContainerFactory()
方法將工廠的 retainAllRetryHeaderValues
屬性設定為 false
。
查詢 RetryTopicConfiguration
嘗試提供 RetryTopicConfiguration
的例項,可以透過 @RetryableTopic
註解建立,或者在沒有註解時從 bean 容器中獲取。
如果在容器中找到了 bean,會進行檢查以確定提供的 topic 是否應該由任何此類例項處理。
如果提供了 @RetryableTopic
註解,則會查詢帶有 @DltHandler
註解的方法。
從 3.2 開始,提供了新的 API 來在 @RetryableTopic
註解在類上時建立 RetryTopicConfiguration
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}