重試和死信處理
預設情況下,當你在消費者繫結中配置重試(例如 `maxAttempts`)和 `enableDlq` 時,這些功能是在 binder 內部執行的,listener container 或 Kafka consumer 不參與其中。
在某些情況下,將此功能移至 listener container 中會更好,例如
-
重試和延遲的總和會超過消費者 `max.poll.interval.ms` 屬性的值,這可能會導致分割槽重平衡。
-
你希望將死信釋出到不同的 Kafka 叢集。
-
你希望為錯誤處理器新增重試監聽器。
-
...
要配置將此功能從 binder 移至 container,請定義一個型別為 `ListenerContainerWithDlqAndRetryCustomizer` 的 `@Bean`。此介面包含以下方法
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
目標解析器 (destination resolver) 和 `BackOff` 是根據繫結屬性(如果已配置)建立的。`KafkaTemplate` 使用 `spring.kafka….` 屬性中的配置。然後,你可以使用這些來建立自定義的錯誤處理器和死信釋出器;例如
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
現在,只需要將單個重試延遲設定得大於消費者 `max.poll.interval.ms` 屬性的值即可。
當使用多個 binder 時,'ListenerContainerWithDlqAndRetryCustomizer' bean 會被 'DefaultBinderFactory' 覆蓋。要使該 bean 生效,你需要使用 'BinderCustomizer' 來設定 container customizer(參見 [binder-customizer])
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}