DLT 策略
該框架提供了幾種處理 DLT(死信主題)的策略。你可以提供一個 DLT 處理方法,使用預設的日誌記錄方法,或者完全不使用 DLT。此外,你還可以選擇 DLT 處理失敗時的行為。
DLT 處理方法
你可以指定用於處理主題 DLT 的方法,以及該處理失敗時的行為。
為此,你可以在帶有 @RetryableTopic
註解的類的某個方法上使用 @DltHandler
註解。請注意,該類中所有帶有 @RetryableTopic
註解的方法將使用相同的方法處理 DLT。
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT 處理方法也可以透過 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)
方法提供,將應處理 DLT 訊息的 bean 名稱和方法名稱作為引數傳入。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
如果沒有提供 DLT 處理程式,則使用預設的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod 。 |
從 2.8 版本開始,如果你完全不想在此應用程式中從 DLT 消費,包括透過預設處理程式(或者你希望延遲消費),則可以控制 DLT 容器是否啟動,這獨立於容器工廠的 autoStartup
屬性。
使用 @RetryableTopic
註解時,將 autoStartDltHandler
屬性設定為 false
;使用配置構建器時,使用 autoStartDltHandler(false)
。
之後你可以透過 KafkaListenerEndpointRegistry
啟動 DLT 處理程式。
DLT 失敗行為
如果 DLT 處理失敗,有兩種可用的行為:ALWAYS_RETRY_ON_ERROR
和 FAIL_ON_ERROR
。
前者會將記錄轉發回 DLT 主題,以便它不會阻塞其他 DLT 記錄的處理。後者則在不轉發訊息的情況下結束 consumer 的執行。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
預設行為是 ALWAYS_RETRY_ON_ERROR 。 |
從 2.8.3 版本開始,如果記錄導致丟擲致命異常(例如 DeserializationException ),ALWAYS_RETRY_ON_ERROR 將不會將記錄路由回 DLT,因為通常此類異常總是會丟擲。 |
被認為是致命的異常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
你可以使用 DestinationTopicResolver
bean 上的方法向此列表新增或刪除異常。
有關更多資訊,請參見異常分類器。
配置無 DLT
框架還提供了不為主題配置 DLT 的可能性。在這種情況下,重試次數用盡後,處理簡單地結束。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}