配置

對於預設設定,透過將 @RetryableTopic 註解新增到 @KafkaListener 方法中來啟用非阻塞重試。這是推薦且最簡單的方法,因為它會自動配置所需的重試基礎設施,並使用預設設定建立重試和 DLT 主題。

要匯入非阻塞重試基礎設施並將其元件公開為 bean,請使用 @EnableKafkaRetryTopic 註解 @Configuration 類。這使得該功能的元件能夠被注入和在執行時查詢,並作為高階和全域性配置的基礎。

如果添加了此註解,則無需再新增 @EnableKafka,因為 @EnableKafkaRetryTopic 已被 @EnableKafka 元註解。

對於高階和全域性自定義,請在單個 @Configuration 類中擴充套件 RetryTopicConfigurationSupport 並覆蓋相關方法。有關更多詳細資訊,請參閱配置全域性設定和功能

預設情況下,重試主題的容器將與主容器具有相同的併發性。從 3.0 版本開始,您可以為重試容器設定不同的 concurrency(在註解上或在 RetryTopicConfigurationBuilder 中)。

僅使用上述兩種全域性配置方法之一(@EnableKafkaRetryTopic 或擴充套件 RetryTopicConfigurationSupport)。此外,只有一個 @Configuration 類應該擴充套件 RetryTopicConfigurationSupport

使用 @RetryableTopic 註解

要為 @KafkaListener 註解的方法配置重試主題和 DLT,您只需向其新增 @RetryableTopic 註解,Spring for Apache Kafka 將使用預設配置啟動所有必要的主題和消費者。

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

自 3.2 起,@RetryableTopic 支援類上的 @KafkaListener 將是

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}

您可以透過使用 @DltHandler 註解標記同一類中的方法來處理 DLT 訊息。如果沒有提供 DltHandler 方法,則會建立一個預設消費者,該消費者僅記錄消費情況。

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
如果您沒有指定 kafkaTemplate 名稱,則將查詢名為 defaultRetryTopicKafkaTemplate 的 bean。如果找不到 bean,則會丟擲異常。

從 3.0 版本開始,@RetryableTopic 註解可以用作自定義註解上的元註解;例如

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}

使用 RetryTopicConfiguration bean

您還可以透過在 @Configuration 註解的類中建立 RetryTopicConfiguration bean 來配置非阻塞重試支援。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

這將為使用預設配置的 @KafkaListener 註解方法中的所有主題建立重試主題和 DLT,以及相應的消費者。需要 KafkaTemplate 例項用於訊息轉發。

為了更精細地控制每個主題的非阻塞重試處理方式,可以提供多個 RetryTopicConfiguration bean。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics(List.of("my-topic", "my-other-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics(List.of("my-topic", "my-other-topic"))
            .retryOn(MyException.class)
            .create(template);
}
重試主題和 DLT 的消費者將被分配到一個消費者組,其組 ID 是您在 @KafkaListener 註解的 groupId 引數中提供的值與主題字尾的組合。如果您不提供任何值,它們都將屬於同一個組,並且重試主題上的重新平衡將導致主主題上不必要的重新平衡。
如果消費者配置了 ErrorHandlingDeserializer 來處理反序列化異常,那麼配置 KafkaTemplate 及其生產者使用能夠同時處理普通物件和原始 byte[] 值(由反序列化異常引起)的序列化器非常重要。模板的通用值型別應為 Object。一種技術是使用 DelegatingByTypeSerializer;示例如下
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
多個 @KafkaListener 註解可用於同一個主題,無論是否手動分割槽分配以及非阻塞重試,但對於給定主題只會使用一個配置。最好為這些主題的配置使用單個 RetryTopicConfiguration bean;如果同一個主題使用了多個 @RetryableTopic 註解,則它們都應具有相同的值,否則其中一個將應用於該主題的所有監聽器,而其他註解的值將被忽略。

配置全域性設定和功能

自 2.9 版本起,用於配置元件的舊的 bean 覆蓋方法已被移除(由於前面提到的 API 的實驗性,未棄用)。這不會改變 RetryTopicConfiguration bean 方法 - 只會改變基礎設施元件的配置。現在,RetryTopicConfigurationSupport 類應該在(單個)@Configuration 類中擴充套件,並覆蓋適當的方法。示例如下

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
使用此配置方法時,不應使用 @EnableKafkaRetryTopic 註解,以防止因重複的 bean 導致上下文啟動失敗。請改用簡單的 @EnableKafka 註解。

autoCreateTopics 為 true 時,主主題和重試主題將使用指定的分割槽數和複製因子建立。從 3.0 版本開始,預設複製因子為 -1,表示使用 broker 預設值。如果您的 broker 版本早於 2.4,則需要設定一個顯式值。要覆蓋特定主題(例如主主題或 DLT)的這些值,只需新增一個具有所需屬性的 NewTopic @Bean;這將覆蓋自動建立屬性。

預設情況下,記錄使用接收記錄的原始分割槽釋出到重試主題。如果重試主題的分割槽少於主主題,您應該適當配置框架;示例如下。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

函式的引數是消費者記錄和下一個主題的名稱。您可以返回特定的分割槽號,或返回 null 以指示 KafkaProducer 應確定分割槽。

預設情況下,當記錄在重試主題之間轉換時,重試頭的所有值(嘗試次數、時間戳)都會保留。從 2.9.6 版本開始,如果您只想保留這些頭的最後一個值,請使用上面顯示的 configureDeadLetterPublishingContainerFactory() 方法將工廠的 retainAllRetryHeaderValues 屬性設定為 false

查詢 RetryTopicConfiguration

嘗試透過從 @RetryableTopic 註解建立例項或在沒有可用註解時從 bean 容器建立例項來提供 RetryTopicConfiguration 的例項。

如果在容器中找到 bean,則會進行檢查以確定所提供的主題是否應由任何此類例項處理。

如果提供了 @RetryableTopic 註解,則會查詢 DltHandler 註解方法。

自 3.2 起,提供新的 API 以在類上使用 @RetryableTopic 註解時建立 RetryTopicConfiguration

@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}
© . This site is unofficial and not affiliated with VMware.