重試和死信處理

預設情況下,當您在消費者繫結中配置重試(例如 maxAttempts)和 enableDlq 時,這些功能將在繫結器內部執行,監聽器容器或 Kafka 消費者不參與。

在某些情況下,最好將此功能移至監聽器容器,例如:

  • 重試和延遲的總和將超過消費者的 max.poll.interval.ms 屬性,這可能會導致分割槽再平衡。

  • 您希望將死信釋出到不同的 Kafka 叢集。

  • 您希望將重試監聽器新增到錯誤處理程式。

  • ...​

要將此功能從繫結器移至容器,請定義一個型別為 ListenerContainerWithDlqAndRetryCustomizer@Bean。此介面包含以下方法:

/**
 * Configure the container.
 * @param container the container.
 * @param destinationName the destination name.
 * @param group the group.
 * @param dlqDestinationResolver a destination resolver for the dead letter topic (if
 * enableDlq).
 * @param backOff the backOff using retry properties (if configured).
 * @see #retryAndDlqInBinding(String, String)
 */
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
        @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
        @Nullable BackOff backOff);

/**
 * Return false to move retries and DLQ from the binding to a customized error handler
 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
 * configured via
 * {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
 * @param destinationName the destination name.
 * @param group the group.
 * @return false to disable retries and DLQ in the binding
 */
default boolean retryAndDlqInBinding(String destinationName, String group) {
    return true;
}

目標解析器和 BackOff 是根據繫結屬性(如果已配置)建立的。KafkaTemplate 使用 spring.kafka…​. 屬性中的配置。然後,您可以使用它們建立自定義錯誤處理程式和死信釋出器;例如:

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {

            if (destinationName.equals("topicWithLongTotalRetryConfig")) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            return !destinationName.contains("topicWithLongTotalRetryConfig");
        }

    };
}

現在,只需要一個重試延遲大於消費者的 max.poll.interval.ms 屬性即可。

當使用多個繫結器時,'ListenerContainerWithDlqAndRetryCustomizer' bean 會被 'DefaultBinderFactory' 覆蓋。為了使 bean 生效,您需要使用 'BinderCustomizer' 來設定容器定製器(請參閱 [binder-customizer]

@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
        }
        else if (binder instanceof KStreamBinder) {
            ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            ...
        }
    };
}
© . This site is unofficial and not affiliated with VMware.