Kafka 繫結器監聽器容器定製器

Spring Cloud Stream 透過使用定製器為訊息監聽器容器提供了強大的定製選項。本節介紹適用於 Kafka 的定製器介面:ListenerContainerCustomizer、其 Kafka 專用擴充套件 KafkaListenerContainerCustomizer,以及專門的 ListenerContainerWithDlqAndRetryCustomizer

ListenerContainerCustomizer

ListenerContainerCustomizer 是 Spring Cloud Stream 中的一個通用介面,允許對訊息監聽器容器進行定製。

用途

當您需要修改監聽器容器的行為時,請使用此定製器。

用法

要使用 ListenerContainerCustomizer,請在您的配置中建立一個實現此介面的 bean

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
    return (container, destinationName, group) -> {
        // Customize the container here
    };
}

ListenerContainerCustomizer 介面定義了以下方法

void configure(C container, String destinationName, String group);
  • container:要定製的訊息監聽器容器。

  • destinationName:目標(主題)的名稱。

  • group:消費者組 ID。

KafkaListenerContainerCustomizer

KafkaListenerContainerCustomizer 介面擴充套件了 ListenerContainerCustomizer,用於修改監聽器容器的行為,並提供對繫結特定擴充套件 Kafka 消費者屬性的訪問。

用途

當您需要在定製監聽器容器時訪問繫結特定的擴充套件 Kafka 消費者屬性時,請使用此定製器。

用法

要使用 KafkaListenerContainerCustomizer,請在您的配置中建立一個實現此介面的 bean

@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
    return (container, destinationName, group, properties) -> {
        // Customize the Kafka container here
    };
}

KafkaListenerContainerCustomizer 介面添加了以下方法

default void configureKafkaListenerContainer(
    C container,
    String destinationName,
    String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        configure(container, destinationName, group);
}

此方法使用一個額外的引數擴充套件了基類 configure 方法

  • extendedConsumerProperties:擴充套件的消費者屬性,包括 Kafka 特定的屬性。

ListenerContainerWithDlqAndRetryCustomizer

ListenerContainerWithDlqAndRetryCustomizer 介面為涉及死信佇列 (DLQ) 和重試機制的場景提供了額外的定製選項。

用途

當您需要微調 DLQ 行為或為您的 Kafka 消費者實現自定義重試邏輯時,請使用此定製器。

用法

要使用 ListenerContainerWithDlqAndRetryCustomizer,請在您的配置中建立一個實現此介面的 bean

@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
    return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
        // Access the container here with access to the extended consumer binding properties.
    };
}

ListenerContainerWithDlqAndRetryCustomizer 介面定義了以下方法

void configure(
    AbstractMessageListenerContainer<?, ?> container,
    String destinationName,
    String group,
    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
    BackOff backOff,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
  • container:要定製的 Kafka 監聽器容器。

  • destinationName:目標(主題)的名稱。

  • group:消費者組 ID。

  • dlqDestinationResolver:用於解析失敗記錄的 DLQ 目標的函式。

  • backOff:重試的退避策略。

  • extendedConsumerProperties:擴充套件的消費者屬性,包括 Kafka 特定的屬性。

總結

  • 如果啟用了 DLQ,則使用 ListenerContainerWithDlqAndRetryCustomizer

  • KafkaListenerContainerCustomizer 用於沒有 DLQ 的 Kafka 特定定製。

  • 基類 ListenerContainerCustomizer 用於通用定製。

這種分層方法允許在 Spring Cloud Stream 應用程式中靈活且特定地定製您的 Kafka 監聽器容器。

© . This site is unofficial and not affiliated with VMware.