錯誤處理
Apache Kafka Streams 提供了原生處理反序列化錯誤的異常處理能力。有關此支援的詳細資訊,請參閱 此文。開箱即用,Apache Kafka Streams 提供了兩種反序列化異常處理器 - LogAndContinueExceptionHandler 和 LogAndFailExceptionHandler。顧名思義,前者將記錄錯誤並繼續處理下一個記錄,而後者將記錄錯誤並失敗。LogAndFailExceptionHandler 是預設的反序列化異常處理器。
在繫結器中處理反序列化異常
Kafka Streams 繫結器允許使用以下屬性指定上述反序列化異常處理器。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述兩種反序列化異常處理器,繫結器還提供了第三種,用於將錯誤記錄(毒丸)傳送到 DLQ(死信佇列)主題。以下是啟用此 DLQ 異常處理器的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
當設定上述屬性時,所有反序列化錯誤的記錄將自動傳送到 DLQ 主題。
您可以設定 DLQ 訊息釋出的議題名稱,如下所示。
您可以提供 DlqDestinationResolver 的實現,它是一個函式式介面。DlqDestinationResolver 將 ConsumerRecord 和異常作為輸入,然後允許指定主題名稱作為輸出。透過訪問 Kafka ConsumerRecord,可以在 BiFunction 的實現中內省頭記錄。
以下是提供 DlqDestinationResolver 實現的示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在提供 DlqDestinationResolver 的實現時,需要記住一件重要的事情,那就是繫結器中的 provisioner 不會自動為應用程式建立主題。這是因為繫結器無法推斷出實現可能傳送到的所有 DLQ 主題的名稱。因此,如果您使用此策略提供 DLQ 名稱,應用程式有責任確保這些主題事先已建立。
如果應用程式中存在 DlqDestinationResolver 作為 bean,則其優先順序更高。如果您不想遵循此方法,而是希望使用配置提供靜態 DLQ 名稱,則可以設定以下屬性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果設定了此屬性,則錯誤記錄將傳送到主題 custom-dlq。如果應用程式未使用上述任何一種策略,則它將建立一個名為 error.<input-topic-name>.<application-id> 的 DLQ 主題。例如,如果您的繫結的目標主題是 inputTopic 且應用程式 ID 是 process-applicationId,則預設 DLQ 主題為 error.inputTopic.process-applicationId。如果您打算啟用 DLQ,始終建議為每個輸入繫結明確建立一個 DLQ 主題。
每個輸入消費者繫結一個 DLQ
屬性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 適用於整個應用程式。這意味著,如果同一個應用程式中有多個函式,此屬性將應用於所有這些函式。但是,如果單個處理器中有多個處理器或多個輸入繫結,則可以使用繫結器為每個輸入消費者繫結提供的更細粒度的 DLQ 控制。
如果您有以下處理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
並且您只想在第一個輸入繫結上啟用 DLQ,並在第二個繫結上跳過並繼續,那麼您可以在消費者上按如下方式進行操作。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以這種方式設定反序列化異常處理器的優先順序高於在繫結器級別設定。
DLQ 分割槽
預設情況下,記錄釋出到死信主題時使用與原始記錄相同的分割槽。這意味著死信主題必須至少有與原始記錄相同數量的分割槽。
要更改此行為,請將 DlqPartitionFunction 實現作為 @Bean 新增到應用程式上下文中。只能有一個這樣的 bean。該函式提供消費者組(在大多數情況下與應用程式 ID 相同)、失敗的 ConsumerRecord 和異常。例如,如果您總是希望路由到分割槽 0,則可以使用
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果您將消費者繫結的 dlqPartitions 屬性設定為 1(並且繫結的 minPartitionCount 等於 1),則無需提供 DlqPartitionFunction;框架將始終使用分割槽 0。如果您將消費者繫結的 dlqPartitions 屬性設定為大於 1 的值(或繫結的 minPartitionCount 大於 1),則您必須提供一個 DlqPartitionFunction bean,即使分割槽計數與原始主題相同。 |
在使用 Kafka Streams 繫結器中的異常處理功能時,需要注意以下幾點。
-
屬性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler適用於整個應用程式。這意味著,如果同一個應用程式中有多個函式,此屬性將應用於所有這些函式。 -
反序列化的異常處理與原生反序列化和框架提供的訊息轉換保持一致。
在繫結器中處理生產異常
與上述反序列化異常處理器的支援不同,繫結器不提供此類一流的機制來處理生產異常。但是,您仍然可以使用 StreamsBuilderFactoryBean 自定義器配置生產異常處理器,您可以在下面的後續部分中找到更多詳細資訊。
執行時錯誤處理
當涉及到處理應用程式程式碼(即業務邏輯執行)中的錯誤時,通常由應用程式來處理。因為 Kafka Streams 繫結器無法干擾應用程式程式碼。但是,為了使應用程式更容易一些,繫結器提供了一個方便的 RecordRecoverableProcessor,透過它,您可以決定如何處理應用程式級別的錯誤。
考慮以下程式碼。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.map(...);
}
如果上述 map 呼叫中的業務程式碼丟擲異常,則您有責任處理該錯誤。這就是 RecordRecoverableProcessor 派上用場的地方。預設情況下,RecordRecoverableProcessor 只會簡單地記錄錯誤並讓應用程式繼續。假設您想將失敗的記錄釋出到 DLT,而不是在應用程式內部處理它。在這種情況下,您必須使用 RecordRecoverableProcessor 的自定義實現,稱為 DltAwareProcessor。以下是實現方法。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
return input -> input
.process(() -> new DltAwareProcessor<>(record -> {
throw new RuntimeException("error");
}, "hello-dlt-1", dltPublishingContext));
}
原始 map 呼叫中的業務邏輯程式碼現在已作為 KStream#process 方法呼叫的一部分移動,該方法採用 ProcessorSupplier。然後,我們傳入自定義的 DltAwareProcessor,它能夠釋出到 DLT。上述 DltAwareProcessor 的建構函式採用三個引數 - 一個 Function,它將輸入記錄作為輸入,然後將業務邏輯操作作為 Function 主體的一部分,DLT 主題,最後是 DltPublishingContext。當 Function 的 lambda 表示式丟擲異常時,DltAwareProcessor 將把輸入記錄傳送到 DLT。DltPublishingContext 為 DltAwareProcessor 提供了必要的釋出基礎設施 bean。DltPublishingContext 由繫結器自動配置,因此您可以直接將其注入到應用程式中。
如果您不希望繫結器將失敗的記錄釋出到 DLT,那麼您必須直接使用 RecordRecoverableProcessor 而不是 DltAwareProcessor。您可以提供自己的恢復器,它是一個 BiConsumer,將輸入 Record 和異常作為引數。假設一個場景,您不想將記錄傳送到 DLT,而只是簡單地記錄訊息並繼續。下面是一個如何實現此目的的示例。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new RecordRecoverableProcessor<>(record -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// Handle the record
}));
}
在這種情況下,當記錄失敗時,RecordRecoverableProcessor 使用使用者提供的恢復器,它是一個 BiConsumer,將失敗的記錄和丟擲的異常作為引數。
在 DltAwareProcessor 中處理記錄鍵
當使用 DltAwareProcessor 將失敗的記錄傳送到 DLT 時,如果您想將記錄鍵傳送到 DLT 主題,則需要在 DLT 繫結上設定正確的序列化器。這是因為 DltAwareProcessor 使用 StreamBridge,它使用常規的 Kafka 繫結器(基於訊息通道),該繫結器預設使用 ByteArraySerializer 作為鍵。對於記錄值,Spring Cloud Stream 將有效負載轉換為正確的 byte[];但是,對於鍵則不然,它只是傳遞它在頭部收到的內容作為鍵。如果您提供非位元組陣列鍵,則可能會導致類轉換異常,為了避免這種情況,您需要如下在 DLT 繫結上設定序列化器。
假設 DLT 目標是 hello-dlt-1 並且記錄鍵是 String 資料型別。
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer