異常處理

本節描述了在使用 Spring for Apache Kafka 時如何處理可能出現的各種異常。

監聽器錯誤處理器

從 2.0 版本開始,@KafkaListener 註解新增了一個屬性:errorHandler

您可以使用 errorHandler 來提供 KafkaListenerErrorHandler 實現的 bean 名稱。此函式式介面有一個方法,如下所示:

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

您可以訪問訊息轉換器生成的 spring-messaging Message<?> 物件以及監聽器丟擲的異常,該異常封裝在 ListenerExecutionFailedException 中。錯誤處理器可以丟擲原始異常或新異常,該異常會被拋給容器。錯誤處理器返回的任何內容都會被忽略。

從 2.7 版本開始,您可以在 MessagingMessageConverterBatchMessagingMessageConverter 上設定 rawRecordHeader 屬性,這會導致原始的 ConsumerRecord 被新增到轉換後的 Message<?>KafkaHeaders.RAW_DATA 標頭中。這很有用,例如,如果您希望在監聽器錯誤處理器中使用 DeadLetterPublishingRecoverer。它可以在請求/回覆場景中使用,在該場景中,您希望在多次重試後,將失敗結果傳送給傳送者,並將失敗的記錄捕獲到死信主題中。

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一個子介面(ConsumerAwareListenerErrorHandler),透過以下方法可以訪問消費者物件:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

另一個子介面(ManualAckListenerErrorHandler)在使用手動 AckMode 時提供對 Acknowledgment 物件的訪問。

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

在這兩種情況下,您都不應該在消費者上執行任何定址操作,因為容器將無法感知它們。

容器錯誤處理器

從 2.8 版本開始,舊的 ErrorHandlerBatchErrorHandler 介面已被新的 CommonErrorHandler 取代。這些錯誤處理器可以處理記錄和批處理監聽器的錯誤,允許單個監聽器容器工廠為兩種型別的監聽器建立容器。提供了 CommonErrorHandler 的實現以替換大多數舊框架錯誤處理器實現。

有關將自定義舊錯誤處理器遷移到 CommonErrorHandler 的資訊,請參見 將自定義舊錯誤處理器實現遷移到 CommonErrorHandler

當使用事務時,預設情況下不配置錯誤處理器,以便異常會回滾事務。事務容器的錯誤處理由 AfterRollbackProcessor 處理。如果在使用事務時提供自定義錯誤處理器,如果您希望回滾事務,它必須丟擲異常。

該介面有一個預設方法 isAckAfterHandle(),容器呼叫它來確定如果錯誤處理器返回而不丟擲異常,是否應提交偏移量;它預設返回 true。

通常,框架提供的錯誤處理器在錯誤未“處理”(例如,執行定址操作後)時會丟擲異常。預設情況下,此類異常由容器以 ERROR 級別記錄。所有框架錯誤處理器都擴充套件了 KafkaExceptionLogLevelAware,它允許您控制這些異常的日誌級別。

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定一個全域性錯誤處理器,用於容器工廠中的所有監聽器。以下示例演示瞭如何實現:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

預設情況下,如果帶註解的監聽器方法丟擲異常,它會被拋給容器,並且訊息將根據容器配置進行處理。

容器在呼叫錯誤處理器之前提交任何掛起的偏移量提交。

如果您正在使用 Spring Boot,您只需將錯誤處理器新增為 @Bean,Boot 將其新增到自動配置的工廠中。

退避處理器

諸如 DefaultErrorHandler 之類的錯誤處理器使用 BackOff 來確定在重試傳遞之前等待多長時間。從 2.9 版本開始,您可以配置自定義的 BackOffHandler。預設處理器只是暫停執行緒,直到退避時間過去(或容器停止)。框架還提供了 ContainerPausingBackOffHandler,它會暫停監聽器容器,直到退避時間過去,然後恢復容器。當延遲時間長於 max.poll.interval.ms 消費者屬性時,這很有用。請注意,實際退避時間的解析將受到 pollTimeout 容器屬性的影響。

DefaultErrorHandler

這個新的錯誤處理器取代了 SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,它們已經作為預設錯誤處理器存在了幾個版本。一個不同之處是,對於批處理監聽器(當丟擲 BatchListenerFailedException 以外的異常時)的備用行為相當於 重試完整批次

從 2.9 版本開始,可以配置 DefaultErrorHandler 以提供與下面討論的尋求未處理記錄偏移量相同的語義,但實際上不進行尋求。相反,記錄由監聽器容器保留,並在錯誤處理器退出後(以及在執行一次暫停的 poll() 以保持消費者活動後;如果正在使用 非阻塞重試ContainerPausingBackOffHandler,暫停可能會持續多次輪詢)重新提交給監聽器。錯誤處理器向容器返回一個結果,指示當前失敗的記錄是否可以重新提交,或者它是否已恢復,然後將不再發送給監聽器。要啟用此模式,請將屬性 seekAfterError 設定為 false

錯誤處理器可以恢復(跳過)持續失敗的記錄。預設情況下,在十次失敗後,失敗的記錄會被記錄(以 ERROR 級別)。您可以使用自定義的恢復器(BiConsumer)和控制傳遞嘗試和每次嘗試之間延遲的 BackOff 來配置處理器。使用 FixedBackOffFixedBackOff.UNLIMITED_ATTEMPTS 會導致(實際上)無限重試。以下示例配置了三次嘗試後的恢復:

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

要為監聽器容器配置此處理器的自定義例項,請將其新增到容器工廠。

例如,對於 @KafkaListener 容器工廠,您可以按如下方式新增 DefaultErrorHandler

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

對於記錄監聽器,這將重試傳遞最多 2 次(3 次傳遞嘗試),退避時間為 1 秒,而不是預設配置(FixedBackOff(0L, 9))。重試耗盡後,失敗只會記錄日誌。

例如,如果 poll 返回六條記錄(分割槽 0、1、2 各兩條),並且監聽器在第四條記錄上丟擲異常,則容器透過提交其偏移量來確認前三條訊息。DefaultErrorHandler 尋求分割槽 1 的偏移量 1 和分割槽 2 的偏移量 0。下一次 poll() 返回三條未處理的記錄。

如果 AckModeBATCH,則容器在呼叫錯誤處理器之前提交前兩個分割槽的偏移量。

對於批處理監聽器,監聽器必須丟擲 BatchListenerFailedException,指示批處理中的哪些記錄失敗。

事件序列是:

  • 提交索引之前的記錄的偏移量。

  • 如果重試未耗盡,則執行尋求操作,以便所有剩餘記錄(包括失敗記錄)將重新傳遞。

  • 如果重試耗盡,嘗試恢復失敗記錄(預設只記錄日誌),並執行尋求操作,以便剩餘記錄(不包括失敗記錄)將重新傳遞。已恢復記錄的偏移量將被提交。

  • 如果重試耗盡且恢復失敗,則像重試未耗盡一樣執行尋求操作。

從 2.9 版本開始,可以配置 DefaultErrorHandler 以提供與上面討論的尋求未處理記錄偏移量相同的語義,但實際上不進行尋求。相反,錯誤處理器建立一個新的 ConsumerRecords<?, ?>,其中只包含未處理的記錄,然後將其提交給監聽器(在執行一次暫停的 poll() 以保持消費者活動後)。要啟用此模式,請將屬性 seekAfterError 設定為 false

預設的恢復器在重試耗盡後記錄失敗的記錄。您可以使用自定義恢復器,或框架提供的恢復器,例如 DeadLetterPublishingRecoverer

當使用 POJO 批處理監聽器(例如 List<Thing>),並且您沒有完整的消費者記錄要新增到異常中時,您只需新增失敗記錄的索引:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

當容器配置為 AckMode.MANUAL_IMMEDIATE 時,可以將錯誤處理器配置為提交已恢復記錄的偏移量;將 commitRecovered 屬性設定為 true

另請參見 釋出死信記錄

當使用事務時,DefaultAfterRollbackProcessor 提供了類似的功能。請參見 回滾後處理器

DefaultErrorHandler 認為某些異常是致命的,並跳過這些異常的重試;恢復器在第一次失敗時被呼叫。預設情況下被認為是致命的異常有:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因為這些異常不太可能在重試傳遞時得到解決。

您可以將更多異常型別新增到不可重試類別中,或者完全替換分類異常的對映。有關更多資訊,請參見 DefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications() 的 Javadoc,以及 ExceptionMatcher

這是一個將 IllegalArgumentException 新增到不可重試異常的示例:

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}
DefaultErrorHandler 只處理繼承自 RuntimeException 的異常。繼承自 Error 的異常會完全繞過錯誤處理器,導致消費者立即終止,關閉 Kafka 連線,並跳過所有重試/恢復機制。這種關鍵的區別意味著應用程式可能會報告健康狀態,儘管已終止的消費者不再處理訊息。始終確保訊息處理程式碼中丟擲的異常明確地擴充套件自 RuntimeException 而不是 Error,以允許正確的錯誤處理。換句話說,如果應用程式丟擲異常,請確保它擴充套件自 RuntimeException,而不是無意中繼承自 Error。像 OutOfMemoryErrorIllegalAccessError 和應用程式無法控制的其他錯誤仍然被視為 Error,並且不會重試。

錯誤處理器可以配置一個或多個 RetryListener,接收重試和恢復進度的通知。從 2.8.10 版本開始,添加了批處理監聽器的方法。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

有關更多資訊,請參見 JavaDocs。

如果恢復器失敗(丟擲異常),失敗的記錄將包含在定址操作中。如果恢復器失敗,預設情況下 BackOff 將被重置,並且重新傳遞將再次透過退避操作,然後再次嘗試恢復。要在恢復失敗後跳過重試,請將錯誤處理器的 resetStateOnRecoveryFailure 設定為 false

您可以為錯誤處理器提供一個 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根據失敗的記錄和/或異常來確定要使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函式返回 null,則將使用處理器的預設 BackOff

resetStateOnExceptionChange 設定為 true,如果異常型別在失敗之間發生變化,則重試序列將重新啟動(包括選擇一個新的 BackOff,如果已配置)。當為 false(2.9 版本之前的預設值)時,不考慮異常型別。

從 2.9 版本開始,這現在預設為 true

另請參見 傳遞嘗試頭

批處理監聽器與死信主題的錯誤處理

非阻塞重試 (@RetryableTopic 註解) 不支援批處理監聽器。對於批處理監聽器與死信主題功能,請使用 DefaultErrorHandlerDeadLetterPublishingRecoverer

使用 BatchListenerFailedException

要指示批次中哪個特定記錄失敗,請丟擲 BatchListenerFailedException

@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
    for (ConsumerRecord<String, Order> record : records) {
        try {
            process(record.value());
        }
        catch (Exception e) {
            // Identifies the failed record for error handling
            throw new BatchListenerFailedException("Failed to process", e, record);
        }
    }
}

對於沒有 ConsumerRecord 的 POJO 批處理監聽器,請改用索引:

@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<Order> orders) {
    for (int i = 0; i < orders.size(); i++) {
        try {
            process(orders.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", e, i);
        }
    }
}

為批處理監聽器配置死信主題

在您的批處理監聽器容器工廠上配置一個帶有 DeadLetterPublishingRecovererDefaultErrorHandler

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
        ConsumerFactory<String, Order> consumerFactory,
        KafkaTemplate<String, Order> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, Order> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);

    // Configure Dead Letter Publishing
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> new TopicPartition(record.topic() + "-dlt", record.partition()));

    // Configure retries: 3 attempts with 1 second between each
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
            new FixedBackOff(1000L, 2L)); // 2 retries = 3 total attempts

    factory.setCommonErrorHandler(errorHandler);
    return factory;
}

批處理錯誤處理的工作原理

當丟擲 BatchListenerFailedException 時,DefaultErrorHandler 會:

  1. 提交失敗記錄之前所有記錄的偏移量

  2. 根據 BackOff 配置重試失敗記錄(以及後續記錄)

  3. 當重試耗盡時,釋出到 DLT - 只有失敗的記錄被髮送到 DLT

  4. 提交失敗記錄的偏移量並重新傳遞剩餘記錄進行處理

批處理包含 6 條記錄,其中索引為 2 的記錄失敗的示例流程:

  • 第一次嘗試:記錄 0、1 成功處理;記錄 2 失敗

  • 容器提交記錄 0、1 的偏移量

  • 重試嘗試 1:記錄 2、3、4、5 被重試

  • 重試嘗試 2:記錄 2、3、4、5 再次被重試

  • 重試耗盡後:記錄 2 釋出到 DLT 並提交其偏移量

  • 容器繼續處理記錄 3、4、5

跳過特定異常的重試

預設情況下,DefaultErrorHandler 會重試所有異常,除了致命異常(如 DeserializationExceptionMessageConversionException 等)。要跳過您自己的異常型別的重試,請使用異常分類配置錯誤處理器。

錯誤處理器檢查 BatchListenerFailedException原因以確定是否應跳過重試:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
        ConsumerFactory<String, Order> consumerFactory,
        KafkaTemplate<String, Order> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, Order> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);

    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
            new FixedBackOff(1000L, 2L));

    // Add custom exception types that should skip retries and go directly to DLT
    errorHandler.addNotRetryableExceptions(ValidationException.class, InvalidFormatException.class);

    factory.setCommonErrorHandler(errorHandler);
    return factory;
}

現在在您的監聽器中:

@KafkaListener(id = "batch-listener", topics = "orders", containerFactory = "batchFactory")
public void processOrders(List<ConsumerRecord<String, Order>> records) {
    for (ConsumerRecord<String, Order> record : records) {
        try {
            process(record.value());
        }
        catch (DatabaseException e) {
            // Will be retried 3 times (according to BackOff configuration)
            throw new BatchListenerFailedException("Database error", e, record);
        }
        catch (ValidationException e) {
            // Skips retries - goes directly to DLT
            // (because ValidationException is configured as not retryable)
            throw new BatchListenerFailedException("Validation failed", e, record);
        }
    }
}
錯誤處理器檢查 BatchListenerFailedException原因(第二個引數)。如果原因被歸類為不可重試,則記錄會立即傳送到 DLT,而不進行重試。

偏移量提交行為

瞭解偏移量提交對於批處理錯誤處理很重要:

  • AckMode.BATCH(批處理監聽器最常見)

    • 失敗記錄之前的偏移量在錯誤處理之前提交

    • 失敗記錄的偏移量在成功恢復(DLT 釋出)後提交

  • AckMode.MANUAL_IMMEDIATE:

    • 設定 errorHandler.setCommitRecovered(true) 以提交已恢復記錄的偏移量

    • 您在監聽器中控制確認時機

手動確認示例

@KafkaListener(id = "manual-batch", topics = "myTopic", containerFactory = "manualBatchFactory")
public void listen(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, Order> record : records) {
        try {
            process(record.value());
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Processing failed", e, record);
        }
    }
    ack.acknowledge();
}

批處理錯誤處理器中的轉換錯誤

從 2.8 版本開始,批處理監聽器現在可以正確處理轉換錯誤,當使用帶有 ByteArrayDeserializerBytesDeserializerStringDeserializerMessageConverterDefaultErrorHandler 時。當發生轉換錯誤時,有效負載被設定為 null,並且反序列化異常被新增到記錄頭中,類似於 ErrorHandlingDeserializer。監聽器中會提供一個 ConversionException 列表,因此監聽器可以丟擲 BatchListenerFailedException,指示發生轉換異常的第一個索引。

示例

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

批處理監聽器中的反序列化錯誤

批處理監聽器需要手動處理反序列化錯誤。與記錄監聽器不同,沒有自動錯誤處理器可以檢測並將反序列化失敗路由到 DLT。您必須顯式檢查失敗的記錄並丟擲 BatchListenerFailedException

使用 ErrorHandlingDeserializer 可防止反序列化失敗停止整個批次

@Bean
public ConsumerFactory<String, Order> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // Wrap your deserializer with ErrorHandlingDeserializer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

    return new DefaultKafkaConsumerFactory<>(props);
}

在您的監聽器中,您必須手動檢查 null 值,這些值表示反序列化失敗

@KafkaListener(id = "batch-deser", topics = "orders", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
    for (ConsumerRecord<String, Order> record : records) {
        if (record.value() == null) {
            // Deserialization failed - throw exception to send to DLT
            throw new BatchListenerFailedException("Deserialization failed", record);
        }
        process(record.value());
    }
}

DeadLetterPublishingRecoverer 將反序列化失敗釋出到 DLT 時

  • 無法反序列化的原始 byte[] 資料將作為記錄值恢復

  • 異常資訊(類名、訊息、堆疊跟蹤)將新增到標準 DLT 異常頭中

  • 原始的 ErrorHandlingDeserializer 異常頭預設會被移除(設定 recoverer 的 setRetainExceptionHeader(true) 以保留它)

重試完整批次

對於批處理監聽器,當監聽器丟擲 BatchListenerFailedException 以外的異常時,這現在是 DefaultErrorHandler 的備用行為。

無法保證當批次重新傳遞時,批次具有相同數量的記錄和/或重新傳遞的記錄順序相同。因此,不可能輕易地維護批次的重試狀態。FallbackBatchErrorHandler 採用以下方法。如果批處理監聽器丟擲不是 BatchListenerFailedException 的異常,則從記憶體中的記錄批次執行重試。為了避免在擴充套件重試序列期間發生再平衡,錯誤處理器會暫停消費者,在每次重試時,在退避休眠之前對其進行輪詢,並再次呼叫監聽器。如果/當重試耗盡時,會為批次中的每條記錄呼叫 ConsumerRecordRecoverer。如果恢復器丟擲異常,或者執行緒在其休眠期間被中斷,則記錄批次將在下一次輪詢時重新傳遞。在退出之前,無論結果如何,消費者都會恢復。

此機制不能與事務一起使用。

在等待 BackOff 間隔時,錯誤處理器將以短時休眠迴圈,直到達到所需的延遲,同時檢查容器是否已停止,從而允許休眠在 stop() 後立即退出,而不是導致延遲。

容器停止錯誤處理器

如果監聽器丟擲異常,CommonContainerStoppingErrorHandler 將停止容器。對於記錄監聽器,當 AckModeRECORD 時,已處理記錄的偏移量將提交。對於記錄監聽器,當 AckMode 為任何手動值時,已確認記錄的偏移量將提交。對於記錄監聽器,當 AckModeBATCH 時,或對於批處理監聽器,當容器重新啟動時,整個批次將重播。

容器停止後,會丟擲一個包裝 ListenerExecutionFailedException 的異常。這是為了導致事務回滾(如果啟用了事務)。

委派錯誤處理器

CommonDelegatingErrorHandler 可以根據異常型別委託給不同的錯誤處理器。例如,您可能希望為大多數異常呼叫 DefaultErrorHandler,或者為其他異常呼叫 CommonContainerStoppingErrorHandler

所有委託必須共享相同的相容屬性(ackAfterHandleseekAfterError…​)。

日誌錯誤處理器

CommonLoggingErrorHandler 只是記錄異常;對於記錄監聽器,來自上次輪詢的剩餘記錄會傳遞給監聽器。對於批處理監聽器,批處理中的所有記錄都會被記錄。

對記錄和批處理監聽器使用不同的通用錯誤處理器

如果您希望對記錄和批處理監聽器使用不同的錯誤處理策略,則提供 CommonMixedErrorHandler,允許為每種監聽器型別配置特定的錯誤處理器。

通用錯誤處理器摘要

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

舊版錯誤處理器及其替代品

舊版錯誤處理器 替代

LoggingErrorHandler

CommonLoggingErrorHandler

BatchLoggingErrorHandler

CommonLoggingErrorHandler

ConditionalDelegatingErrorHandler

DelegatingErrorHandler

ConditionalDelegatingBatchErrorHandler

DelegatingErrorHandler

ContainerStoppingErrorHandler

CommonContainerStoppingErrorHandler

ContainerStoppingBatchErrorHandler

CommonContainerStoppingErrorHandler

SeekToCurrentErrorHandler

DefaultErrorHandler

SeekToCurrentBatchErrorHandler

無替代品,使用具有無限 BackOffDefaultErrorHandler

RecoveringBatchErrorHandler

DefaultErrorHandler

RetryingBatchErrorHandler

無替代品,使用 DefaultErrorHandler 並丟擲 BatchListenerFailedException 以外的異常。

將自定義舊版錯誤處理器實現遷移到 CommonErrorHandler

請參閱 CommonErrorHandler 中的 JavaDocs。

要替換 ErrorHandlerConsumerAwareErrorHandler 實現,您應該實現 handleOne() 並讓 seeksAfterHandle() 返回 false(預設)。您還應該實現 handleOtherException() 來處理在記錄處理範圍之外發生的異常(例如消費者錯誤)。

要替換 RemainingRecordsErrorHandler 實現,您應該實現 handleRemaining() 並重寫 seeksAfterHandle() 以返回 true(錯誤處理器必須執行必要的尋求)。您還應該實現 handleOtherException() - 以處理在記錄處理範圍之外發生的異常(例如消費者錯誤)。

要替換任何 BatchErrorHandler 實現,您應該實現 handleBatch()。您還應該實現 handleOtherException() - 以處理在記錄處理範圍之外發生的異常(例如消費者錯誤)。

回滾後處理器

使用事務時,如果監聽器丟擲異常(並且錯誤處理器,如果存在,也丟擲異常),事務將回滾。預設情況下,任何未處理的記錄(包括失敗的記錄)將在下一次輪詢時重新獲取。這是透過在 DefaultAfterRollbackProcessor 中執行 seek 操作來實現的。對於批處理監聽器,整個批處理記錄都會被重新處理(容器不知道批處理中的哪個記錄失敗)。要修改此行為,您可以使用自定義 AfterRollbackProcessor 配置監聽器容器。例如,對於基於記錄的監聽器,您可能希望跟蹤失敗的記錄並在嘗試一定次數後放棄,也許透過將其釋出到死信主題。

從 2.2 版本開始,DefaultAfterRollbackProcessor 現在可以恢復(跳過)持續失敗的記錄。預設情況下,在十次失敗後,失敗的記錄會被記錄(以 ERROR 級別)。您可以配置處理器,使其具有自定義的恢復器(BiConsumer)和最大失敗次數。將 maxFailures 屬性設定為負數會導致無限重試。以下示例配置了三次嘗試後的恢復:

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

當您不使用事務時,可以透過配置 DefaultErrorHandler 來實現類似的功能。請參閱 容器錯誤處理器

從 3.2 版本開始,恢復現在可以恢復(跳過)持續失敗的整個批處理記錄。將 ContainerProperties.setBatchRecoverAfterRollback(true) 設定為 true 以啟用此功能。

預設行為是,批處理監聽器無法進行恢復,因為框架不知道批處理中哪個記錄持續失敗。在這種情況下,應用程式監聽器必須處理持續失敗的記錄。

另請參見 釋出死信記錄

從 2.2.5 版本開始,DefaultAfterRollbackProcessor 可以在新事務中呼叫(在失敗事務回滾後啟動)。然後,如果您正在使用 DeadLetterPublishingRecoverer 釋出失敗記錄,處理器將在原始主題/分割槽中將已恢復記錄的偏移量傳送到事務。要啟用此功能,請在 DefaultAfterRollbackProcessor 上設定 commitRecoveredkafkaTemplate 屬性。

如果恢復器失敗(丟擲異常),失敗的記錄將包含在定址操作中。從 2.5.5 版本開始,如果恢復器失敗,預設情況下 BackOff 將被重置,並且重新傳遞將再次透過退避操作,然後再次嘗試恢復。在早期版本中,BackOff 未被重置,並在下次失敗時重新嘗試恢復。要恢復到以前的行為,請將處理器的 resetStateOnRecoveryFailure 屬性設定為 false

從 2.6 版本開始,您現在可以為處理器提供一個 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根據失敗的記錄和/或異常來確定要使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函式返回 null,則將使用處理器的預設 BackOff

從 2.6.3 版本開始,將 resetStateOnExceptionChange 設定為 true,如果異常型別在失敗之間發生變化,則重試序列將重新啟動(包括選擇一個新的 BackOff,如果已配置)。預設情況下,不考慮異常型別。

從 2.3.1 版本開始,類似於 DefaultErrorHandlerDefaultAfterRollbackProcessor 認為某些異常是致命的,並跳過這些異常的重試;恢復器在第一次失敗時被呼叫。預設情況下被認為是致命的異常有:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因為這些異常不太可能在重試傳遞時得到解決。

您可以將更多異常型別新增到不可重試類別中,或者完全替換分類異常的對映。有關更多資訊,請參見 DefaultAfterRollbackProcessor.setClassifications() 的 Javadoc,以及 ExceptionMatcher

這是一個將 IllegalArgumentException 新增到不可重試異常的示例:

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}

另請參見 傳遞嘗試頭

使用當前的 kafka-clients,容器無法檢測到 ProducerFencedException 是由再平衡引起的,還是由於超時或過期導致生產者的 transactional.id 被撤銷。因為在大多數情況下,它是由再平衡引起的,容器不會呼叫 AfterRollbackProcessor(因為它不適合尋求分割槽,因為我們不再分配它們)。如果您確保超時足夠大以處理每個事務並定期執行“空”事務(例如透過 ListenerContainerIdleEvent),您可以避免因超時和過期而導致的隔離。或者,您可以將 stopContainerWhenFenced 容器屬性設定為 true,容器將停止,避免記錄丟失。您可以消費 ConsumerStoppedEvent 並檢查 Reason 屬性的 FENCED 以檢測此情況。由於事件還具有對容器的引用,您可以使用此事件重新啟動容器。

從 2.7 版本開始,在等待 BackOff 間隔時,錯誤處理器將以短時休眠迴圈,直到達到所需的延遲,同時檢查容器是否已停止,從而允許休眠在 stop() 後立即退出,而不是導致延遲。

從 2.7 版本開始,處理器可以配置一個或多個 RetryListener,接收重試和恢復進度的通知。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

有關更多資訊,請參見 JavaDocs。

傳遞嘗試頭

以下僅適用於記錄監聽器,不適用於批處理監聽器。

從 2.5 版本開始,當使用實現了 DeliveryAttemptAwareErrorHandlerAfterRollbackProcessor 時,可以啟用新增 KafkaHeaders.DELIVERY_ATTEMPT 頭(kafka_deliveryAttempt)到記錄中。此頭的值是一個從 1 開始遞增的整數。當接收原始 ConsumerRecord<?, ?> 時,整數位於 byte[4] 中。

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

當使用 @KafkaListenerJsonKafkaHeaderMapperSimpleKafkaHeaderMapper 時,可以透過在監聽器方法中新增 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 作為引數來獲取。

要啟用此標頭的填充,請將容器屬性 deliveryAttemptHeader 設定為 true。預設情況下停用此功能,以避免查詢每個記錄狀態和新增標頭(少量)的開銷。

DefaultErrorHandlerDefaultAfterRollbackProcessor 支援此功能。

批處理監聽器的傳遞嘗試頭

當使用 BatchListener 處理 ConsumerRecord 時,KafkaHeaders.DELIVERY_ATTEMPT 頭可能以與 SingleRecordListener 不同的方式存在。

從 3.3 版本開始,如果您想在使用 BatchListener 時將 KafkaHeaders.DELIVERY_ATTEMPT 頭注入到 ConsumerRecord 中,請將 DeliveryAttemptAwareRetryListener 設定為 ErrorHandler 中的 RetryListener

請參閱以下程式碼。

final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);

然後,每當批處理未能完成時,DeliveryAttemptAwareRetryListener 將把 KafkaHeaders.DELIVERY_ATTMPT 頭注入到 ConsumerRecord 中。

監聽器資訊頭

在某些情況下,能夠知道監聽器正在哪個容器中執行是有用的。

從 2.8.4 版本開始,您現在可以設定監聽器容器上的 listenerInfo 屬性,或者在 @KafkaListener 註解上設定 info 屬性。然後,容器會將此資訊新增到所有傳入訊息的 KafkaListener.LISTENER_INFO 頭中;它可以在記錄攔截器、過濾器等中,或在監聽器本身中使用。

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

當在 RecordInterceptorRecordFilterStrategy 實現中使用時,頭在消費者記錄中作為位元組陣列,使用 KafkaListenerAnnotationBeanPostProcessorcharSet 屬性進行轉換。

頭對映器在從消費者記錄建立 MessageHeaders 時也會轉換為 String,並且永遠不會在出站記錄上對映此頭。

對於 POJO 批處理監聽器,從 2.8.6 版本開始,頭會複製到批處理的每個成員中,並且在轉換後也作為單個 String 引數可用。

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果批處理監聽器有過濾器,並且過濾器導致空批處理,則需要向 @Header 引數新增 required = false,因為空批處理沒有資訊。

如果您收到 List<Message<Thing>>,則資訊位於每個 Message<?>KafkaHeaders.LISTENER_INFO 頭中。

有關消費批次的更多資訊,請參見 批處理監聽器

釋出死信記錄

當記錄的失敗次數達到最大值時,您可以為 DefaultErrorHandlerDefaultAfterRollbackProcessor 配置一個記錄恢復器。框架提供了 DeadLetterPublishingRecoverer,它將失敗的訊息釋出到另一個主題。恢復器需要一個 KafkaTemplate<Object, Object>,用於傳送記錄。您還可以選擇配置一個 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,該函式用於解析目標主題和分割槽。

預設情況下,死信記錄被髮送到名為 <originalTopic>-dlt 的主題(原始主題名稱字尾為 -dlt),並且傳送到與原始記錄相同的分割槽。因此,當您使用預設解析器時,死信主題必須至少具有與原始主題相同數量的分割槽。

如果返回的 TopicPartition 具有負分割槽,則不會在 ProducerRecord 中設定分割槽,因此分割槽由 Kafka 選擇。從 2.2.4 版本開始,任何 ListenerExecutionFailedException(例如,當在 @KafkaListener 方法中檢測到異常時丟擲)都使用 groupId 屬性進行了增強。這允許目標解析器除了 ConsumerRecord 中的資訊外,還可以使用此資訊來選擇死信主題。

以下示例演示瞭如何連線自定義目標解析器:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

傳送到死信主題的記錄將新增以下標頭:

  • KafkaHeaders.DLT_EXCEPTION_FQCN: 異常類名(通常是 ListenerExecutionFailedException,但也可以是其他)。

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 異常原因類名,如果存在(自 2.8 版本起)。

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 異常堆疊跟蹤。

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE: 異常訊息。

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 異常類名(僅限鍵反序列化錯誤)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 異常堆疊跟蹤(僅限鍵反序列化錯誤)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 異常訊息(僅限鍵反序列化錯誤)。

  • KafkaHeaders.DLT_ORIGINAL_TOPIC: 原始主題。

  • KafkaHeaders.DLT_ORIGINAL_PARTITION: 原始分割槽。

  • KafkaHeaders.DLT_ORIGINAL_OFFSET: 原始偏移量。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 原始時間戳。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始時間戳型別。

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 原始消費者組,未能處理該記錄(自 2.8 版本起)。

鍵異常僅由 DeserializationException 引起,因此沒有 DLT_KEY_EXCEPTION_CAUSE_FQCN

有兩種機制可以新增更多標頭。

  1. 子類化恢復器並重寫 createProducerRecord() - 呼叫 super.createProducerRecord() 並新增更多標頭。

  2. 提供一個 BiFunction 來接收消費者記錄和異常,返回一個 Headers 物件;其中的標頭將被複制到最終的生產者記錄;另請參閱 管理死信記錄標頭。使用 setHeadersFunction() 設定 BiFunction

第二種實現更簡單,但第一種提供了更多可用資訊,包括已組裝的標準標頭。

從 2.3 版本開始,當與 ErrorHandlingDeserializer 結合使用時,釋出者會將死信生產者記錄中的記錄 value() 恢復為未能反序列化的原始值。以前,value() 為空,使用者程式碼必須從訊息頭中解碼 DeserializationException。此外,您可以向釋出者提供多個 KafkaTemplate;例如,如果您想釋出來自 DeserializationExceptionbyte[] 以及使用與成功反序列化記錄不同的序列化器的值,則可能需要這樣做。這是一個配置釋出者,使其具有使用 Stringbyte[] 序列化器的 KafkaTemplate 的示例:

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

釋出者使用對映鍵來查詢適合要釋出的 value() 的模板。建議使用 LinkedHashMap,以便按順序檢查鍵。

當釋出 null 值時,如果有多個模板,恢復器將查詢 Void 類的模板;如果不存在,將使用 values().iterator() 中的第一個模板。

自 2.7 版本起,您可以使用 setFailIfSendResultIsError 方法,以便在訊息釋出失敗時丟擲異常。您還可以使用 setWaitForSendResultTimeout 設定傳送成功驗證的超時時間。

如果恢復器失敗(丟擲異常),失敗的記錄將包含在定址操作中。從 2.5.5 版本開始,如果恢復器失敗,預設情況下 BackOff 將被重置,並且重新傳遞將再次透過退避操作,然後再次嘗試恢復。在早期版本中,BackOff 未被重置,並在下次失敗時重新嘗試恢復。要恢復到以前的行為,請將錯誤處理器的 resetStateOnRecoveryFailure 屬性設定為 false

從 2.6.3 版本開始,將 resetStateOnExceptionChange 設定為 true,如果異常型別在失敗之間發生變化,則重試序列將重新啟動(包括選擇一個新的 BackOff,如果已配置)。預設情況下,不考慮異常型別。

從 2.3 版本開始,恢復器也可以與 Kafka Streams 一起使用 - 有關更多資訊,請參見 從反序列化異常中恢復

ErrorHandlingDeserializer 將反序列化異常新增到頭 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER 中(使用 Java 序列化)。預設情況下,這些頭不會保留在釋出到死信主題的訊息中。從 2.7 版本開始,如果鍵和值都未能反序列化,則兩者的原始值都將填充到傳送到 DLT 的記錄中。

如果傳入的記錄相互依賴,但可能亂序到達,則將失敗的記錄重新發布到原始主題的尾部(一定次數),而不是直接傳送到死信主題可能很有用。請參見 此 Stack Overflow 問題 以獲取示例。

以下錯誤處理器配置將完全實現這一點:

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic-dlt", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

從 2.7 版本開始,恢復器會檢查目標解析器選擇的分割槽是否存在。如果分割槽不存在,則 ProducerRecord 中的分割槽將設定為 null,允許 KafkaProducer 選擇分割槽。您可以透過將 verifyPartition 屬性設定為 false 來停用此檢查。

從 3.1 版本開始,將 logRecoveryRecord 屬性設定為 true 將記錄恢復記錄和異常。

管理死信記錄頭

參考上面的 釋出死信記錄DeadLetterPublishingRecoverer 有兩個屬性用於管理已存在的頭(例如,當重新處理失敗的死信記錄時,包括使用 非阻塞重試 時)。

  • appendOriginalHeaders (預設 true)

  • stripPreviousExceptionHeaders (預設 true,自 2.8 版本起)

Apache Kafka 支援具有相同名稱的多個頭;要獲取“最新”值,可以使用 headers.lastHeader(headerName);要獲取多個頭的迭代器,可以使用 headers.headers(headerName).iterator()

當重複釋出失敗的記錄時,這些標頭可能會增長(並最終因 RecordTooLargeException 導致釋出失敗);對於異常標頭,特別是堆疊跟蹤標頭,更是如此。

這兩個屬性的原因是,雖然您可能只希望保留最新的異常資訊,但您可能希望保留記錄在每次失敗時經過哪些主題的歷史記錄。

appendOriginalHeaders 應用於所有名為 ORIGINAL 的頭,而 stripPreviousExceptionHeaders 應用於所有名為 EXCEPTION 的頭。

從 2.8.4 版本開始,您現在可以控制哪些標準頭將新增到輸出記錄中。請參閱 enum HeadersToAdd 以瞭解預設新增的(目前)10 個標準頭的通用名稱(這些不是實際的頭名稱,只是一個抽象;實際的頭名稱由 getHeaderNames() 方法設定,子類可以重寫此方法)。

要排除頭,請使用 excludeHeaders() 方法;例如,要禁止在頭中新增異常堆疊跟蹤,請使用:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以透過新增 ExceptionHeadersCreator 來完全自定義異常頭的新增;這也將停用所有標準異常頭。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

同樣從 2.8.4 版本開始,您現在可以透過 addHeadersFunction 方法提供多個頭函式。這允許應用額外的函式,即使已經註冊了另一個函式,例如,在使用 非阻塞重試 時。

另請參見 故障頭管理非阻塞重試

ExponentialBackOffWithMaxRetries 實現

Spring Framework 提供了許多 BackOff 實現。預設情況下,ExponentialBackOff 將無限期重試;要在一定次數的重試嘗試後放棄,需要計算 maxElapsedTime。自 2.7.3 版本起,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,這是一個子類,它接收 maxRetries 屬性並自動計算 maxElapsedTime,這更方便一些。

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

這將分別在 1、2、4、8、10、10 秒後重試,然後呼叫恢復器。

© . This site is unofficial and not affiliated with VMware.