異常處理
本節描述了在使用 Spring for Apache Kafka 時如何處理可能出現的各種異常。
監聽器錯誤處理器
從 2.0 版本開始,@KafkaListener 註解新增了一個屬性:errorHandler。
您可以使用 errorHandler 來提供 KafkaListenerErrorHandler 實現的 bean 名稱。此函式式介面有一個方法,如下所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以訪問訊息轉換器生成的 spring-messaging Message<?> 物件以及監聽器丟擲的異常,該異常封裝在 ListenerExecutionFailedException 中。錯誤處理器可以丟擲原始異常或新異常,該異常會被拋給容器。錯誤處理器返回的任何內容都會被忽略。
從 2.7 版本開始,您可以在 MessagingMessageConverter 和 BatchMessagingMessageConverter 上設定 rawRecordHeader 屬性,這會導致原始的 ConsumerRecord 被新增到轉換後的 Message<?> 的 KafkaHeaders.RAW_DATA 標頭中。這很有用,例如,如果您希望在監聽器錯誤處理器中使用 DeadLetterPublishingRecoverer。它可以在請求/回覆場景中使用,在該場景中,您希望在多次重試後,將失敗結果傳送給傳送者,並將失敗的記錄捕獲到死信主題中。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一個子介面(ConsumerAwareListenerErrorHandler),透過以下方法可以訪問消費者物件:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一個子介面(ManualAckListenerErrorHandler)在使用手動 AckMode 時提供對 Acknowledgment 物件的訪問。
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在這兩種情況下,您都不應該在消費者上執行任何定址操作,因為容器將無法感知它們。
容器錯誤處理器
從 2.8 版本開始,舊的 ErrorHandler 和 BatchErrorHandler 介面已被新的 CommonErrorHandler 取代。這些錯誤處理器可以處理記錄和批處理監聽器的錯誤,允許單個監聽器容器工廠為兩種型別的監聽器建立容器。提供了 CommonErrorHandler 的實現以替換大多數舊框架錯誤處理器實現。
有關將自定義舊錯誤處理器遷移到 CommonErrorHandler 的資訊,請參見 將自定義舊錯誤處理器實現遷移到 CommonErrorHandler。
當使用事務時,預設情況下不配置錯誤處理器,以便異常會回滾事務。事務容器的錯誤處理由 AfterRollbackProcessor 處理。如果在使用事務時提供自定義錯誤處理器,如果您希望回滾事務,它必須丟擲異常。
該介面有一個預設方法 isAckAfterHandle(),容器呼叫它來確定如果錯誤處理器返回而不丟擲異常,是否應提交偏移量;它預設返回 true。
通常,框架提供的錯誤處理器在錯誤未“處理”(例如,執行定址操作後)時會丟擲異常。預設情況下,此類異常由容器以 ERROR 級別記錄。所有框架錯誤處理器都擴充套件了 KafkaExceptionLogLevelAware,它允許您控制這些異常的日誌級別。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定一個全域性錯誤處理器,用於容器工廠中的所有監聽器。以下示例演示瞭如何實現:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
預設情況下,如果帶註解的監聽器方法丟擲異常,它會被拋給容器,並且訊息將根據容器配置進行處理。
容器在呼叫錯誤處理器之前提交任何掛起的偏移量提交。
如果您正在使用 Spring Boot,您只需將錯誤處理器新增為 @Bean,Boot 將其新增到自動配置的工廠中。
退避處理器
諸如 DefaultErrorHandler 之類的錯誤處理器使用 BackOff 來確定在重試傳遞之前等待多長時間。從 2.9 版本開始,您可以配置自定義的 BackOffHandler。預設處理器只是暫停執行緒,直到退避時間過去(或容器停止)。框架還提供了 ContainerPausingBackOffHandler,它會暫停監聽器容器,直到退避時間過去,然後恢復容器。當延遲時間長於 max.poll.interval.ms 消費者屬性時,這很有用。請注意,實際退避時間的解析將受到 pollTimeout 容器屬性的影響。
DefaultErrorHandler
這個新的錯誤處理器取代了 SeekToCurrentErrorHandler 和 RecoveringBatchErrorHandler,它們已經作為預設錯誤處理器存在了幾個版本。一個不同之處是,對於批處理監聽器(當丟擲 BatchListenerFailedException 以外的異常時)的備用行為相當於 重試完整批次。
從 2.9 版本開始,可以配置 DefaultErrorHandler 以提供與下面討論的尋求未處理記錄偏移量相同的語義,但實際上不進行尋求。相反,記錄由監聽器容器保留,並在錯誤處理器退出後(以及在執行一次暫停的 poll() 以保持消費者活動後;如果正在使用 非阻塞重試 或 ContainerPausingBackOffHandler,暫停可能會持續多次輪詢)重新提交給監聽器。錯誤處理器向容器返回一個結果,指示當前失敗的記錄是否可以重新提交,或者它是否已恢復,然後將不再發送給監聽器。要啟用此模式,請將屬性 seekAfterError 設定為 false。 |
錯誤處理器可以恢復(跳過)持續失敗的記錄。預設情況下,在十次失敗後,失敗的記錄會被記錄(以 ERROR 級別)。您可以使用自定義的恢復器(BiConsumer)和控制傳遞嘗試和每次嘗試之間延遲的 BackOff 來配置處理器。使用 FixedBackOff 和 FixedBackOff.UNLIMITED_ATTEMPTS 會導致(實際上)無限重試。以下示例配置了三次嘗試後的恢復:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要為監聽器容器配置此處理器的自定義例項,請將其新增到容器工廠。
例如,對於 @KafkaListener 容器工廠,您可以按如下方式新增 DefaultErrorHandler:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
對於記錄監聽器,這將重試傳遞最多 2 次(3 次傳遞嘗試),退避時間為 1 秒,而不是預設配置(FixedBackOff(0L, 9))。重試耗盡後,失敗只會記錄日誌。
例如,如果 poll 返回六條記錄(分割槽 0、1、2 各兩條),並且監聽器在第四條記錄上丟擲異常,則容器透過提交其偏移量來確認前三條訊息。DefaultErrorHandler 尋求分割槽 1 的偏移量 1 和分割槽 2 的偏移量 0。下一次 poll() 返回三條未處理的記錄。
如果 AckMode 是 BATCH,則容器在呼叫錯誤處理器之前提交前兩個分割槽的偏移量。
對於批處理監聽器,監聽器必須丟擲 BatchListenerFailedException,指示批處理中的哪些記錄失敗。
事件序列是:
-
提交索引之前的記錄的偏移量。
-
如果重試未耗盡,則執行尋求操作,以便所有剩餘記錄(包括失敗記錄)將重新傳遞。
-
如果重試耗盡,嘗試恢復失敗記錄(預設只記錄日誌),並執行尋求操作,以便剩餘記錄(不包括失敗記錄)將重新傳遞。已恢復記錄的偏移量將被提交。
-
如果重試耗盡且恢復失敗,則像重試未耗盡一樣執行尋求操作。
從 2.9 版本開始,可以配置 DefaultErrorHandler 以提供與上面討論的尋求未處理記錄偏移量相同的語義,但實際上不進行尋求。相反,錯誤處理器建立一個新的 ConsumerRecords<?, ?>,其中只包含未處理的記錄,然後將其提交給監聽器(在執行一次暫停的 poll() 以保持消費者活動後)。要啟用此模式,請將屬性 seekAfterError 設定為 false。 |
預設的恢復器在重試耗盡後記錄失敗的記錄。您可以使用自定義恢復器,或框架提供的恢復器,例如 DeadLetterPublishingRecoverer。
當使用 POJO 批處理監聽器(例如 List<Thing>),並且您沒有完整的消費者記錄要新增到異常中時,您只需新增失敗記錄的索引:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
當容器配置為 AckMode.MANUAL_IMMEDIATE 時,可以將錯誤處理器配置為提交已恢復記錄的偏移量;將 commitRecovered 屬性設定為 true。
另請參見 釋出死信記錄。
當使用事務時,DefaultAfterRollbackProcessor 提供了類似的功能。請參見 回滾後處理器。
DefaultErrorHandler 認為某些異常是致命的,並跳過這些異常的重試;恢復器在第一次失敗時被呼叫。預設情況下被認為是致命的異常有:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
因為這些異常不太可能在重試傳遞時得到解決。
您可以將更多異常型別新增到不可重試類別中,或者完全替換分類異常的對映。有關更多資訊,請參見 DefaultErrorHandler.addNotRetryableException() 和 DefaultErrorHandler.setClassifications() 的 Javadoc,以及 ExceptionMatcher。
這是一個將 IllegalArgumentException 新增到不可重試異常的示例:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
DefaultErrorHandler 只處理繼承自 RuntimeException 的異常。繼承自 Error 的異常會完全繞過錯誤處理器,導致消費者立即終止,關閉 Kafka 連線,並跳過所有重試/恢復機制。這種關鍵的區別意味著應用程式可能會報告健康狀態,儘管已終止的消費者不再處理訊息。始終確保訊息處理程式碼中丟擲的異常明確地擴充套件自 RuntimeException 而不是 Error,以允許正確的錯誤處理。換句話說,如果應用程式丟擲異常,請確保它擴充套件自 RuntimeException,而不是無意中繼承自 Error。像 OutOfMemoryError、IllegalAccessError 和應用程式無法控制的其他錯誤仍然被視為 Error,並且不會重試。 |
錯誤處理器可以配置一個或多個 RetryListener,接收重試和恢復進度的通知。從 2.8.10 版本開始,添加了批處理監聽器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有關更多資訊,請參見 JavaDocs。
如果恢復器失敗(丟擲異常),失敗的記錄將包含在定址操作中。如果恢復器失敗,預設情況下 BackOff 將被重置,並且重新傳遞將再次透過退避操作,然後再次嘗試恢復。要在恢復失敗後跳過重試,請將錯誤處理器的 resetStateOnRecoveryFailure 設定為 false。 |
您可以為錯誤處理器提供一個 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根據失敗的記錄和/或異常來確定要使用的 BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果函式返回 null,則將使用處理器的預設 BackOff。
將 resetStateOnExceptionChange 設定為 true,如果異常型別在失敗之間發生變化,則重試序列將重新啟動(包括選擇一個新的 BackOff,如果已配置)。當為 false(2.9 版本之前的預設值)時,不考慮異常型別。
從 2.9 版本開始,這現在預設為 true。
另請參見 傳遞嘗試頭。
批處理監聽器與死信主題的錯誤處理
非阻塞重試 (@RetryableTopic 註解) 不支援批處理監聽器。對於批處理監聽器與死信主題功能,請使用 DefaultErrorHandler 和 DeadLetterPublishingRecoverer。 |
使用 BatchListenerFailedException
要指示批次中哪個特定記錄失敗,請丟擲 BatchListenerFailedException:
@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
for (ConsumerRecord<String, Order> record : records) {
try {
process(record.value());
}
catch (Exception e) {
// Identifies the failed record for error handling
throw new BatchListenerFailedException("Failed to process", e, record);
}
}
}
對於沒有 ConsumerRecord 的 POJO 批處理監聽器,請改用索引:
@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<Order> orders) {
for (int i = 0; i < orders.size(); i++) {
try {
process(orders.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", e, i);
}
}
}
為批處理監聽器配置死信主題
在您的批處理監聽器容器工廠上配置一個帶有 DeadLetterPublishingRecoverer 的 DefaultErrorHandler:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
ConsumerFactory<String, Order> consumerFactory,
KafkaTemplate<String, Order> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Order> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
// Configure Dead Letter Publishing
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(record.topic() + "-dlt", record.partition()));
// Configure retries: 3 attempts with 1 second between each
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
new FixedBackOff(1000L, 2L)); // 2 retries = 3 total attempts
factory.setCommonErrorHandler(errorHandler);
return factory;
}
批處理錯誤處理的工作原理
當丟擲 BatchListenerFailedException 時,DefaultErrorHandler 會:
-
提交失敗記錄之前所有記錄的偏移量
-
根據
BackOff配置重試失敗記錄(以及後續記錄) -
當重試耗盡時,釋出到 DLT - 只有失敗的記錄被髮送到 DLT
-
提交失敗記錄的偏移量並重新傳遞剩餘記錄進行處理
批處理包含 6 條記錄,其中索引為 2 的記錄失敗的示例流程:
-
第一次嘗試:記錄 0、1 成功處理;記錄 2 失敗
-
容器提交記錄 0、1 的偏移量
-
重試嘗試 1:記錄 2、3、4、5 被重試
-
重試嘗試 2:記錄 2、3、4、5 再次被重試
-
重試耗盡後:記錄 2 釋出到 DLT 並提交其偏移量
-
容器繼續處理記錄 3、4、5
跳過特定異常的重試
預設情況下,DefaultErrorHandler 會重試所有異常,除了致命異常(如 DeserializationException、MessageConversionException 等)。要跳過您自己的異常型別的重試,請使用異常分類配置錯誤處理器。
錯誤處理器檢查 BatchListenerFailedException 的原因以確定是否應跳過重試:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
ConsumerFactory<String, Order> consumerFactory,
KafkaTemplate<String, Order> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Order> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
new FixedBackOff(1000L, 2L));
// Add custom exception types that should skip retries and go directly to DLT
errorHandler.addNotRetryableExceptions(ValidationException.class, InvalidFormatException.class);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
現在在您的監聽器中:
@KafkaListener(id = "batch-listener", topics = "orders", containerFactory = "batchFactory")
public void processOrders(List<ConsumerRecord<String, Order>> records) {
for (ConsumerRecord<String, Order> record : records) {
try {
process(record.value());
}
catch (DatabaseException e) {
// Will be retried 3 times (according to BackOff configuration)
throw new BatchListenerFailedException("Database error", e, record);
}
catch (ValidationException e) {
// Skips retries - goes directly to DLT
// (because ValidationException is configured as not retryable)
throw new BatchListenerFailedException("Validation failed", e, record);
}
}
}
錯誤處理器檢查 BatchListenerFailedException 的原因(第二個引數)。如果原因被歸類為不可重試,則記錄會立即傳送到 DLT,而不進行重試。 |
偏移量提交行為
瞭解偏移量提交對於批處理錯誤處理很重要:
-
AckMode.BATCH(批處理監聽器最常見)
-
失敗記錄之前的偏移量在錯誤處理之前提交
-
失敗記錄的偏移量在成功恢復(DLT 釋出)後提交
-
-
AckMode.MANUAL_IMMEDIATE:
-
設定
errorHandler.setCommitRecovered(true)以提交已恢復記錄的偏移量 -
您在監聽器中控制確認時機
-
手動確認示例
@KafkaListener(id = "manual-batch", topics = "myTopic", containerFactory = "manualBatchFactory")
public void listen(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) {
for (ConsumerRecord<String, Order> record : records) {
try {
process(record.value());
}
catch (Exception e) {
throw new BatchListenerFailedException("Processing failed", e, record);
}
}
ack.acknowledge();
}
批處理錯誤處理器中的轉換錯誤
從 2.8 版本開始,批處理監聽器現在可以正確處理轉換錯誤,當使用帶有 ByteArrayDeserializer、BytesDeserializer 或 StringDeserializer 的 MessageConverter 和 DefaultErrorHandler 時。當發生轉換錯誤時,有效負載被設定為 null,並且反序列化異常被新增到記錄頭中,類似於 ErrorHandlingDeserializer。監聽器中會提供一個 ConversionException 列表,因此監聽器可以丟擲 BatchListenerFailedException,指示發生轉換異常的第一個索引。
示例
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
批處理監聽器中的反序列化錯誤
批處理監聽器需要手動處理反序列化錯誤。與記錄監聽器不同,沒有自動錯誤處理器可以檢測並將反序列化失敗路由到 DLT。您必須顯式檢查失敗的記錄並丟擲 BatchListenerFailedException。 |
使用 ErrorHandlingDeserializer 可防止反序列化失敗停止整個批次
@Bean
public ConsumerFactory<String, Order> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Wrap your deserializer with ErrorHandlingDeserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
在您的監聽器中,您必須手動檢查 null 值,這些值表示反序列化失敗
@KafkaListener(id = "batch-deser", topics = "orders", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
for (ConsumerRecord<String, Order> record : records) {
if (record.value() == null) {
// Deserialization failed - throw exception to send to DLT
throw new BatchListenerFailedException("Deserialization failed", record);
}
process(record.value());
}
}
當 DeadLetterPublishingRecoverer 將反序列化失敗釋出到 DLT 時
-
無法反序列化的原始
byte[]資料將作為記錄值恢復 -
異常資訊(類名、訊息、堆疊跟蹤)將新增到標準 DLT 異常頭中
-
原始的
ErrorHandlingDeserializer異常頭預設會被移除(設定 recoverer 的setRetainExceptionHeader(true)以保留它)
重試完整批次
對於批處理監聽器,當監聽器丟擲 BatchListenerFailedException 以外的異常時,這現在是 DefaultErrorHandler 的備用行為。
無法保證當批次重新傳遞時,批次具有相同數量的記錄和/或重新傳遞的記錄順序相同。因此,不可能輕易地維護批次的重試狀態。FallbackBatchErrorHandler 採用以下方法。如果批處理監聽器丟擲不是 BatchListenerFailedException 的異常,則從記憶體中的記錄批次執行重試。為了避免在擴充套件重試序列期間發生再平衡,錯誤處理器會暫停消費者,在每次重試時,在退避休眠之前對其進行輪詢,並再次呼叫監聽器。如果/當重試耗盡時,會為批次中的每條記錄呼叫 ConsumerRecordRecoverer。如果恢復器丟擲異常,或者執行緒在其休眠期間被中斷,則記錄批次將在下一次輪詢時重新傳遞。在退出之前,無論結果如何,消費者都會恢復。
| 此機制不能與事務一起使用。 |
在等待 BackOff 間隔時,錯誤處理器將以短時休眠迴圈,直到達到所需的延遲,同時檢查容器是否已停止,從而允許休眠在 stop() 後立即退出,而不是導致延遲。
容器停止錯誤處理器
如果監聽器丟擲異常,CommonContainerStoppingErrorHandler 將停止容器。對於記錄監聽器,當 AckMode 為 RECORD 時,已處理記錄的偏移量將提交。對於記錄監聽器,當 AckMode 為任何手動值時,已確認記錄的偏移量將提交。對於記錄監聽器,當 AckMode 為 BATCH 時,或對於批處理監聽器,當容器重新啟動時,整個批次將重播。
容器停止後,會丟擲一個包裝 ListenerExecutionFailedException 的異常。這是為了導致事務回滾(如果啟用了事務)。
委派錯誤處理器
CommonDelegatingErrorHandler 可以根據異常型別委託給不同的錯誤處理器。例如,您可能希望為大多數異常呼叫 DefaultErrorHandler,或者為其他異常呼叫 CommonContainerStoppingErrorHandler。
所有委託必須共享相同的相容屬性(ackAfterHandle、seekAfterError…)。
通用錯誤處理器摘要
-
DefaultErrorHandler -
CommonContainerStoppingErrorHandler -
CommonDelegatingErrorHandler -
CommonLoggingErrorHandler -
CommonMixedErrorHandler
舊版錯誤處理器及其替代品
| 舊版錯誤處理器 | 替代 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
無替代品,使用具有無限 |
|
|
|
無替代品,使用 |
將自定義舊版錯誤處理器實現遷移到 CommonErrorHandler
請參閱 CommonErrorHandler 中的 JavaDocs。
要替換 ErrorHandler 或 ConsumerAwareErrorHandler 實現,您應該實現 handleOne() 並讓 seeksAfterHandle() 返回 false(預設)。您還應該實現 handleOtherException() 來處理在記錄處理範圍之外發生的異常(例如消費者錯誤)。
要替換 RemainingRecordsErrorHandler 實現,您應該實現 handleRemaining() 並重寫 seeksAfterHandle() 以返回 true(錯誤處理器必須執行必要的尋求)。您還應該實現 handleOtherException() - 以處理在記錄處理範圍之外發生的異常(例如消費者錯誤)。
要替換任何 BatchErrorHandler 實現,您應該實現 handleBatch()。您還應該實現 handleOtherException() - 以處理在記錄處理範圍之外發生的異常(例如消費者錯誤)。
回滾後處理器
使用事務時,如果監聽器丟擲異常(並且錯誤處理器,如果存在,也丟擲異常),事務將回滾。預設情況下,任何未處理的記錄(包括失敗的記錄)將在下一次輪詢時重新獲取。這是透過在 DefaultAfterRollbackProcessor 中執行 seek 操作來實現的。對於批處理監聽器,整個批處理記錄都會被重新處理(容器不知道批處理中的哪個記錄失敗)。要修改此行為,您可以使用自定義 AfterRollbackProcessor 配置監聽器容器。例如,對於基於記錄的監聽器,您可能希望跟蹤失敗的記錄並在嘗試一定次數後放棄,也許透過將其釋出到死信主題。
從 2.2 版本開始,DefaultAfterRollbackProcessor 現在可以恢復(跳過)持續失敗的記錄。預設情況下,在十次失敗後,失敗的記錄會被記錄(以 ERROR 級別)。您可以配置處理器,使其具有自定義的恢復器(BiConsumer)和最大失敗次數。將 maxFailures 屬性設定為負數會導致無限重試。以下示例配置了三次嘗試後的恢復:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
當您不使用事務時,可以透過配置 DefaultErrorHandler 來實現類似的功能。請參閱 容器錯誤處理器。
從 3.2 版本開始,恢復現在可以恢復(跳過)持續失敗的整個批處理記錄。將 ContainerProperties.setBatchRecoverAfterRollback(true) 設定為 true 以啟用此功能。
| 預設行為是,批處理監聽器無法進行恢復,因為框架不知道批處理中哪個記錄持續失敗。在這種情況下,應用程式監聽器必須處理持續失敗的記錄。 |
另請參見 釋出死信記錄。
從 2.2.5 版本開始,DefaultAfterRollbackProcessor 可以在新事務中呼叫(在失敗事務回滾後啟動)。然後,如果您正在使用 DeadLetterPublishingRecoverer 釋出失敗記錄,處理器將在原始主題/分割槽中將已恢復記錄的偏移量傳送到事務。要啟用此功能,請在 DefaultAfterRollbackProcessor 上設定 commitRecovered 和 kafkaTemplate 屬性。
如果恢復器失敗(丟擲異常),失敗的記錄將包含在定址操作中。從 2.5.5 版本開始,如果恢復器失敗,預設情況下 BackOff 將被重置,並且重新傳遞將再次透過退避操作,然後再次嘗試恢復。在早期版本中,BackOff 未被重置,並在下次失敗時重新嘗試恢復。要恢復到以前的行為,請將處理器的 resetStateOnRecoveryFailure 屬性設定為 false。 |
從 2.6 版本開始,您現在可以為處理器提供一個 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根據失敗的記錄和/或異常來確定要使用的 BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果函式返回 null,則將使用處理器的預設 BackOff。
從 2.6.3 版本開始,將 resetStateOnExceptionChange 設定為 true,如果異常型別在失敗之間發生變化,則重試序列將重新啟動(包括選擇一個新的 BackOff,如果已配置)。預設情況下,不考慮異常型別。
從 2.3.1 版本開始,類似於 DefaultErrorHandler,DefaultAfterRollbackProcessor 認為某些異常是致命的,並跳過這些異常的重試;恢復器在第一次失敗時被呼叫。預設情況下被認為是致命的異常有:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
因為這些異常不太可能在重試傳遞時得到解決。
您可以將更多異常型別新增到不可重試類別中,或者完全替換分類異常的對映。有關更多資訊,請參見 DefaultAfterRollbackProcessor.setClassifications() 的 Javadoc,以及 ExceptionMatcher。
這是一個將 IllegalArgumentException 新增到不可重試異常的示例:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另請參見 傳遞嘗試頭。
使用當前的 kafka-clients,容器無法檢測到 ProducerFencedException 是由再平衡引起的,還是由於超時或過期導致生產者的 transactional.id 被撤銷。因為在大多數情況下,它是由再平衡引起的,容器不會呼叫 AfterRollbackProcessor(因為它不適合尋求分割槽,因為我們不再分配它們)。如果您確保超時足夠大以處理每個事務並定期執行“空”事務(例如透過 ListenerContainerIdleEvent),您可以避免因超時和過期而導致的隔離。或者,您可以將 stopContainerWhenFenced 容器屬性設定為 true,容器將停止,避免記錄丟失。您可以消費 ConsumerStoppedEvent 並檢查 Reason 屬性的 FENCED 以檢測此情況。由於事件還具有對容器的引用,您可以使用此事件重新啟動容器。 |
從 2.7 版本開始,在等待 BackOff 間隔時,錯誤處理器將以短時休眠迴圈,直到達到所需的延遲,同時檢查容器是否已停止,從而允許休眠在 stop() 後立即退出,而不是導致延遲。
從 2.7 版本開始,處理器可以配置一個或多個 RetryListener,接收重試和恢復進度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有關更多資訊,請參見 JavaDocs。
傳遞嘗試頭
以下僅適用於記錄監聽器,不適用於批處理監聽器。
從 2.5 版本開始,當使用實現了 DeliveryAttemptAware 的 ErrorHandler 或 AfterRollbackProcessor 時,可以啟用新增 KafkaHeaders.DELIVERY_ATTEMPT 頭(kafka_deliveryAttempt)到記錄中。此頭的值是一個從 1 開始遞增的整數。當接收原始 ConsumerRecord<?, ?> 時,整數位於 byte[4] 中。
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
當使用 @KafkaListener 和 JsonKafkaHeaderMapper 或 SimpleKafkaHeaderMapper 時,可以透過在監聽器方法中新增 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 作為引數來獲取。
要啟用此標頭的填充,請將容器屬性 deliveryAttemptHeader 設定為 true。預設情況下停用此功能,以避免查詢每個記錄狀態和新增標頭(少量)的開銷。
DefaultErrorHandler 和 DefaultAfterRollbackProcessor 支援此功能。
批處理監聽器的傳遞嘗試頭
當使用 BatchListener 處理 ConsumerRecord 時,KafkaHeaders.DELIVERY_ATTEMPT 頭可能以與 SingleRecordListener 不同的方式存在。
從 3.3 版本開始,如果您想在使用 BatchListener 時將 KafkaHeaders.DELIVERY_ATTEMPT 頭注入到 ConsumerRecord 中,請將 DeliveryAttemptAwareRetryListener 設定為 ErrorHandler 中的 RetryListener。
請參閱以下程式碼。
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
然後,每當批處理未能完成時,DeliveryAttemptAwareRetryListener 將把 KafkaHeaders.DELIVERY_ATTMPT 頭注入到 ConsumerRecord 中。
監聽器資訊頭
在某些情況下,能夠知道監聽器正在哪個容器中執行是有用的。
從 2.8.4 版本開始,您現在可以設定監聽器容器上的 listenerInfo 屬性,或者在 @KafkaListener 註解上設定 info 屬性。然後,容器會將此資訊新增到所有傳入訊息的 KafkaListener.LISTENER_INFO 頭中;它可以在記錄攔截器、過濾器等中,或在監聽器本身中使用。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
當在 RecordInterceptor 或 RecordFilterStrategy 實現中使用時,頭在消費者記錄中作為位元組陣列,使用 KafkaListenerAnnotationBeanPostProcessor 的 charSet 屬性進行轉換。
頭對映器在從消費者記錄建立 MessageHeaders 時也會轉換為 String,並且永遠不會在出站記錄上對映此頭。
對於 POJO 批處理監聽器,從 2.8.6 版本開始,頭會複製到批處理的每個成員中,並且在轉換後也作為單個 String 引數可用。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批處理監聽器有過濾器,並且過濾器導致空批處理,則需要向 @Header 引數新增 required = false,因為空批處理沒有資訊。 |
如果您收到 List<Message<Thing>>,則資訊位於每個 Message<?> 的 KafkaHeaders.LISTENER_INFO 頭中。
有關消費批次的更多資訊,請參見 批處理監聽器。
釋出死信記錄
當記錄的失敗次數達到最大值時,您可以為 DefaultErrorHandler 和 DefaultAfterRollbackProcessor 配置一個記錄恢復器。框架提供了 DeadLetterPublishingRecoverer,它將失敗的訊息釋出到另一個主題。恢復器需要一個 KafkaTemplate<Object, Object>,用於傳送記錄。您還可以選擇配置一個 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,該函式用於解析目標主題和分割槽。
預設情況下,死信記錄被髮送到名為 <originalTopic>-dlt 的主題(原始主題名稱字尾為 -dlt),並且傳送到與原始記錄相同的分割槽。因此,當您使用預設解析器時,死信主題必須至少具有與原始主題相同數量的分割槽。 |
如果返回的 TopicPartition 具有負分割槽,則不會在 ProducerRecord 中設定分割槽,因此分割槽由 Kafka 選擇。從 2.2.4 版本開始,任何 ListenerExecutionFailedException(例如,當在 @KafkaListener 方法中檢測到異常時丟擲)都使用 groupId 屬性進行了增強。這允許目標解析器除了 ConsumerRecord 中的資訊外,還可以使用此資訊來選擇死信主題。
以下示例演示瞭如何連線自定義目標解析器:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
傳送到死信主題的記錄將新增以下標頭:
-
KafkaHeaders.DLT_EXCEPTION_FQCN: 異常類名(通常是ListenerExecutionFailedException,但也可以是其他)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 異常原因類名,如果存在(自 2.8 版本起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 異常堆疊跟蹤。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE: 異常訊息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 異常類名(僅限鍵反序列化錯誤)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 異常堆疊跟蹤(僅限鍵反序列化錯誤)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 異常訊息(僅限鍵反序列化錯誤)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC: 原始主題。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION: 原始分割槽。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET: 原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 原始時間戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始時間戳型別。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 原始消費者組,未能處理該記錄(自 2.8 版本起)。
鍵異常僅由 DeserializationException 引起,因此沒有 DLT_KEY_EXCEPTION_CAUSE_FQCN。
有兩種機制可以新增更多標頭。
-
子類化恢復器並重寫
createProducerRecord()- 呼叫super.createProducerRecord()並新增更多標頭。 -
提供一個
BiFunction來接收消費者記錄和異常,返回一個Headers物件;其中的標頭將被複制到最終的生產者記錄;另請參閱 管理死信記錄標頭。使用setHeadersFunction()設定BiFunction。
第二種實現更簡單,但第一種提供了更多可用資訊,包括已組裝的標準標頭。
從 2.3 版本開始,當與 ErrorHandlingDeserializer 結合使用時,釋出者會將死信生產者記錄中的記錄 value() 恢復為未能反序列化的原始值。以前,value() 為空,使用者程式碼必須從訊息頭中解碼 DeserializationException。此外,您可以向釋出者提供多個 KafkaTemplate;例如,如果您想釋出來自 DeserializationException 的 byte[] 以及使用與成功反序列化記錄不同的序列化器的值,則可能需要這樣做。這是一個配置釋出者,使其具有使用 String 和 byte[] 序列化器的 KafkaTemplate 的示例:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
釋出者使用對映鍵來查詢適合要釋出的 value() 的模板。建議使用 LinkedHashMap,以便按順序檢查鍵。
當釋出 null 值時,如果有多個模板,恢復器將查詢 Void 類的模板;如果不存在,將使用 values().iterator() 中的第一個模板。
自 2.7 版本起,您可以使用 setFailIfSendResultIsError 方法,以便在訊息釋出失敗時丟擲異常。您還可以使用 setWaitForSendResultTimeout 設定傳送成功驗證的超時時間。
如果恢復器失敗(丟擲異常),失敗的記錄將包含在定址操作中。從 2.5.5 版本開始,如果恢復器失敗,預設情況下 BackOff 將被重置,並且重新傳遞將再次透過退避操作,然後再次嘗試恢復。在早期版本中,BackOff 未被重置,並在下次失敗時重新嘗試恢復。要恢復到以前的行為,請將錯誤處理器的 resetStateOnRecoveryFailure 屬性設定為 false。 |
從 2.6.3 版本開始,將 resetStateOnExceptionChange 設定為 true,如果異常型別在失敗之間發生變化,則重試序列將重新啟動(包括選擇一個新的 BackOff,如果已配置)。預設情況下,不考慮異常型別。
從 2.3 版本開始,恢復器也可以與 Kafka Streams 一起使用 - 有關更多資訊,請參見 從反序列化異常中恢復。
ErrorHandlingDeserializer 將反序列化異常新增到頭 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER 和 ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER 中(使用 Java 序列化)。預設情況下,這些頭不會保留在釋出到死信主題的訊息中。從 2.7 版本開始,如果鍵和值都未能反序列化,則兩者的原始值都將填充到傳送到 DLT 的記錄中。
如果傳入的記錄相互依賴,但可能亂序到達,則將失敗的記錄重新發布到原始主題的尾部(一定次數),而不是直接傳送到死信主題可能很有用。請參見 此 Stack Overflow 問題 以獲取示例。
以下錯誤處理器配置將完全實現這一點:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic-dlt", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
從 2.7 版本開始,恢復器會檢查目標解析器選擇的分割槽是否存在。如果分割槽不存在,則 ProducerRecord 中的分割槽將設定為 null,允許 KafkaProducer 選擇分割槽。您可以透過將 verifyPartition 屬性設定為 false 來停用此檢查。
從 3.1 版本開始,將 logRecoveryRecord 屬性設定為 true 將記錄恢復記錄和異常。
管理死信記錄頭
-
appendOriginalHeaders(預設true) -
stripPreviousExceptionHeaders(預設true,自 2.8 版本起)
Apache Kafka 支援具有相同名稱的多個頭;要獲取“最新”值,可以使用 headers.lastHeader(headerName);要獲取多個頭的迭代器,可以使用 headers.headers(headerName).iterator()。
當重複釋出失敗的記錄時,這些標頭可能會增長(並最終因 RecordTooLargeException 導致釋出失敗);對於異常標頭,特別是堆疊跟蹤標頭,更是如此。
這兩個屬性的原因是,雖然您可能只希望保留最新的異常資訊,但您可能希望保留記錄在每次失敗時經過哪些主題的歷史記錄。
appendOriginalHeaders 應用於所有名為 ORIGINAL 的頭,而 stripPreviousExceptionHeaders 應用於所有名為 EXCEPTION 的頭。
從 2.8.4 版本開始,您現在可以控制哪些標準頭將新增到輸出記錄中。請參閱 enum HeadersToAdd 以瞭解預設新增的(目前)10 個標準頭的通用名稱(這些不是實際的頭名稱,只是一個抽象;實際的頭名稱由 getHeaderNames() 方法設定,子類可以重寫此方法)。
要排除頭,請使用 excludeHeaders() 方法;例如,要禁止在頭中新增異常堆疊跟蹤,請使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以透過新增 ExceptionHeadersCreator 來完全自定義異常頭的新增;這也將停用所有標準異常頭。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同樣從 2.8.4 版本開始,您現在可以透過 addHeadersFunction 方法提供多個頭函式。這允許應用額外的函式,即使已經註冊了另一個函式,例如,在使用 非阻塞重試 時。
ExponentialBackOffWithMaxRetries 實現
Spring Framework 提供了許多 BackOff 實現。預設情況下,ExponentialBackOff 將無限期重試;要在一定次數的重試嘗試後放棄,需要計算 maxElapsedTime。自 2.7.3 版本起,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,這是一個子類,它接收 maxRetries 屬性並自動計算 maxElapsedTime,這更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
這將分別在 1、2、4、8、10、10 秒後重試,然後呼叫恢復器。