錯誤處理
Apache Kafka Streams 提供了原生處理反序列化錯誤異常的能力。有關此支援的詳細資訊,請參閱此連結。開箱即用地,Apache Kafka Streams 提供了兩種反序列化異常處理器 - LogAndContinueExceptionHandler
和 LogAndFailExceptionHandler
。顧名思義,前者將記錄錯誤並繼續處理下一條記錄,而後者將記錄錯誤並失敗。LogAndFailExceptionHandler
是預設的反序列化異常處理器。
在 Binder 中處理反序列化異常
Kafka Streams Binder 允許使用以下屬性指定上述反序列化異常處理器。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述兩種反序列化異常處理器,Binder 還提供了第三種,用於將錯誤記錄(毒丸訊息)傳送到 DLQ(死信佇列)主題。啟用此 DLQ 異常處理器的方法如下。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
設定上述屬性後,所有反序列化錯誤的記錄都會自動傳送到 DLQ 主題。
您可以如下設定釋出 DLQ 訊息的主題名稱。
您可以提供 DlqDestinationResolver
的實現,它是一個函式式介面。DlqDestinationResolver
接受 ConsumerRecord
和異常作為輸入,然後允許指定一個主題名稱作為輸出。透過訪問 Kafka ConsumerRecord
,可以在 BiFunction
的實現中檢查頭記錄。
以下是提供 DlqDestinationResolver
實現的一個示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在提供 DlqDestinationResolver
實現時,需要記住一件重要的事情是,Binder 中的 provisioner 不會自動為應用程式建立主題。這是因為 Binder 無法推斷出實現可能傳送到的所有 DLQ 主題的名稱。因此,如果使用此策略提供 DLQ 名稱,應用程式有責任確保這些主題事先建立。
如果 DlqDestinationResolver
在應用程式中作為 bean 存在,則其優先順序更高。如果您不想遵循此方法,而是希望使用配置提供靜態 DLQ 名稱,則可以設定以下屬性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果設定了此屬性,則錯誤記錄將傳送到主題 custom-dlq
。如果應用程式未使用上述任何一種策略,它將建立一個名為 error.<input-topic-name>.<application-id>
的 DLQ 主題。例如,如果您的繫結的目標主題是 inputTopic
,並且應用程式 ID 是 process-applicationId
,則預設的 DLQ 主題是 error.inputTopic.process-applicationId
。如果您打算啟用 DLQ,始終建議為每個輸入繫結顯式建立一個 DLQ 主題。
每個輸入消費者繫結的 DLQ
屬性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
適用於整個應用程式。這意味著如果同一個應用程式中有多個函式,則此屬性將應用於所有這些函式。但是,如果單個 Processor 中有多個 Processor 或多個輸入繫結,則可以使用 Binder 為每個輸入消費者繫結提供的更細粒度的 DLQ 控制。
如果您有以下 Processor:
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
並且您只想在第一個輸入繫結上啟用 DLQ,在第二個繫結上啟用 skipAndContinue,那麼可以在消費者上如下操作。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以這種方式設定反序列化異常處理器比在 Binder 級別設定具有更高的優先順序。
DLQ 分割槽
預設情況下,記錄使用與原始記錄相同的分割槽釋出到死信主題。這意味著死信主題必須至少有與原始記錄相同數量的分割槽。
要更改此行為,請將 DlqPartitionFunction
實現作為 @Bean
新增到應用程式上下文中。只能存在一個此類 bean。該函式提供消費者組(在大多數情況下與應用程式 ID 相同)、失敗的 ConsumerRecord
和異常。例如,如果您總是想路由到分割槽 0,您可以使用
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果將消費者繫結的 dlqPartitions 屬性設定為 1(並且 Binder 的 minPartitionCount 等於 1 ),則無需提供 DlqPartitionFunction ;框架將始終使用分割槽 0。如果將消費者繫結的 dlqPartitions 屬性設定為大於 1 的值(或 Binder 的 minPartitionCount 大於 1 ),則即使分割槽數量與原始主題相同,您也**必須**提供一個 DlqPartitionFunction bean。 |
使用 Kafka Streams Binder 的異常處理功能時需要記住幾點。
-
屬性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
適用於整個應用程式。這意味著如果同一個應用程式中有多個函式,則此屬性將應用於所有這些函式。 -
反序列化的異常處理與原生反序列化和框架提供的訊息轉換一致。
在 Binder 中處理生產異常
與上述描述的反序列化異常處理器的支援不同,Binder 沒有提供這種一流的機制來處理生產異常。但是,您仍然可以使用 StreamsBuilderFactoryBean
定製器配置生產異常處理器,有關更多詳細資訊,請參閱下面的後續部分。
執行時錯誤處理
處理應用程式程式碼中的錯誤,即業務邏輯執行中的錯誤時,通常由應用程式負責處理。因為 Kafka Streams Binder 沒有辦法干預應用程式程式碼。然而,為了讓應用程式更容易一些,Binder 提供了一個方便的 RecordRecoverableProcessor
,您可以使用它來指定如何處理應用程式級別的錯誤。
考慮以下程式碼。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.map(...);
}
如果上面您的 map
呼叫中的業務程式碼丟擲異常,則您有責任處理該錯誤。這就是 RecordRecoverableProcessor
變得方便的地方。預設情況下,RecordRecoverableProcessor
將只記錄錯誤並讓應用程式繼續。假設您想將失敗的記錄釋出到 DLT,而不是在應用程式內部處理它。在這種情況下,您必須使用 RecordRecoverableProcessor
的自定義實現,稱為 DltAwareProcessor
。您可以這樣做:
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
return input -> input
.process(() -> new DltAwareProcessor<>(record -> {
throw new RuntimeException("error");
}, "hello-dlt-1", dltPublishingContext));
}
原始 map
呼叫中的業務邏輯程式碼現在已作為 KStream#process
方法呼叫的一部分移動,該方法接受一個 ProcessorSupplier
。然後,我們傳入自定義的 DltAwareProcessor
,它能夠釋出到 DLT。上面 DltAwareProcessor
的建構函式接受三個引數 - 一個接受輸入記錄並將業務邏輯操作作為 Function
主體一部分的 Function
,DLT 主題,以及最後的 DltPublishingContext
。當 Function
的 lambda 表示式丟擲異常時,DltAwareProcessor
會將輸入記錄傳送到 DLT。DltPublishingContext
為 DltAwareProcessor
提供必要的釋出基礎設施 bean。DltPublishingContext
由 Binder 自動配置,因此您可以直接將其注入到應用程式中。
如果您不希望 Binder 將失敗的記錄釋出到 DLT,則必須直接使用 RecordRecoverableProcessor
而不是 DltAwareProcessor
。您可以提供自己的 recoverer 作為 BiConsumer
,它將輸入 Record
和異常作為引數。假設一個場景,您不想將記錄傳送到 DLT,而只想簡單地記錄訊息然後繼續。下面是如何實現該目標的一個示例。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new RecordRecoverableProcessor<>(record -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// Handle the record
}));
}
在這種情況下,當記錄失敗時,RecordRecoverableProcessor
使用使用者提供的 recoverer,它是一個 BiConsumer
,接受失敗的記錄和丟擲的異常作為引數。
在 DltAwareProcessor 中處理記錄鍵
使用 DltAwareProcessor
將失敗記錄傳送到 DLT 時,如果您想將記錄鍵傳送到 DLT 主題,則需要在 DLT 繫結上設定正確的序列化器。這是因為 DltAwareProcessor
使用 StreamBridge
,它使用常規的 Kafka Binder(基於訊息通道),預設情況下對鍵使用 ByteArraySerializer
。對於記錄值,Spring Cloud Stream 會將有效載荷轉換為適當的 byte[]
;但是,對於鍵,情況並非如此,因為它只是傳遞在頭部接收到的內容作為鍵。如果您提供的鍵不是位元組陣列,則可能會導致類轉換異常,為避免這種情況,您需要在 DLT 繫結上如下設定序列化器。
假設 DLT 目標是 hello-dlt-1
並且記錄鍵是 String 資料型別。
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer