異常處理
本節介紹在使用 Spring for Apache Kafka 時可能出現的各種異常的處理方式。
監聽器錯誤處理器
從 2.0 版本開始,@KafkaListener
註解新增了一個屬性:errorHandler
。
您可以使用 errorHandler
提供 KafkaListenerErrorHandler
實現的 bean 名稱。這個函式式介面有一個方法,如下所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以訪問訊息轉換器生成的 spring-messaging Message<?>
物件以及監聽器丟擲的異常(該異常被包裝在 ListenerExecutionFailedException
中)。錯誤處理器可以丟擲原始異常或新異常,這些異常會被拋到容器。錯誤處理器返回的任何值都會被忽略。
從 2.7 版本開始,您可以在 MessagingMessageConverter
和 BatchMessagingMessageConverter
上設定 rawRecordHeader
屬性,這將導致原始的 ConsumerRecord
被新增到已轉換的 Message<?>
的 KafkaHeaders.RAW_DATA
頭部中。例如,如果您希望在監聽器錯誤處理器中使用 DeadLetterPublishingRecoverer
,這非常有用。它可能用於請求/回覆場景,在這種場景下,您希望在幾次重試後,將失敗結果傳送給傳送方,並在死信主題中捕獲失敗的記錄。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一個子介面(ConsumerAwareListenerErrorHandler
),透過以下方法可以訪問 consumer 物件:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一個子介面(ManualAckListenerErrorHandler
)在使用手動 AckMode
時提供對 Acknowledgment
物件的訪問。
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在任何一種情況下,您都不應該對 consumer 執行 seek 操作,因為容器對此一無所知。
容器錯誤處理器
從 2.8 版本開始,遺留的 ErrorHandler
和 BatchErrorHandler
介面已被新的 CommonErrorHandler
取代。這些錯誤處理器可以處理 record 監聽器和 batch 監聽器中的錯誤,允許單個監聽器容器工廠為兩種型別的監聽器建立容器。框架提供了 CommonErrorHandler
的實現來替代大多數遺留框架錯誤處理器實現。
有關將自定義遺留錯誤處理器實現遷移到 CommonErrorHandler
的資訊,請參閱將自定義遺留錯誤處理器實現遷移到 CommonErrorHandler
。
使用事務時,預設情況下未配置錯誤處理器,以便異常可以回滾事務。事務容器的錯誤處理由 AfterRollbackProcessor
處理。如果在事務中使用自定義錯誤處理器,則必須丟擲異常才能使事務回滾。
這個介面有一個預設方法 isAckAfterHandle()
,容器會呼叫它來確定如果錯誤處理器返回而沒有丟擲異常,是否應該提交 offset(s);預設情況下它返回 true。
通常,框架提供的錯誤處理器在錯誤未被“處理”(例如,執行 seek 操作後)時會丟擲異常。預設情況下,容器會在 ERROR
級別記錄此類異常。所有框架錯誤處理器都擴充套件了 KafkaExceptionLogLevelAware
,它允許您控制這些異常的日誌級別。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定一個全域性錯誤處理器,用於容器工廠中的所有監聽器。以下示例展示瞭如何進行設定:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
預設情況下,如果帶註解的監聽器方法丟擲異常,該異常會拋到容器,並根據容器配置處理訊息。
容器在呼叫錯誤處理器之前提交任何待處理的 offset 提交。
如果您使用 Spring Boot,只需將錯誤處理器新增為 @Bean
,Boot 會將其新增到自動配置的工廠中。
退避處理器
像 DefaultErrorHandler
這樣的錯誤處理器使用 BackOff
來確定重試投遞前需要等待多長時間。從 2.9 版本開始,您可以配置自定義的 BackOffHandler
。預設處理器僅暫停執行緒,直到退避時間過去(或容器停止)。框架還提供了 ContainerPausingBackOffHandler
,它會暫停監聽器容器,直到退避時間過去,然後恢復容器。當延遲時間長於 consumer 屬性 max.poll.interval.ms
時,這非常有用。請注意,實際退避時間的精度會受到 pollTimeout
容器屬性的影響。
DefaultErrorHandler
這個新的錯誤處理器取代了 SeekToCurrentErrorHandler
和 RecoveringBatchErrorHandler
,它們在之前的幾個版本中一直是預設的錯誤處理器。一個區別是,對於 batch 監聽器(當丟擲 BatchListenerFailedException
以外的異常時),回退行為等同於重試完整的批次。
從 2.9 版本開始,DefaultErrorHandler 可以配置為提供與 seek 未處理記錄 offset 相同的語義(如下所述),但實際上並不執行 seek 操作。相反,記錄由監聽器容器保留,並在錯誤處理器退出後(以及執行單次暫停的 poll() 以保持 consumer 活躍後;如果使用了非阻塞重試或 ContainerPausingBackOffHandler ,暫停可能會持續多次 poll)重新提交給監聽器。錯誤處理器向容器返回一個結果,指示當前失敗的記錄是否可以重新提交,或者是否已恢復且不會再次傳送給監聽器。要啟用此模式,請將屬性 seekAfterError 設定為 false 。 |
錯誤處理器可以恢復(跳過)持續失敗的記錄。預設情況下,在失敗十次後,失敗的記錄會被記錄日誌(在 ERROR
級別)。您可以為處理器配置一個自定義的 recoverer (BiConsumer
) 和一個 BackOff
,用於控制投遞嘗試次數和每次嘗試之間的延遲。使用 FixedBackOff
並設定 FixedBackOff.UNLIMITED_ATTEMPTS
會導致(實際上是)無限重試。以下示例配置了失敗三次後進行恢復:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要使用此處理器的自定義例項配置監聽器容器,請將其新增到容器工廠。
例如,使用 @KafkaListener
容器工廠,您可以按如下方式新增 DefaultErrorHandler
:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
對於 record 監聽器,這將重試投遞最多 2 次(共 3 次嘗試),每次間隔 1 秒,而不是預設配置(FixedBackOff(0L, 9)
)。重試耗盡後,失敗只會被記錄日誌。
例如,如果 poll
返回六條記錄(分割槽 0、1、2 各兩條),並且監聽器在第四條記錄上丟擲異常,容器會透過提交前三條訊息的 offset 來確認它們。DefaultErrorHandler
會 seek 到分割槽 1 的 offset 1 和分割槽 2 的 offset 0。下一次 poll()
會返回三條未處理的記錄。
如果 AckMode
是 BATCH
,容器會在呼叫錯誤處理器之前提交前兩個分割槽的 offset。
對於 batch 監聽器,監聽器必須丟擲 BatchListenerFailedException
來指示批次中的哪些記錄失敗了。
事件序列如下:
-
提交索引之前的記錄的 offset。
-
如果重試次數未耗盡,則執行 seek 操作,以便所有剩餘記錄(包括失敗的記錄)都將重新投遞。
-
如果重試次數耗盡,嘗試恢復失敗的記錄(預設僅記錄日誌),並執行 seek 操作,以便剩餘記錄(不包括失敗的記錄)將重新投遞。已恢復記錄的 offset 會被提交。
-
如果重試次數耗盡且恢復失敗,則執行 seek 操作,就像重試次數未耗盡一樣。
從 2.9 版本開始,DefaultErrorHandler 可以配置為提供與 seek 未處理記錄 offset 相同的語義(如上所述),但實際上並不執行 seek 操作。相反,錯誤處理器建立一個新的 ConsumerRecords<?, ?> ,其中只包含未處理的記錄,然後將這些記錄提交給監聽器(在執行單次暫停的 poll() 以保持 consumer 活躍後)。要啟用此模式,請將屬性 seekAfterError 設定為 false 。 |
預設的 recoverer 在重試次數耗盡後記錄失敗的記錄。您可以使用自定義的 recoverer,或者使用框架提供的 recoverer,例如 DeadLetterPublishingRecoverer
。
使用 POJO batch 監聽器(例如 List<Thing>
)時,如果您沒有完整的 consumer record 可以新增到異常中,只需新增失敗記錄的索引即可:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
當容器配置為 AckMode.MANUAL_IMMEDIATE
時,錯誤處理器可以配置為提交已恢復記錄的 offset;將 commitRecovered
屬性設定為 true
。
另請參閱釋出死信記錄。
使用事務時,DefaultAfterRollbackProcessor
提供了類似的功能。請參閱回滾後處理器。
DefaultErrorHandler
認為某些異常是致命的,會跳過此類異常的重試;recoverer 在第一次失敗時就被呼叫。預設情況下,被認為是致命的異常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
因為這些異常在重試投遞時不太可能得到解決。
您可以將更多異常型別新增到不可重試類別中,或者完全替換已分類異常的對映。有關更多資訊,請參閱 DefaultErrorHandler.addNotRetryableException()
和 DefaultErrorHandler.setClassifications()
的 Javadoc,以及 spring-retry
的 BinaryExceptionClassifier
的 Javadoc。
以下是將 IllegalArgumentException
新增到不可重試異常的示例:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
錯誤處理器可以配置一個或多個 RetryListener
,接收重試和恢復進度的通知。從 2.8.10 版本開始,添加了用於 batch 監聽器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有關更多資訊,請參閱 Javadoc。
如果 recoverer 失敗(丟擲異常),失敗的記錄將包含在 seek 操作中。從 2.5.5 版本開始,如果 recoverer 失敗,BackOff 預設情況下會重置,並且在再次嘗試恢復之前,重新投遞會再次經歷退避過程。在早期版本中,BackOff 不會重置,並在下一次失敗時再次嘗試恢復。要恢復到之前的行為,請將處理器的 resetStateOnRecoveryFailure 屬性設定為 false 。 |
從 2.6 版本開始,您現在可以為處理器提供一個 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
,以便根據失敗的記錄和/或異常來確定使用的 BackOff
:
handler.setBackOffFunction((record, ex) -> { ... });
如果函式返回 null
,則使用處理器的預設 BackOff
。
從 2.6.3 版本開始,將 resetStateOnExceptionChange
設定為 true
,如果異常型別在失敗之間發生變化,重試序列將重新啟動(包括選擇新的 BackOff
,如果已配置)。預設情況下,不考慮異常型別。
從 2.9 版本開始,預設情況下此值為 true
。
另請參閱投遞嘗試次數頭部。
Batch 錯誤處理器中的轉換錯誤
從 2.8 版本開始,使用 ByteArrayDeserializer
、BytesDeserializer
或 StringDeserializer
以及 DefaultErrorHandler
的 MessageConverter
時,batch 監聽器現在可以正確處理轉換錯誤。發生轉換錯誤時,payload 被設定為 null
,並且會將反序列化異常新增到記錄頭部中,類似於 ErrorHandlingDeserializer
。監聽器中提供了一個 ConversionException
列表,因此監聽器可以丟擲 BatchListenerFailedException
來指示發生轉換異常的第一個索引。
示例
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重試完整的批次
當 batch 監聽器丟擲 BatchListenerFailedException
以外的異常時,這現在是 DefaultErrorHandler
的回退行為。
無法保證在批次重新投遞時,批次中的記錄數量相同和/或重新投遞的記錄順序相同。因此,很難為批次輕鬆維護重試狀態。FallbackBatchErrorHandler
採用以下方法。如果 batch 監聽器丟擲非 BatchListenerFailedException
的異常,則從記憶體中的記錄批次執行重試。為了避免在長時間的重試序列中發生 rebalance,錯誤處理器會暫停 consumer,在每次重試的退避睡眠之前進行 poll,然後再次呼叫監聽器。如果/當重試耗盡時,會為批次中的每條記錄呼叫 ConsumerRecordRecoverer
。如果 recoverer 丟擲異常,或者執行緒在其睡眠期間被中斷,則批次記錄將在下一次 poll 時重新投遞。在退出之前,無論結果如何,consumer 都會恢復。
此機制不能用於事務。 |
在等待 BackOff
間隔期間,錯誤處理器會迴圈短暫睡眠,直到達到期望的延遲,同時檢查容器是否已停止,以便睡眠能在 stop()
後儘快退出,而不是導致延遲。
停止容器錯誤處理器
如果監聽器丟擲異常,CommonContainerStoppingErrorHandler
會停止容器。對於 record 監聽器,當 AckMode
是 RECORD
時,已處理記錄的 offset 會被提交。對於 record 監聽器,當 AckMode
是任何手動值時,已確認記錄的 offset 會被提交。對於 record 監聽器,當 AckMode
是 BATCH
,或者對於 batch 監聽器,容器重新啟動時會重新處理整個批次。
容器停止後,會丟擲一個包裝了 ListenerExecutionFailedException
的異常。這是為了使事務回滾(如果啟用了事務)。
委託錯誤處理器
CommonDelegatingErrorHandler
可以根據異常型別委託給不同的錯誤處理器。例如,您可能希望對大多數異常呼叫 DefaultErrorHandler
,而對其他異常呼叫 CommonContainerStoppingErrorHandler
。
所有委託必須共享相同的相容屬性(ackAfterHandle
, seekAfterError
等)。
記錄日誌錯誤處理器
CommonLoggingErrorHandler
只記錄異常日誌;對於 record 監聽器,前一次 poll 中剩餘的記錄會傳遞給監聽器。對於 batch 監聽器,批次中的所有記錄都會被記錄日誌。
對 Record 和 Batch 監聽器使用不同的通用錯誤處理器
如果您希望對 record 和 batch 監聽器採用不同的錯誤處理策略,框架提供了 CommonMixedErrorHandler
,它允許為每種監聽器型別配置特定的錯誤處理器。
通用錯誤處理器總結
-
DefaultErrorHandler
-
CommonContainerStoppingErrorHandler
-
CommonDelegatingErrorHandler
-
CommonLoggingErrorHandler
-
CommonMixedErrorHandler
遺留錯誤處理器及其替代方案
遺留錯誤處理器 | 替代方案 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
沒有替代方案,使用配置了無限 |
|
|
|
沒有替代方案,使用 |
將自定義遺留錯誤處理器實現遷移到 CommonErrorHandler
請參閱 CommonErrorHandler
中的 Javadoc。
要替換 ErrorHandler
或 ConsumerAwareErrorHandler
實現,您應該實現 handleOne()
並讓 seeksAfterHandle()
返回 false
(預設值)。您還應該實現 handleOtherException()
來處理在記錄處理範圍之外發生的異常(例如 consumer 錯誤)。
要替換 RemainingRecordsErrorHandler
實現,您應該實現 handleRemaining()
並重寫 seeksAfterHandle()
以返回 true
(錯誤處理器必須執行必要的 seek 操作)。您還應該實現 handleOtherException()
- 以處理在記錄處理範圍之外發生的異常(例如 consumer 錯誤)。
要替換任何 BatchErrorHandler
實現,您應該實現 handleBatch()
。您還應該實現 handleOtherException()
- 以處理在記錄處理範圍之外發生的異常(例如 consumer 錯誤)。
回滾後處理器
使用事務時,如果監聽器丟擲異常(並且如果存在錯誤處理器,它也丟擲異常),事務將回滾。預設情況下,任何未處理的記錄(包括失敗的記錄)會在下一次 poll 時重新獲取。這是透過在 DefaultAfterRollbackProcessor
中執行 seek 操作實現的。對於 batch 監聽器,整個批次記錄會重新處理(容器不知道批次中哪個記錄失敗了)。要修改此行為,您可以為監聽器容器配置自定義的 AfterRollbackProcessor
。例如,對於基於 record 的監聽器,您可能希望跟蹤失敗的記錄並在嘗試一定次數後放棄,或許透過將其釋出到死信主題。
從 2.2 版本開始,DefaultAfterRollbackProcessor
現在可以恢復(跳過)持續失敗的記錄。預設情況下,在失敗十次後,失敗的記錄會被記錄日誌(在 ERROR
級別)。您可以為處理器配置自定義的 recoverer (BiConsumer
) 和最大失敗次數。將 maxFailures
屬性設定為負數會導致無限重試。以下示例配置了失敗三次後進行恢復:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
當您不使用事務時,可以透過配置 DefaultErrorHandler
實現類似功能。請參閱容器錯誤處理器。
從 3.2 版本開始,現在可以恢復(跳過)持續失敗的整個批次記錄。設定 ContainerProperties.setBatchRecoverAfterRollback(true)
來啟用此功能。
預設行為下,對於 batch 監聽器無法進行恢復,因為框架不知道批次中哪個記錄持續失敗。在這種情況下,應用程式監聽器必須處理持續失敗的記錄。 |
另請參閱釋出死信記錄。
從 2.2.5 版本開始,可以在新事務中呼叫 DefaultAfterRollbackProcessor
(在失敗事務回滾後啟動)。然後,如果您使用 DeadLetterPublishingRecoverer
釋出失敗的記錄,處理器會將已恢復記錄在原始 topic/partition 中的 offset 傳送到該事務。要啟用此功能,請在 DefaultAfterRollbackProcessor
上設定 commitRecovered
和 kafkaTemplate
屬性。
如果 recoverer 失敗(丟擲異常),失敗的記錄將包含在 seek 操作中。從 2.5.5 版本開始,如果 recoverer 失敗,BackOff 預設情況下會重置,並且在再次嘗試恢復之前,重新投遞會再次經歷退避過程。在早期版本中,BackOff 不會重置,並在下一次失敗時再次嘗試恢復。要恢復到之前的行為,請將處理器的 resetStateOnRecoveryFailure 屬性設定為 false 。 |
從 2.6 版本開始,您現在可以為處理器提供一個 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
,以便根據失敗的記錄和/或異常來確定使用的 BackOff
:
handler.setBackOffFunction((record, ex) -> { ... });
如果函式返回 null
,則使用處理器的預設 BackOff
。
從 2.6.3 版本開始,將 resetStateOnExceptionChange
設定為 true
,如果異常型別在失敗之間發生變化,重試序列將重新啟動(包括選擇新的 BackOff
,如果已配置)。預設情況下,不考慮異常型別。
從 2.3.1 版本開始,與 DefaultErrorHandler
類似,DefaultAfterRollbackProcessor
認為某些異常是致命的,會跳過此類異常的重試;recoverer 在第一次失敗時就被呼叫。預設情況下,被認為是致命的異常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
因為這些異常在重試投遞時不太可能得到解決。
您可以將更多異常型別新增到不可重試類別中,或者完全替換已分類異常的對映。有關更多資訊,請參閱 DefaultAfterRollbackProcessor.setClassifications()
的 Javadoc,以及 spring-retry
的 BinaryExceptionClassifier
的 Javadoc。
以下是將 IllegalArgumentException
新增到不可重試異常的示例:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另請參閱投遞嘗試次數頭部。
使用當前的 kafka-clients ,容器無法檢測到 ProducerFencedException 是由 rebalance 引起的,還是由於超時或過期導致 producer 的 transactional.id 被撤銷。因為在大多數情況下,它是由 rebalance 引起的,容器不會呼叫 AfterRollbackProcessor (因為對分割槽執行 seek 操作不合適,因為我們不再被分配這些分割槽)。如果您確保超時時間足夠長以處理每個事務,並定期執行“空”事務(例如,透過 ListenerContainerIdleEvent ),則可以避免因超時和過期導致的 fencing。或者,您可以將 stopContainerWhenFenced 容器屬性設定為 true ,容器將停止,避免記錄丟失。您可以消費 ConsumerStoppedEvent 並檢查其 Reason 屬性是否為 FENCED 來檢測此情況。由於事件還包含對容器的引用,您可以使用此事件重新啟動容器。 |
從 2.7 版本開始,在等待 BackOff
間隔期間,錯誤處理器會迴圈短暫睡眠,直到達到期望的延遲,同時檢查容器是否已停止,以便睡眠能在 stop()
後儘快退出,而不是導致延遲。
從 2.7 版本開始,處理器可以配置一個或多個 RetryListener
,接收重試和恢復進度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有關更多資訊,請參閱 Javadoc。
投遞嘗試次數頭部
以下內容僅適用於 record 監聽器,不適用於 batch 監聽器。
從 2.5 版本開始,使用實現 DeliveryAttemptAware
的 ErrorHandler
或 AfterRollbackProcessor
時,可以將 KafkaHeaders.DELIVERY_ATTEMPT
頭部 (kafka_deliveryAttempt
) 新增到記錄中。此頭部的值是一個從 1 開始遞增的整數。接收原始 ConsumerRecord<?, ?>
時,該整數儲存在 byte[4]
中。
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
使用 @KafkaListener
並配合 DefaultKafkaHeaderMapper
或 SimpleKafkaHeaderMapper
時,可以透過向監聽器方法新增引數 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
來獲取該頭部。
要啟用此頭部的填充,請將容器屬性 deliveryAttemptHeader
設定為 true
。預設情況下停用此功能,以避免查詢每條記錄的狀態並新增頭部帶來的(較小的)開銷。
DefaultErrorHandler
和 DefaultAfterRollbackProcessor
支援此功能。
Batch 監聽器的投遞嘗試次數頭部
使用 BatchListener
處理 ConsumerRecord
時,與 SingleRecordListener
相比,KafkaHeaders.DELIVERY_ATTEMPT
頭部可能以不同的方式存在。
從 3.3 版本開始,如果您希望在使用 BatchListener
時將 KafkaHeaders.DELIVERY_ATTEMPT
頭部注入到 ConsumerRecord
中,請將 DeliveryAttemptAwareRetryListener
設定為 ErrorHandler
中的 RetryListener
。
請參考以下程式碼。
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
然後,每當批次未能完成時,DeliveryAttemptAwareRetryListener
會將 KafkaHeaders.DELIVERY_ATTMPT
頭部注入到 ConsumerRecord
中。
監聽器資訊頭部
在某些情況下,能夠知道監聽器執行在哪個容器中很有用。
從 2.8.4 版本開始,您現在可以在監聽器容器上設定 listenerInfo
屬性,或在 @KafkaListener
註解上設定 info
屬性。然後,容器會將此資訊新增到所有入站訊息的 KafkaListener.LISTENER_INFO
頭部中;此頭部可用於 record 攔截器、過濾器等,或在監聽器本身中使用。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
在 RecordInterceptor
或 RecordFilterStrategy
實現中使用時,頭部以位元組陣列形式存在於 consumer record 中,使用 KafkaListenerAnnotationBeanPostProcessor
的 charSet
屬性進行轉換。
頭部對映器在從 consumer record 建立 MessageHeaders
時也會轉換為 String
,並且永遠不會在出站記錄上對映此頭部。
對於 POJO batch 監聽器,從 2.8.6 版本開始,頭部會複製到批次中的每個成員,並在轉換後也作為單個 String
引數可用。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果 batch 監聽器有過濾器且過濾結果為空批次,您需要在 @Header 引數中新增 required = false ,因為空批次不提供該資訊。 |
如果您接收到 List<Message<Thing>>
,則資訊位於每個 Message<?>
的 KafkaHeaders.LISTENER_INFO
頭部中。
有關消費批次的更多資訊,請參閱Batch 監聽器。
釋出死信記錄
您可以使用記錄恢復器配置 DefaultErrorHandler
和 DefaultAfterRollbackProcessor
,以便在記錄達到最大失敗次數時進行處理。該框架提供了 DeadLetterPublishingRecoverer
,它將失敗的訊息釋出到另一個主題。此恢復器需要一個 KafkaTemplate<Object, Object>
,用於傳送記錄。您還可以選擇性地配置一個 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
,它會被呼叫來解析目標主題和分割槽。
預設情況下,死信記錄會被髮送到名為 <originalTopic>-dlt 的主題(原始主題名稱後附加 -dlt ),併發送到與原始記錄相同的分割槽。因此,當您使用預設解析器時,死信主題必須至少擁有與原始主題一樣多的分割槽。 |
如果返回的 TopicPartition
的分割槽為負數,則在 ProducerRecord
中不會設定分割槽,分割槽將由 Kafka 選擇。從版本 2.2.4 開始,任何 ListenerExecutionFailedException
(例如,在 @KafkaListener
方法中檢測到異常時丟擲)都增強了 groupId
屬性。這使得目標解析器除了使用 ConsumerRecord
中的資訊外,還可以利用此屬性來選擇死信主題。
以下示例展示瞭如何配置一個自定義目標解析器
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
傳送到死信主題的記錄會增強以下頭部(headers)
-
KafkaHeaders.DLT_EXCEPTION_FQCN
:異常類名(通常是ListenerExecutionFailedException
,但也可以是其他)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
:異常原因的類名,如果存在的話(從版本 2.8 開始)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE
:異常的堆疊跟蹤。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE
:異常訊息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
:異常類名(僅限鍵反序列化錯誤)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
:異常的堆疊跟蹤(僅限鍵反序列化錯誤)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
:異常訊息(僅限鍵反序列化錯誤)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC
:原始主題。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION
:原始分割槽。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET
:原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
:原始時間戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
:原始時間戳型別。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
:未能處理該記錄的原始消費者組(從版本 2.8 開始)。
鍵異常僅由 DeserializationException
引起,因此沒有 DLT_KEY_EXCEPTION_CAUSE_FQCN
。
有兩種機制可以新增更多頭部。
-
子類化恢復器並覆蓋
createProducerRecord()
方法 - 呼叫super.createProducerRecord()
並新增更多頭部。 -
提供一個
BiFunction
來接收消費者記錄和異常,並返回一個Headers
物件;這些頭部將複製到最終的生產者記錄中;另請參閱 管理死信記錄頭部。使用setHeadersFunction()
方法設定此BiFunction
。
第二種實現起來更簡單,但第一種提供了更多可用資訊,包括已經組裝好的標準頭部。
從版本 2.3 開始,當與 ErrorHandlingDeserializer
一起使用時,釋出器將在死信生產者記錄中恢復記錄的 value()
為原始的、未能反序列化的值。在此之前,value()
是 null,使用者程式碼必須從訊息頭部中解碼 DeserializationException
。此外,您可以向釋出器提供多個 KafkaTemplate
;例如,如果您想釋出 DeserializationException
中的 byte[]
,以及使用與成功反序列化的記錄不同的序列化器釋出值,這可能就需要這樣做。以下是使用 String
和 byte[]
序列化器配置釋出器的示例
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
釋出器使用 Map 的鍵來查詢適用於即將釋出的值(value()
)的模板。建議使用 LinkedHashMap
,以便按順序檢查鍵。
釋出 null
值且存在多個模板時,恢復器將查詢適用於 Void
類的模板;如果不存在,將使用 values().iterator()
中的第一個模板。
從 2.7 版本開始,您可以使用 setFailIfSendResultIsError
方法,以便在訊息釋出失敗時丟擲異常。您還可以使用 setWaitForSendResultTimeout
方法設定傳送器成功驗證的超時時間。
如果恢復器失敗(丟擲異常),失敗的記錄將包含在 seeks 操作中。從版本 2.5.5 開始,如果恢復器失敗,預設情況下將重置 BackOff ,並且在再次嘗試恢復之前,重新投遞將再次經歷回退過程。在早期版本中,BackOff 未被重置,並在下次失敗時再次嘗試恢復。要恢復到先前的行為,請將錯誤處理器的 resetStateOnRecoveryFailure 屬性設定為 false 。 |
從 2.6.3 版本開始,將 resetStateOnExceptionChange
設定為 true
,如果異常型別在失敗之間發生變化,重試序列將重新啟動(包括選擇新的 BackOff
,如果已配置)。預設情況下,不考慮異常型別。
從版本 2.3 開始,恢復器也可以用於 Kafka Streams - 有關更多資訊,請參閱 從反序列化異常中恢復。
ErrorHandlingDeserializer
在頭部 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
和 ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
中新增反序列化異常(使用 Java 序列化)。預設情況下,這些頭部不會保留在釋出到死信主題的訊息中。從版本 2.7 開始,如果鍵和值都反序列化失敗,則兩者在傳送到 DLT 的記錄中都會填充原始值。
如果入站記錄相互依賴,但可能亂序到達,將失敗的記錄重新發布到原始主題的末尾(若干次)可能比直接將其傳送到死信主題更有用。有關示例,請參閱 此 Stack Overflow 問題。
以下錯誤處理器配置將完全實現這一點
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic-dlt", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
從版本 2.7 開始,恢復器檢查目標解析器選擇的分割槽是否存在。如果分割槽不存在,ProducerRecord
中的分割槽將設定為 null
,允許 KafkaProducer
選擇分割槽。您可以透過將 verifyPartition
屬性設定為 false
來停用此檢查。
從版本 3.1 開始,將 logRecoveryRecord
屬性設定為 true
將記錄恢復記錄和異常。
管理死信記錄頭部
-
appendOriginalHeaders
(預設true
) -
stripPreviousExceptionHeaders
(自版本 2.8 起預設true
)
Apache Kafka 支援多個同名頭部;要獲取“最新”值,您可以使用 headers.lastHeader(headerName)
;要獲取多個頭部的迭代器,請使用 headers.headers(headerName).iterator()
。
當重複釋出失敗的記錄時,這些頭部可能會增長(並最終由於 RecordTooLargeException
導致釋出失敗);異常頭部尤其如此,特別是堆疊跟蹤頭部。
這兩個屬性的原因是,雖然您可能只想保留最後的異常資訊,但您可能希望保留記錄在每次失敗時經過哪些主題的歷史記錄。
appendOriginalHeaders
應用於所有名為 ORIGINAL
的頭部,而 stripPreviousExceptionHeaders
應用於所有名為 EXCEPTION
的頭部。
從版本 2.8.4 開始,您現在可以控制將哪些標準頭部新增到輸出記錄中。請參閱 enum HeadersToAdd
以瞭解預設新增的(當前)10 個標準頭部的通用名稱(這些不是實際的頭部名稱,只是一個抽象;實際的頭部名稱由 getHeaderNames()
方法設定,子類可以覆蓋此方法)。
要排除頭部,請使用 excludeHeaders()
方法;例如,要禁止在頭部中新增異常堆疊跟蹤,請使用
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以透過新增一個 ExceptionHeadersCreator
來完全自定義異常頭部的新增;這也會停用所有標準異常頭部。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同樣從版本 2.8.4 開始,您現在可以透過 addHeadersFunction
方法提供多個頭部函式。這允許應用額外的函式,即使已經註冊了另一個函式,例如在使用 非阻塞重試 時。
另請參閱 使用非阻塞重試進行失敗頭部管理。
ExponentialBackOffWithMaxRetries
實現
Spring Framework 提供了許多 BackOff
實現。預設情況下,ExponentialBackOff
會無限重試;要在若干次重試嘗試後放棄,需要計算 maxElapsedTime
。從版本 2.7.3 開始,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries
,它是 ExponentialBackOff
的一個子類,接收 maxRetries
屬性並自動計算 maxElapsedTime
,這會更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
這將在 1、2、4、8、10、10 秒後重試,然後呼叫恢復器。