錯誤處理

Apache Kafka Streams 提供了原生處理反序列化錯誤異常的能力。有關此支援的詳細資訊,請參閱此連結。開箱即用地,Apache Kafka Streams 提供了兩種反序列化異常處理器 - LogAndContinueExceptionHandlerLogAndFailExceptionHandler。顧名思義,前者將記錄錯誤並繼續處理下一條記錄,而後者將記錄錯誤並失敗。LogAndFailExceptionHandler 是預設的反序列化異常處理器。

在 Binder 中處理反序列化異常

Kafka Streams Binder 允許使用以下屬性指定上述反序列化異常處理器。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

除了上述兩種反序列化異常處理器,Binder 還提供了第三種,用於將錯誤記錄(毒丸訊息)傳送到 DLQ(死信佇列)主題。啟用此 DLQ 異常處理器的方法如下。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

設定上述屬性後,所有反序列化錯誤的記錄都會自動傳送到 DLQ 主題。

您可以如下設定釋出 DLQ 訊息的主題名稱。

您可以提供 DlqDestinationResolver 的實現,它是一個函式式介面。DlqDestinationResolver 接受 ConsumerRecord 和異常作為輸入,然後允許指定一個主題名稱作為輸出。透過訪問 Kafka ConsumerRecord,可以在 BiFunction 的實現中檢查頭記錄。

以下是提供 DlqDestinationResolver 實現的一個示例。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在提供 DlqDestinationResolver 實現時,需要記住一件重要的事情是,Binder 中的 provisioner 不會自動為應用程式建立主題。這是因為 Binder 無法推斷出實現可能傳送到的所有 DLQ 主題的名稱。因此,如果使用此策略提供 DLQ 名稱,應用程式有責任確保這些主題事先建立。

如果 DlqDestinationResolver 在應用程式中作為 bean 存在,則其優先順序更高。如果您不想遵循此方法,而是希望使用配置提供靜態 DLQ 名稱,則可以設定以下屬性。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

如果設定了此屬性,則錯誤記錄將傳送到主題 custom-dlq。如果應用程式未使用上述任何一種策略,它將建立一個名為 error.<input-topic-name>.<application-id> 的 DLQ 主題。例如,如果您的繫結的目標主題是 inputTopic,並且應用程式 ID 是 process-applicationId,則預設的 DLQ 主題是 error.inputTopic.process-applicationId。如果您打算啟用 DLQ,始終建議為每個輸入繫結顯式建立一個 DLQ 主題。

每個輸入消費者繫結的 DLQ

屬性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 適用於整個應用程式。這意味著如果同一個應用程式中有多個函式,則此屬性將應用於所有這些函式。但是,如果單個 Processor 中有多個 Processor 或多個輸入繫結,則可以使用 Binder 為每個輸入消費者繫結提供的更細粒度的 DLQ 控制。

如果您有以下 Processor:

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

並且您只想在第一個輸入繫結上啟用 DLQ,在第二個繫結上啟用 skipAndContinue,那麼可以在消費者上如下操作。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue

以這種方式設定反序列化異常處理器比在 Binder 級別設定具有更高的優先順序。

DLQ 分割槽

預設情況下,記錄使用與原始記錄相同的分割槽釋出到死信主題。這意味著死信主題必須至少有與原始記錄相同數量的分割槽。

要更改此行為,請將 DlqPartitionFunction 實現作為 @Bean 新增到應用程式上下文中。只能存在一個此類 bean。該函式提供消費者組(在大多數情況下與應用程式 ID 相同)、失敗的 ConsumerRecord 和異常。例如,如果您總是想路由到分割槽 0,您可以使用

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果將消費者繫結的 dlqPartitions 屬性設定為 1(並且 Binder 的 minPartitionCount 等於 1),則無需提供 DlqPartitionFunction;框架將始終使用分割槽 0。如果將消費者繫結的 dlqPartitions 屬性設定為大於 1 的值(或 Binder 的 minPartitionCount 大於 1),則即使分割槽數量與原始主題相同,您也**必須**提供一個 DlqPartitionFunction bean。

使用 Kafka Streams Binder 的異常處理功能時需要記住幾點。

  • 屬性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 適用於整個應用程式。這意味著如果同一個應用程式中有多個函式,則此屬性將應用於所有這些函式。

  • 反序列化的異常處理與原生反序列化和框架提供的訊息轉換一致。

在 Binder 中處理生產異常

與上述描述的反序列化異常處理器的支援不同,Binder 沒有提供這種一流的機制來處理生產異常。但是,您仍然可以使用 StreamsBuilderFactoryBean 定製器配置生產異常處理器,有關更多詳細資訊,請參閱下面的後續部分。

執行時錯誤處理

處理應用程式程式碼中的錯誤,即業務邏輯執行中的錯誤時,通常由應用程式負責處理。因為 Kafka Streams Binder 沒有辦法干預應用程式程式碼。然而,為了讓應用程式更容易一些,Binder 提供了一個方便的 RecordRecoverableProcessor,您可以使用它來指定如何處理應用程式級別的錯誤。

考慮以下程式碼。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .map(...);
}

如果上面您的 map 呼叫中的業務程式碼丟擲異常,則您有責任處理該錯誤。這就是 RecordRecoverableProcessor 變得方便的地方。預設情況下,RecordRecoverableProcessor 將只記錄錯誤並讓應用程式繼續。假設您想將失敗的記錄釋出到 DLT,而不是在應用程式內部處理它。在這種情況下,您必須使用 RecordRecoverableProcessor 的自定義實現,稱為 DltAwareProcessor。您可以這樣做:

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
    return input -> input
        .process(() -> new DltAwareProcessor<>(record -> {
					throw new RuntimeException("error");
				}, "hello-dlt-1", dltPublishingContext));
}

原始 map 呼叫中的業務邏輯程式碼現在已作為 KStream#process 方法呼叫的一部分移動,該方法接受一個 ProcessorSupplier。然後,我們傳入自定義的 DltAwareProcessor,它能夠釋出到 DLT。上面 DltAwareProcessor 的建構函式接受三個引數 - 一個接受輸入記錄並將業務邏輯操作作為 Function 主體一部分的 Function,DLT 主題,以及最後的 DltPublishingContext。當 Function 的 lambda 表示式丟擲異常時,DltAwareProcessor 會將輸入記錄傳送到 DLT。DltPublishingContextDltAwareProcessor 提供必要的釋出基礎設施 bean。DltPublishingContext 由 Binder 自動配置,因此您可以直接將其注入到應用程式中。

如果您不希望 Binder 將失敗的記錄釋出到 DLT,則必須直接使用 RecordRecoverableProcessor 而不是 DltAwareProcessor。您可以提供自己的 recoverer 作為 BiConsumer,它將輸入 Record 和異常作為引數。假設一個場景,您不想將記錄傳送到 DLT,而只想簡單地記錄訊息然後繼續。下面是如何實現該目標的一個示例。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .process(() -> new RecordRecoverableProcessor<>(record -> {
					throw new RuntimeException("error");
				},
                (record, exception) -> {
                  // Handle the record
                }));
}

在這種情況下,當記錄失敗時,RecordRecoverableProcessor 使用使用者提供的 recoverer,它是一個 BiConsumer,接受失敗的記錄和丟擲的異常作為引數。

在 DltAwareProcessor 中處理記錄鍵

使用 DltAwareProcessor 將失敗記錄傳送到 DLT 時,如果您想將記錄鍵傳送到 DLT 主題,則需要在 DLT 繫結上設定正確的序列化器。這是因為 DltAwareProcessor 使用 StreamBridge,它使用常規的 Kafka Binder(基於訊息通道),預設情況下對鍵使用 ByteArraySerializer。對於記錄值,Spring Cloud Stream 會將有效載荷轉換為適當的 byte[];但是,對於鍵,情況並非如此,因為它只是傳遞在頭部接收到的內容作為鍵。如果您提供的鍵不是位元組陣列,則可能會導致類轉換異常,為避免這種情況,您需要在 DLT 繫結上如下設定序列化器。

假設 DLT 目標是 hello-dlt-1 並且記錄鍵是 String 資料型別。

spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer