Kafka Binder 監聽器容器定製器

Spring Cloud Stream 透過使用定製器 (customizers) 為訊息監聽器容器提供了強大的定製選項。本節涵蓋了適用於 Kafka 的定製器介面:ListenerContainerCustomizer、其 Kafka 特定的擴充套件 KafkaListenerContainerCustomizer 以及專門的 ListenerContainerWithDlqAndRetryCustomizer

ListenerContainerCustomizer

ListenerContainerCustomizer 是 Spring Cloud Stream 中一個通用的介面,允許定製訊息監聽器容器。

目的

當你需要修改監聽器容器的行為時,使用此定製器。

用法

要使用 ListenerContainerCustomizer,請在你的配置中建立一個實現此介面的 bean

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
    return (container, destinationName, group) -> {
        // Customize the container here
    };
}

ListenerContainerCustomizer 介面定義了以下方法

void configure(C container, String destinationName, String group);
  • container: 要定製的訊息監聽器容器。

  • destinationName: 目標名稱 (主題)。

  • group: 消費者組 ID。

KafkaListenerContainerCustomizer

KafkaListenerContainerCustomizer 介面擴充套件了 ListenerContainerCustomizer,用於修改監聽器容器的行為,並提供對繫結特定的擴充套件 Kafka 消費者屬性的訪問。

目的

在定製監聽器容器時,當你需要訪問繫結特定的擴充套件 Kafka 消費者屬性時,使用此定製器。

用法

要使用 KafkaListenerContainerCustomizer,請在你的配置中建立一個實現此介面的 bean

@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
    return (container, destinationName, group, properties) -> {
        // Customize the Kafka container here
    };
}

KafkaListenerContainerCustomizer 介面添加了以下方法

default void configureKafkaListenerContainer(
    C container,
    String destinationName,
    String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        configure(container, destinationName, group);
}

此方法擴充套件了基礎的 configure 方法,增加了一個附加引數

  • extendedConsumerProperties: 擴充套件的消費者屬性,包括 Kafka 特定的屬性。

ListenerContainerWithDlqAndRetryCustomizer

ListenerContainerWithDlqAndRetryCustomizer 介面為涉及死信佇列 (DLQ) 和重試機制的場景提供了額外的定製選項。

目的

當你需要微調 DLQ 行為或為你的 Kafka 消費者實現自定義重試邏輯時,使用此定製器。

用法

要使用 ListenerContainerWithDlqAndRetryCustomizer,請在你的配置中建立一個實現此介面的 bean

@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
    return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
        // Access the container here with access to the extended consumer binding properties.
    };
}

ListenerContainerWithDlqAndRetryCustomizer 介面定義了以下方法

void configure(
    AbstractMessageListenerContainer<?, ?> container,
    String destinationName,
    String group,
    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
    BackOff backOff,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
  • container: 要定製的 Kafka 監聽器容器。

  • destinationName: 目標名稱 (主題)。

  • group: 消費者組 ID。

  • dlqDestinationResolver: 一個用於解析失敗記錄的 DLQ 目標的函式。

  • backOff: 重試的退避策略。

  • extendedConsumerProperties: 擴充套件的消費者屬性,包括 Kafka 特定的屬性。

總結

  • 如果啟用了 DLQ,則使用 ListenerContainerWithDlqAndRetryCustomizer

  • KafkaListenerContainerCustomizer 用於不涉及 DLQ 的 Kafka 特定定製。

  • 基礎的 ListenerContainerCustomizer 用於通用定製。

這種分層方法允許在 Spring Cloud Stream 應用中對 Kafka 監聽器容器進行靈活且特定的定製。