連線到 Kafka

從版本 2.5 開始,上述每個類都擴充套件了 KafkaResourceFactory。這允許透過向其配置新增 Supplier<String> 來在執行時更改引導伺服器:setBootstrapServersSupplier(() -> …​)。這將為所有新連線呼叫以獲取伺服器列表。消費者和生產者通常是長期執行的。要關閉現有生產者,請在 DefaultKafkaProducerFactory 上呼叫 reset()。要關閉現有消費者,請在 KafkaListenerEndpointRegistry 上呼叫 stop()(然後是 start()),和/或在任何其他監聽器容器 Bean 上呼叫 stop()start()

為方便起見,框架還提供了一個 ABSwitchCluster,它支援兩組引導伺服器;其中一組在任何給定時間都是活動的。配置 ABSwitchCluster 並透過呼叫 setBootstrapServersSupplier() 將其新增到生產者和消費者工廠以及 KafkaAdmin。當您想要切換時,呼叫 primary()secondary() 並在生產者工廠上呼叫 reset() 以建立新的連線;對於消費者,則 stop()start() 所有監聽器容器。使用 @KafkaListener 時,stop()start() KafkaListenerEndpointRegistry Bean。

有關更多資訊,請參見 Javadoc。

工廠監聽器

從版本 2.5 開始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 可以配置一個 Listener,以便在建立或關閉生產者或消費者時接收通知。

生產者工廠監聽器
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
消費者工廠監聽器
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每種情況下,id 都是透過將 client-id 屬性(在建立後從 metrics() 獲取)附加到工廠的 beanName 屬性,並以 . 分隔來建立的。

這些監聽器可以用於,例如,在建立新客戶端時建立並繫結一個 Micrometer KafkaClientMetrics 例項(並在客戶端關閉時關閉它)。

該框架提供了完全實現此功能的監聽器;參見 Micrometer 原生指標

預設客戶端 ID 字首

從版本 3.2 開始,對於使用 spring.application.name 屬性定義應用程式名稱的 Spring Boot 應用程式,此名稱現在用作這些客戶端型別自動生成客戶端 ID 的預設字首:

  • 不使用消費者組的消費者客戶端

  • 生產者客戶端

  • 管理客戶端

這使得在伺服器端識別這些客戶端進行故障排除或應用配額變得更加容易。

表 1. spring.application.name=myapp 的 Spring Boot 應用程式的示例客戶端 ID
客戶端型別 無應用程式名稱 有應用程式名稱

無消費者組的消費者

consumer-null-1

myapp-consumer-1

帶消費者組 "mygroup" 的消費者

consumer-mygroup-1

consumer-mygroup-1

生產者

producer-1

myapp-producer-1

管理

adminclient-1

myapp-admin-1

© . This site is unofficial and not affiliated with VMware.