異常處理

本節介紹在使用 Spring for Apache Kafka 時可能出現的各種異常的處理方式。

監聽器錯誤處理器

從 2.0 版本開始,@KafkaListener 註解新增了一個屬性:errorHandler

您可以使用 errorHandler 提供 KafkaListenerErrorHandler 實現的 bean 名稱。這個函式式介面有一個方法,如下所示:

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

您可以訪問訊息轉換器生成的 spring-messaging Message<?> 物件以及監聽器丟擲的異常(該異常被包裝在 ListenerExecutionFailedException 中)。錯誤處理器可以丟擲原始異常或新異常,這些異常會被拋到容器。錯誤處理器返回的任何值都會被忽略。

從 2.7 版本開始,您可以在 MessagingMessageConverterBatchMessagingMessageConverter 上設定 rawRecordHeader 屬性,這將導致原始的 ConsumerRecord 被新增到已轉換的 Message<?>KafkaHeaders.RAW_DATA 頭部中。例如,如果您希望在監聽器錯誤處理器中使用 DeadLetterPublishingRecoverer,這非常有用。它可能用於請求/回覆場景,在這種場景下,您希望在幾次重試後,將失敗結果傳送給傳送方,並在死信主題中捕獲失敗的記錄。

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一個子介面(ConsumerAwareListenerErrorHandler),透過以下方法可以訪問 consumer 物件:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

另一個子介面(ManualAckListenerErrorHandler)在使用手動 AckMode 時提供對 Acknowledgment 物件的訪問。

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

在任何一種情況下,您都不應該對 consumer 執行 seek 操作,因為容器對此一無所知。

容器錯誤處理器

從 2.8 版本開始,遺留的 ErrorHandlerBatchErrorHandler 介面已被新的 CommonErrorHandler 取代。這些錯誤處理器可以處理 record 監聽器和 batch 監聽器中的錯誤,允許單個監聽器容器工廠為兩種型別的監聽器建立容器。框架提供了 CommonErrorHandler 的實現來替代大多數遺留框架錯誤處理器實現。

有關將自定義遺留錯誤處理器實現遷移到 CommonErrorHandler 的資訊,請參閱將自定義遺留錯誤處理器實現遷移到 CommonErrorHandler

使用事務時,預設情況下未配置錯誤處理器,以便異常可以回滾事務。事務容器的錯誤處理由 AfterRollbackProcessor 處理。如果在事務中使用自定義錯誤處理器,則必須丟擲異常才能使事務回滾。

這個介面有一個預設方法 isAckAfterHandle(),容器會呼叫它來確定如果錯誤處理器返回而沒有丟擲異常,是否應該提交 offset(s);預設情況下它返回 true。

通常,框架提供的錯誤處理器在錯誤未被“處理”(例如,執行 seek 操作後)時會丟擲異常。預設情況下,容器會在 ERROR 級別記錄此類異常。所有框架錯誤處理器都擴充套件了 KafkaExceptionLogLevelAware,它允許您控制這些異常的日誌級別。

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定一個全域性錯誤處理器,用於容器工廠中的所有監聽器。以下示例展示瞭如何進行設定:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

預設情況下,如果帶註解的監聽器方法丟擲異常,該異常會拋到容器,並根據容器配置處理訊息。

容器在呼叫錯誤處理器之前提交任何待處理的 offset 提交。

如果您使用 Spring Boot,只需將錯誤處理器新增為 @Bean,Boot 會將其新增到自動配置的工廠中。

退避處理器

DefaultErrorHandler 這樣的錯誤處理器使用 BackOff 來確定重試投遞前需要等待多長時間。從 2.9 版本開始,您可以配置自定義的 BackOffHandler。預設處理器僅暫停執行緒,直到退避時間過去(或容器停止)。框架還提供了 ContainerPausingBackOffHandler,它會暫停監聽器容器,直到退避時間過去,然後恢復容器。當延遲時間長於 consumer 屬性 max.poll.interval.ms 時,這非常有用。請注意,實際退避時間的精度會受到 pollTimeout 容器屬性的影響。

DefaultErrorHandler

這個新的錯誤處理器取代了 SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,它們在之前的幾個版本中一直是預設的錯誤處理器。一個區別是,對於 batch 監聽器(當丟擲 BatchListenerFailedException 以外的異常時),回退行為等同於重試完整的批次

從 2.9 版本開始,DefaultErrorHandler 可以配置為提供與 seek 未處理記錄 offset 相同的語義(如下所述),但實際上並不執行 seek 操作。相反,記錄由監聽器容器保留,並在錯誤處理器退出後(以及執行單次暫停的 poll() 以保持 consumer 活躍後;如果使用了非阻塞重試ContainerPausingBackOffHandler,暫停可能會持續多次 poll)重新提交給監聽器。錯誤處理器向容器返回一個結果,指示當前失敗的記錄是否可以重新提交,或者是否已恢復且不會再次傳送給監聽器。要啟用此模式,請將屬性 seekAfterError 設定為 false

錯誤處理器可以恢復(跳過)持續失敗的記錄。預設情況下,在失敗十次後,失敗的記錄會被記錄日誌(在 ERROR 級別)。您可以為處理器配置一個自定義的 recoverer (BiConsumer) 和一個 BackOff,用於控制投遞嘗試次數和每次嘗試之間的延遲。使用 FixedBackOff 並設定 FixedBackOff.UNLIMITED_ATTEMPTS 會導致(實際上是)無限重試。以下示例配置了失敗三次後進行恢復:

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

要使用此處理器的自定義例項配置監聽器容器,請將其新增到容器工廠。

例如,使用 @KafkaListener 容器工廠,您可以按如下方式新增 DefaultErrorHandler

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

對於 record 監聽器,這將重試投遞最多 2 次(共 3 次嘗試),每次間隔 1 秒,而不是預設配置(FixedBackOff(0L, 9))。重試耗盡後,失敗只會被記錄日誌。

例如,如果 poll 返回六條記錄(分割槽 0、1、2 各兩條),並且監聽器在第四條記錄上丟擲異常,容器會透過提交前三條訊息的 offset 來確認它們。DefaultErrorHandler 會 seek 到分割槽 1 的 offset 1 和分割槽 2 的 offset 0。下一次 poll() 會返回三條未處理的記錄。

如果 AckModeBATCH,容器會在呼叫錯誤處理器之前提交前兩個分割槽的 offset。

對於 batch 監聽器,監聽器必須丟擲 BatchListenerFailedException 來指示批次中的哪些記錄失敗了。

事件序列如下:

  • 提交索引之前的記錄的 offset。

  • 如果重試次數未耗盡,則執行 seek 操作,以便所有剩餘記錄(包括失敗的記錄)都將重新投遞。

  • 如果重試次數耗盡,嘗試恢復失敗的記錄(預設僅記錄日誌),並執行 seek 操作,以便剩餘記錄(不包括失敗的記錄)將重新投遞。已恢復記錄的 offset 會被提交。

  • 如果重試次數耗盡且恢復失敗,則執行 seek 操作,就像重試次數未耗盡一樣。

從 2.9 版本開始,DefaultErrorHandler 可以配置為提供與 seek 未處理記錄 offset 相同的語義(如上所述),但實際上並不執行 seek 操作。相反,錯誤處理器建立一個新的 ConsumerRecords<?, ?>,其中只包含未處理的記錄,然後將這些記錄提交給監聽器(在執行單次暫停的 poll() 以保持 consumer 活躍後)。要啟用此模式,請將屬性 seekAfterError 設定為 false

預設的 recoverer 在重試次數耗盡後記錄失敗的記錄。您可以使用自定義的 recoverer,或者使用框架提供的 recoverer,例如 DeadLetterPublishingRecoverer

使用 POJO batch 監聽器(例如 List<Thing>)時,如果您沒有完整的 consumer record 可以新增到異常中,只需新增失敗記錄的索引即可:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

當容器配置為 AckMode.MANUAL_IMMEDIATE 時,錯誤處理器可以配置為提交已恢復記錄的 offset;將 commitRecovered 屬性設定為 true

另請參閱釋出死信記錄

使用事務時,DefaultAfterRollbackProcessor 提供了類似的功能。請參閱回滾後處理器

DefaultErrorHandler 認為某些異常是致命的,會跳過此類異常的重試;recoverer 在第一次失敗時就被呼叫。預設情況下,被認為是致命的異常包括:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因為這些異常在重試投遞時不太可能得到解決。

您可以將更多異常型別新增到不可重試類別中,或者完全替換已分類異常的對映。有關更多資訊,請參閱 DefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications() 的 Javadoc,以及 spring-retryBinaryExceptionClassifier 的 Javadoc。

以下是將 IllegalArgumentException 新增到不可重試異常的示例:

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

錯誤處理器可以配置一個或多個 RetryListener,接收重試和恢復進度的通知。從 2.8.10 版本開始,添加了用於 batch 監聽器的方法。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

有關更多資訊,請參閱 Javadoc。

如果 recoverer 失敗(丟擲異常),失敗的記錄將包含在 seek 操作中。從 2.5.5 版本開始,如果 recoverer 失敗,BackOff 預設情況下會重置,並且在再次嘗試恢復之前,重新投遞會再次經歷退避過程。在早期版本中,BackOff 不會重置,並在下一次失敗時再次嘗試恢復。要恢復到之前的行為,請將處理器的 resetStateOnRecoveryFailure 屬性設定為 false

從 2.6 版本開始,您現在可以為處理器提供一個 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根據失敗的記錄和/或異常來確定使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函式返回 null,則使用處理器的預設 BackOff

從 2.6.3 版本開始,將 resetStateOnExceptionChange 設定為 true,如果異常型別在失敗之間發生變化,重試序列將重新啟動(包括選擇新的 BackOff,如果已配置)。預設情況下,不考慮異常型別。

從 2.9 版本開始,預設情況下此值為 true

另請參閱投遞嘗試次數頭部

Batch 錯誤處理器中的轉換錯誤

從 2.8 版本開始,使用 ByteArrayDeserializerBytesDeserializerStringDeserializer 以及 DefaultErrorHandlerMessageConverter 時,batch 監聽器現在可以正確處理轉換錯誤。發生轉換錯誤時,payload 被設定為 null,並且會將反序列化異常新增到記錄頭部中,類似於 ErrorHandlingDeserializer。監聽器中提供了一個 ConversionException 列表,因此監聽器可以丟擲 BatchListenerFailedException 來指示發生轉換異常的第一個索引。

示例

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

重試完整的批次

當 batch 監聽器丟擲 BatchListenerFailedException 以外的異常時,這現在是 DefaultErrorHandler 的回退行為。

無法保證在批次重新投遞時,批次中的記錄數量相同和/或重新投遞的記錄順序相同。因此,很難為批次輕鬆維護重試狀態。FallbackBatchErrorHandler 採用以下方法。如果 batch 監聽器丟擲非 BatchListenerFailedException 的異常,則從記憶體中的記錄批次執行重試。為了避免在長時間的重試序列中發生 rebalance,錯誤處理器會暫停 consumer,在每次重試的退避睡眠之前進行 poll,然後再次呼叫監聽器。如果/當重試耗盡時,會為批次中的每條記錄呼叫 ConsumerRecordRecoverer。如果 recoverer 丟擲異常,或者執行緒在其睡眠期間被中斷,則批次記錄將在下一次 poll 時重新投遞。在退出之前,無論結果如何,consumer 都會恢復。

此機制不能用於事務。

在等待 BackOff 間隔期間,錯誤處理器會迴圈短暫睡眠,直到達到期望的延遲,同時檢查容器是否已停止,以便睡眠能在 stop() 後儘快退出,而不是導致延遲。

停止容器錯誤處理器

如果監聽器丟擲異常,CommonContainerStoppingErrorHandler 會停止容器。對於 record 監聽器,當 AckModeRECORD 時,已處理記錄的 offset 會被提交。對於 record 監聽器,當 AckMode 是任何手動值時,已確認記錄的 offset 會被提交。對於 record 監聽器,當 AckModeBATCH,或者對於 batch 監聽器,容器重新啟動時會重新處理整個批次。

容器停止後,會丟擲一個包裝了 ListenerExecutionFailedException 的異常。這是為了使事務回滾(如果啟用了事務)。

委託錯誤處理器

CommonDelegatingErrorHandler 可以根據異常型別委託給不同的錯誤處理器。例如,您可能希望對大多數異常呼叫 DefaultErrorHandler,而對其他異常呼叫 CommonContainerStoppingErrorHandler

所有委託必須共享相同的相容屬性(ackAfterHandle, seekAfterError 等)。

記錄日誌錯誤處理器

CommonLoggingErrorHandler 只記錄異常日誌;對於 record 監聽器,前一次 poll 中剩餘的記錄會傳遞給監聽器。對於 batch 監聽器,批次中的所有記錄都會被記錄日誌。

對 Record 和 Batch 監聽器使用不同的通用錯誤處理器

如果您希望對 record 和 batch 監聽器採用不同的錯誤處理策略,框架提供了 CommonMixedErrorHandler,它允許為每種監聽器型別配置特定的錯誤處理器。

通用錯誤處理器總結

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

遺留錯誤處理器及其替代方案

遺留錯誤處理器 替代方案

LoggingErrorHandler

CommonLoggingErrorHandler

BatchLoggingErrorHandler

CommonLoggingErrorHandler

ConditionalDelegatingErrorHandler

DelegatingErrorHandler

ConditionalDelegatingBatchErrorHandler

DelegatingErrorHandler

ContainerStoppingErrorHandler

CommonContainerStoppingErrorHandler

ContainerStoppingBatchErrorHandler

CommonContainerStoppingErrorHandler

SeekToCurrentErrorHandler

DefaultErrorHandler

SeekToCurrentBatchErrorHandler

沒有替代方案,使用配置了無限 BackOffDefaultErrorHandler

RecoveringBatchErrorHandler

DefaultErrorHandler

RetryingBatchErrorHandler

沒有替代方案,使用 DefaultErrorHandler 並丟擲 BatchListenerFailedException 以外的異常。

將自定義遺留錯誤處理器實現遷移到 CommonErrorHandler

請參閱 CommonErrorHandler 中的 Javadoc。

要替換 ErrorHandlerConsumerAwareErrorHandler 實現,您應該實現 handleOne() 並讓 seeksAfterHandle() 返回 false(預設值)。您還應該實現 handleOtherException() 來處理在記錄處理範圍之外發生的異常(例如 consumer 錯誤)。

要替換 RemainingRecordsErrorHandler 實現,您應該實現 handleRemaining() 並重寫 seeksAfterHandle() 以返回 true(錯誤處理器必須執行必要的 seek 操作)。您還應該實現 handleOtherException() - 以處理在記錄處理範圍之外發生的異常(例如 consumer 錯誤)。

要替換任何 BatchErrorHandler 實現,您應該實現 handleBatch()。您還應該實現 handleOtherException() - 以處理在記錄處理範圍之外發生的異常(例如 consumer 錯誤)。

回滾後處理器

使用事務時,如果監聽器丟擲異常(並且如果存在錯誤處理器,它也丟擲異常),事務將回滾。預設情況下,任何未處理的記錄(包括失敗的記錄)會在下一次 poll 時重新獲取。這是透過在 DefaultAfterRollbackProcessor 中執行 seek 操作實現的。對於 batch 監聽器,整個批次記錄會重新處理(容器不知道批次中哪個記錄失敗了)。要修改此行為,您可以為監聽器容器配置自定義的 AfterRollbackProcessor。例如,對於基於 record 的監聽器,您可能希望跟蹤失敗的記錄並在嘗試一定次數後放棄,或許透過將其釋出到死信主題。

從 2.2 版本開始,DefaultAfterRollbackProcessor 現在可以恢復(跳過)持續失敗的記錄。預設情況下,在失敗十次後,失敗的記錄會被記錄日誌(在 ERROR 級別)。您可以為處理器配置自定義的 recoverer (BiConsumer) 和最大失敗次數。將 maxFailures 屬性設定為負數會導致無限重試。以下示例配置了失敗三次後進行恢復:

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

當您不使用事務時,可以透過配置 DefaultErrorHandler 實現類似功能。請參閱容器錯誤處理器

從 3.2 版本開始,現在可以恢復(跳過)持續失敗的整個批次記錄。設定 ContainerProperties.setBatchRecoverAfterRollback(true) 來啟用此功能。

預設行為下,對於 batch 監聽器無法進行恢復,因為框架不知道批次中哪個記錄持續失敗。在這種情況下,應用程式監聽器必須處理持續失敗的記錄。

另請參閱釋出死信記錄

從 2.2.5 版本開始,可以在新事務中呼叫 DefaultAfterRollbackProcessor(在失敗事務回滾後啟動)。然後,如果您使用 DeadLetterPublishingRecoverer 釋出失敗的記錄,處理器會將已恢復記錄在原始 topic/partition 中的 offset 傳送到該事務。要啟用此功能,請在 DefaultAfterRollbackProcessor 上設定 commitRecoveredkafkaTemplate 屬性。

如果 recoverer 失敗(丟擲異常),失敗的記錄將包含在 seek 操作中。從 2.5.5 版本開始,如果 recoverer 失敗,BackOff 預設情況下會重置,並且在再次嘗試恢復之前,重新投遞會再次經歷退避過程。在早期版本中,BackOff 不會重置,並在下一次失敗時再次嘗試恢復。要恢復到之前的行為,請將處理器的 resetStateOnRecoveryFailure 屬性設定為 false

從 2.6 版本開始,您現在可以為處理器提供一個 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根據失敗的記錄和/或異常來確定使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函式返回 null,則使用處理器的預設 BackOff

從 2.6.3 版本開始,將 resetStateOnExceptionChange 設定為 true,如果異常型別在失敗之間發生變化,重試序列將重新啟動(包括選擇新的 BackOff,如果已配置)。預設情況下,不考慮異常型別。

從 2.3.1 版本開始,與 DefaultErrorHandler 類似,DefaultAfterRollbackProcessor 認為某些異常是致命的,會跳過此類異常的重試;recoverer 在第一次失敗時就被呼叫。預設情況下,被認為是致命的異常包括:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因為這些異常在重試投遞時不太可能得到解決。

您可以將更多異常型別新增到不可重試類別中,或者完全替換已分類異常的對映。有關更多資訊,請參閱 DefaultAfterRollbackProcessor.setClassifications() 的 Javadoc,以及 spring-retryBinaryExceptionClassifier 的 Javadoc。

以下是將 IllegalArgumentException 新增到不可重試異常的示例:

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}

另請參閱投遞嘗試次數頭部

使用當前的 kafka-clients,容器無法檢測到 ProducerFencedException 是由 rebalance 引起的,還是由於超時或過期導致 producer 的 transactional.id 被撤銷。因為在大多數情況下,它是由 rebalance 引起的,容器不會呼叫 AfterRollbackProcessor(因為對分割槽執行 seek 操作不合適,因為我們不再被分配這些分割槽)。如果您確保超時時間足夠長以處理每個事務,並定期執行“空”事務(例如,透過 ListenerContainerIdleEvent),則可以避免因超時和過期導致的 fencing。或者,您可以將 stopContainerWhenFenced 容器屬性設定為 true,容器將停止,避免記錄丟失。您可以消費 ConsumerStoppedEvent 並檢查其 Reason 屬性是否為 FENCED 來檢測此情況。由於事件還包含對容器的引用,您可以使用此事件重新啟動容器。

從 2.7 版本開始,在等待 BackOff 間隔期間,錯誤處理器會迴圈短暫睡眠,直到達到期望的延遲,同時檢查容器是否已停止,以便睡眠能在 stop() 後儘快退出,而不是導致延遲。

從 2.7 版本開始,處理器可以配置一個或多個 RetryListener,接收重試和恢復進度的通知。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

有關更多資訊,請參閱 Javadoc。

投遞嘗試次數頭部

以下內容僅適用於 record 監聽器,不適用於 batch 監聽器。

從 2.5 版本開始,使用實現 DeliveryAttemptAwareErrorHandlerAfterRollbackProcessor 時,可以將 KafkaHeaders.DELIVERY_ATTEMPT 頭部 (kafka_deliveryAttempt) 新增到記錄中。此頭部的值是一個從 1 開始遞增的整數。接收原始 ConsumerRecord<?, ?> 時,該整數儲存在 byte[4] 中。

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

使用 @KafkaListener 並配合 DefaultKafkaHeaderMapperSimpleKafkaHeaderMapper 時,可以透過向監聽器方法新增引數 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 來獲取該頭部。

要啟用此頭部的填充,請將容器屬性 deliveryAttemptHeader 設定為 true。預設情況下停用此功能,以避免查詢每條記錄的狀態並新增頭部帶來的(較小的)開銷。

DefaultErrorHandlerDefaultAfterRollbackProcessor 支援此功能。

Batch 監聽器的投遞嘗試次數頭部

使用 BatchListener 處理 ConsumerRecord 時,與 SingleRecordListener 相比,KafkaHeaders.DELIVERY_ATTEMPT 頭部可能以不同的方式存在。

從 3.3 版本開始,如果您希望在使用 BatchListener 時將 KafkaHeaders.DELIVERY_ATTEMPT 頭部注入到 ConsumerRecord 中,請將 DeliveryAttemptAwareRetryListener 設定為 ErrorHandler 中的 RetryListener

請參考以下程式碼。

final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);

然後,每當批次未能完成時,DeliveryAttemptAwareRetryListener 會將 KafkaHeaders.DELIVERY_ATTMPT 頭部注入到 ConsumerRecord 中。

監聽器資訊頭部

在某些情況下,能夠知道監聽器執行在哪個容器中很有用。

從 2.8.4 版本開始,您現在可以在監聽器容器上設定 listenerInfo 屬性,或在 @KafkaListener 註解上設定 info 屬性。然後,容器會將此資訊新增到所有入站訊息的 KafkaListener.LISTENER_INFO 頭部中;此頭部可用於 record 攔截器、過濾器等,或在監聽器本身中使用。

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

RecordInterceptorRecordFilterStrategy 實現中使用時,頭部以位元組陣列形式存在於 consumer record 中,使用 KafkaListenerAnnotationBeanPostProcessorcharSet 屬性進行轉換。

頭部對映器在從 consumer record 建立 MessageHeaders 時也會轉換為 String,並且永遠不會在出站記錄上對映此頭部。

對於 POJO batch 監聽器,從 2.8.6 版本開始,頭部會複製到批次中的每個成員,並在轉換後也作為單個 String 引數可用。

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果 batch 監聽器有過濾器且過濾結果為空批次,您需要在 @Header 引數中新增 required = false,因為空批次不提供該資訊。

如果您接收到 List<Message<Thing>>,則資訊位於每個 Message<?>KafkaHeaders.LISTENER_INFO 頭部中。

有關消費批次的更多資訊,請參閱Batch 監聽器

釋出死信記錄

您可以使用記錄恢復器配置 DefaultErrorHandlerDefaultAfterRollbackProcessor,以便在記錄達到最大失敗次數時進行處理。該框架提供了 DeadLetterPublishingRecoverer,它將失敗的訊息釋出到另一個主題。此恢復器需要一個 KafkaTemplate<Object, Object>,用於傳送記錄。您還可以選擇性地配置一個 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,它會被呼叫來解析目標主題和分割槽。

預設情況下,死信記錄會被髮送到名為 <originalTopic>-dlt 的主題(原始主題名稱後附加 -dlt),併發送到與原始記錄相同的分割槽。因此,當您使用預設解析器時,死信主題必須至少擁有與原始主題一樣多的分割槽。

如果返回的 TopicPartition 的分割槽為負數,則在 ProducerRecord 中不會設定分割槽,分割槽將由 Kafka 選擇。從版本 2.2.4 開始,任何 ListenerExecutionFailedException(例如,在 @KafkaListener 方法中檢測到異常時丟擲)都增強了 groupId 屬性。這使得目標解析器除了使用 ConsumerRecord 中的資訊外,還可以利用此屬性來選擇死信主題。

以下示例展示瞭如何配置一個自定義目標解析器

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

傳送到死信主題的記錄會增強以下頭部(headers)

  • KafkaHeaders.DLT_EXCEPTION_FQCN:異常類名(通常是 ListenerExecutionFailedException,但也可以是其他)。

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN:異常原因的類名,如果存在的話(從版本 2.8 開始)。

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE:異常的堆疊跟蹤。

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE:異常訊息。

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN:異常類名(僅限鍵反序列化錯誤)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE:異常的堆疊跟蹤(僅限鍵反序列化錯誤)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE:異常訊息(僅限鍵反序列化錯誤)。

  • KafkaHeaders.DLT_ORIGINAL_TOPIC:原始主題。

  • KafkaHeaders.DLT_ORIGINAL_PARTITION:原始分割槽。

  • KafkaHeaders.DLT_ORIGINAL_OFFSET:原始偏移量。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP:原始時間戳。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE:原始時間戳型別。

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP:未能處理該記錄的原始消費者組(從版本 2.8 開始)。

鍵異常僅由 DeserializationException 引起,因此沒有 DLT_KEY_EXCEPTION_CAUSE_FQCN

有兩種機制可以新增更多頭部。

  1. 子類化恢復器並覆蓋 createProducerRecord() 方法 - 呼叫 super.createProducerRecord() 並新增更多頭部。

  2. 提供一個 BiFunction 來接收消費者記錄和異常,並返回一個 Headers 物件;這些頭部將複製到最終的生產者記錄中;另請參閱 管理死信記錄頭部。使用 setHeadersFunction() 方法設定此 BiFunction

第二種實現起來更簡單,但第一種提供了更多可用資訊,包括已經組裝好的標準頭部。

從版本 2.3 開始,當與 ErrorHandlingDeserializer 一起使用時,釋出器將在死信生產者記錄中恢復記錄的 value() 為原始的、未能反序列化的值。在此之前,value() 是 null,使用者程式碼必須從訊息頭部中解碼 DeserializationException。此外,您可以向釋出器提供多個 KafkaTemplate;例如,如果您想釋出 DeserializationException 中的 byte[],以及使用與成功反序列化的記錄不同的序列化器釋出值,這可能就需要這樣做。以下是使用 Stringbyte[] 序列化器配置釋出器的示例

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

釋出器使用 Map 的鍵來查詢適用於即將釋出的值(value())的模板。建議使用 LinkedHashMap,以便按順序檢查鍵。

釋出 null 值且存在多個模板時,恢復器將查詢適用於 Void 類的模板;如果不存在,將使用 values().iterator() 中的第一個模板。

從 2.7 版本開始,您可以使用 setFailIfSendResultIsError 方法,以便在訊息釋出失敗時丟擲異常。您還可以使用 setWaitForSendResultTimeout 方法設定傳送器成功驗證的超時時間。

如果恢復器失敗(丟擲異常),失敗的記錄將包含在 seeks 操作中。從版本 2.5.5 開始,如果恢復器失敗,預設情況下將重置 BackOff,並且在再次嘗試恢復之前,重新投遞將再次經歷回退過程。在早期版本中,BackOff 未被重置,並在下次失敗時再次嘗試恢復。要恢復到先前的行為,請將錯誤處理器的 resetStateOnRecoveryFailure 屬性設定為 false

從 2.6.3 版本開始,將 resetStateOnExceptionChange 設定為 true,如果異常型別在失敗之間發生變化,重試序列將重新啟動(包括選擇新的 BackOff,如果已配置)。預設情況下,不考慮異常型別。

從版本 2.3 開始,恢復器也可以用於 Kafka Streams - 有關更多資訊,請參閱 從反序列化異常中恢復

ErrorHandlingDeserializer 在頭部 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER 中新增反序列化異常(使用 Java 序列化)。預設情況下,這些頭部不會保留在釋出到死信主題的訊息中。從版本 2.7 開始,如果鍵和值都反序列化失敗,則兩者在傳送到 DLT 的記錄中都會填充原始值。

如果入站記錄相互依賴,但可能亂序到達,將失敗的記錄重新發布到原始主題的末尾(若干次)可能比直接將其傳送到死信主題更有用。有關示例,請參閱 此 Stack Overflow 問題

以下錯誤處理器配置將完全實現這一點

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic-dlt", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

從版本 2.7 開始,恢復器檢查目標解析器選擇的分割槽是否存在。如果分割槽不存在,ProducerRecord 中的分割槽將設定為 null,允許 KafkaProducer 選擇分割槽。您可以透過將 verifyPartition 屬性設定為 false 來停用此檢查。

從版本 3.1 開始,將 logRecoveryRecord 屬性設定為 true 將記錄恢復記錄和異常。

管理死信記錄頭部

參考上文的 釋出死信記錄DeadLetterPublishingRecoverer 有兩個屬性用於在頭部已經存在時(例如,當重新處理失敗的死信記錄時,包括使用 非阻塞重試 時)管理頭部。

  • appendOriginalHeaders (預設 true)

  • stripPreviousExceptionHeaders (自版本 2.8 起預設 true)

Apache Kafka 支援多個同名頭部;要獲取“最新”值,您可以使用 headers.lastHeader(headerName);要獲取多個頭部的迭代器,請使用 headers.headers(headerName).iterator()

當重複釋出失敗的記錄時,這些頭部可能會增長(並最終由於 RecordTooLargeException 導致釋出失敗);異常頭部尤其如此,特別是堆疊跟蹤頭部。

這兩個屬性的原因是,雖然您可能只想保留最後的異常資訊,但您可能希望保留記錄在每次失敗時經過哪些主題的歷史記錄。

appendOriginalHeaders 應用於所有名為 ORIGINAL 的頭部,而 stripPreviousExceptionHeaders 應用於所有名為 EXCEPTION 的頭部。

從版本 2.8.4 開始,您現在可以控制將哪些標準頭部新增到輸出記錄中。請參閱 enum HeadersToAdd 以瞭解預設新增的(當前)10 個標準頭部的通用名稱(這些不是實際的頭部名稱,只是一個抽象;實際的頭部名稱由 getHeaderNames() 方法設定,子類可以覆蓋此方法)。

要排除頭部,請使用 excludeHeaders() 方法;例如,要禁止在頭部中新增異常堆疊跟蹤,請使用

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以透過新增一個 ExceptionHeadersCreator 來完全自定義異常頭部的新增;這也會停用所有標準異常頭部。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

同樣從版本 2.8.4 開始,您現在可以透過 addHeadersFunction 方法提供多個頭部函式。這允許應用額外的函式,即使已經註冊了另一個函式,例如在使用 非阻塞重試 時。

ExponentialBackOffWithMaxRetries 實現

Spring Framework 提供了許多 BackOff 實現。預設情況下,ExponentialBackOff 會無限重試;要在若干次重試嘗試後放棄,需要計算 maxElapsedTime。從版本 2.7.3 開始,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,它是 ExponentialBackOff 的一個子類,接收 maxRetries 屬性並自動計算 maxElapsedTime,這會更方便一些。

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

這將在 1、2、4、8、10、10 秒後重試,然後呼叫恢復器。