Kafka Binder 監聽器容器定製器
Spring Cloud Stream 透過使用定製器 (customizers) 為訊息監聽器容器提供了強大的定製選項。本節涵蓋了適用於 Kafka 的定製器介面:ListenerContainerCustomizer
、其 Kafka 特定的擴充套件 KafkaListenerContainerCustomizer
以及專門的 ListenerContainerWithDlqAndRetryCustomizer
。
ListenerContainerCustomizer
ListenerContainerCustomizer
是 Spring Cloud Stream 中一個通用的介面,允許定製訊息監聽器容器。
用法
要使用 ListenerContainerCustomizer
,請在你的配置中建立一個實現此介面的 bean
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
return (container, destinationName, group) -> {
// Customize the container here
};
}
ListenerContainerCustomizer
介面定義了以下方法
void configure(C container, String destinationName, String group);
-
container
: 要定製的訊息監聽器容器。 -
destinationName
: 目標名稱 (主題)。 -
group
: 消費者組 ID。
KafkaListenerContainerCustomizer
KafkaListenerContainerCustomizer
介面擴充套件了 ListenerContainerCustomizer
,用於修改監聽器容器的行為,並提供對繫結特定的擴充套件 Kafka 消費者屬性的訪問。
用法
要使用 KafkaListenerContainerCustomizer
,請在你的配置中建立一個實現此介面的 bean
@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
return (container, destinationName, group, properties) -> {
// Customize the Kafka container here
};
}
KafkaListenerContainerCustomizer
介面添加了以下方法
default void configureKafkaListenerContainer(
C container,
String destinationName,
String group,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
configure(container, destinationName, group);
}
此方法擴充套件了基礎的 configure
方法,增加了一個附加引數
-
extendedConsumerProperties
: 擴充套件的消費者屬性,包括 Kafka 特定的屬性。
ListenerContainerWithDlqAndRetryCustomizer
ListenerContainerWithDlqAndRetryCustomizer
介面為涉及死信佇列 (DLQ) 和重試機制的場景提供了額外的定製選項。
用法
要使用 ListenerContainerWithDlqAndRetryCustomizer
,請在你的配置中建立一個實現此介面的 bean
@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
// Access the container here with access to the extended consumer binding properties.
};
}
ListenerContainerWithDlqAndRetryCustomizer
介面定義了以下方法
void configure(
AbstractMessageListenerContainer<?, ?> container,
String destinationName,
String group,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
BackOff backOff,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
-
container
: 要定製的 Kafka 監聽器容器。 -
destinationName
: 目標名稱 (主題)。 -
group
: 消費者組 ID。 -
dlqDestinationResolver
: 一個用於解析失敗記錄的 DLQ 目標的函式。 -
backOff
: 重試的退避策略。 -
extendedConsumerProperties
: 擴充套件的消費者屬性,包括 Kafka 特定的屬性。