連線到 Kafka

從版本 2.5 開始,這些都擴充套件了 KafkaResourceFactory。這允許透過在配置中新增 Supplier<String> 來在執行時更改 bootstrap servers:setBootstrapServersSupplier(() -> …​)。對於所有新連線,都將呼叫此方法以獲取伺服器列表。消費者和生產者通常是長期存活的。要關閉現有生產者,請在 DefaultKafkaProducerFactory 上呼叫 reset()。要關閉現有消費者,請在 KafkaListenerEndpointRegistry 上呼叫 stop()(然後 start()),或者在任何其他監聽器容器 bean 上呼叫 stop()start()

為方便起見,框架還提供了 ABSwitchCluster,它支援兩組 bootstrap servers,其中一組在任何時候都是活躍的。透過呼叫 setBootstrapServersSupplier() 配置 ABSwitchCluster 並將其新增到生產者和消費者工廠以及 KafkaAdmin 中。當需要切換時,呼叫 primary()secondary() 並在生產者工廠上呼叫 reset() 以建立新連線;對於消費者,停止 (stop()) 並啟動 (start()) 所有監聽器容器。使用 @KafkaListener 時,停止 (stop()) 並啟動 (start()) KafkaListenerEndpointRegistry bean。

更多資訊請參見 Javadocs。

工廠監聽器

從版本 2.5 開始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 可以配置一個 Listener,以便在建立或關閉生產者或消費者時接收通知。

生產者工廠監聽器
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
消費者工廠監聽器
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每種情況下,id 都是透過將 client-id 屬性(在建立後透過 metrics() 獲取)附加到工廠 beanName 屬性後面建立的,中間用 . 分隔。

這些監聽器可以用於,例如,在建立新客戶端時建立並繫結一個 Micrometer KafkaClientMetrics 例項(並在客戶端關閉時關閉它)。

框架提供了完全執行此操作的監聽器;參見 Micrometer 原生指標

預設客戶端 ID 字首

從版本 3.2 開始,對於使用 spring.application.name 屬性定義了應用名稱的 Spring Boot 應用,該名稱現在將用作這些客戶端型別自動生成客戶端 ID 的預設字首

  • 不使用消費者組的消費者客戶端

  • 生產者客戶端

  • 管理客戶端

這使得在伺服器端更容易識別這些客戶端,以便進行故障排除或應用配額。

表 1. spring.application.name=myapp 的 Spring Boot 應用生成的示例客戶端 ID
客戶端型別 無應用名稱 有應用名稱

無消費者組的消費者

consumer-null-1

myapp-consumer-1

使用消費者組 "mygroup" 的消費者

consumer-mygroup-1

consumer-mygroup-1

生產者

producer-1

myapp-producer-1

管理端

adminclient-1

myapp-admin-1