DLT 策略

該框架提供了一些用於處理 DLT 的策略。您可以提供 DLT 處理方法、使用預設的日誌記錄方法,或者根本不使用 DLT。此外,您還可以選擇 DLT 處理失敗時的行為。

DLT 處理方法

您可以指定用於處理主題 DLT 的方法,以及該處理失敗時的行為。

為此,您可以在帶有 @RetryableTopic 註解的類的某個方法中使用 @DltHandler 註解。請注意,該方法將用於該類中所有帶有 @RetryableTopic 註解的方法。

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processDltMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

DLT 處理器方法也可以透過 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供,將應處理 DLT 訊息的 bean 名稱和方法名稱作為引數傳入。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}
如果沒有提供 DLT 處理器,則使用預設的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod

從版本 2.8 開始,如果您根本不希望此應用程式從 DLT 消費,包括透過預設處理器(或者您希望延遲消費),您可以控制 DLT 容器是否啟動,這獨立於容器工廠的 autoStartup 屬性。

使用 @RetryableTopic 註解時,將 autoStartDltHandler 屬性設定為 false;使用配置構建器時,使用 autoStartDltHandler(false)

稍後,您可以透過 KafkaListenerEndpointRegistry 啟動 DLT 處理器。

DLT 失敗行為

如果 DLT 處理失敗,有兩種可能的行為:ALWAYS_RETRY_ON_ERRORFAIL_ON_ERROR

前者將記錄轉發回 DLT 主題,這樣它就不會阻塞其他 DLT 記錄的處理。後者在不轉發訊息的情況下結束執行。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
預設行為是 ALWAYS_RETRY_ON_ERROR
從版本 2.8.3 開始,如果記錄導致丟擲致命異常,例如 DeserializationExceptionALWAYS_RETRY_ON_ERROR 將不會將記錄路由回 DLT,因為通常此類異常總是會被丟擲。

被認為是致命的異常有:

  • DeserializationException(反序列化異常)

  • MessageConversionException(訊息轉換異常)

  • ConversionException(轉換異常)

  • MethodArgumentResolutionException(方法引數解析異常)

  • NoSuchMethodException(方法未找到異常)

  • ClassCastException(類轉換異常)

您可以使用 DestinationTopicResolver bean 上的方法向此列表新增或從中刪除異常。

有關更多資訊,請參閱異常分類器

配置不使用 DLT

該框架還提供了不為主題配置 DLT 的可能性。在這種情況下,在重試耗盡後,處理簡單地結束。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}
© . This site is unofficial and not affiliated with VMware.