連線到 Kafka
從版本 2.5 開始,這些都擴充套件了 KafkaResourceFactory
。這允許透過在配置中新增 Supplier<String>
來在執行時更改 bootstrap servers:setBootstrapServersSupplier(() -> …)
。對於所有新連線,都將呼叫此方法以獲取伺服器列表。消費者和生產者通常是長期存活的。要關閉現有生產者,請在 DefaultKafkaProducerFactory
上呼叫 reset()
。要關閉現有消費者,請在 KafkaListenerEndpointRegistry
上呼叫 stop()
(然後 start()
),或者在任何其他監聽器容器 bean 上呼叫 stop()
和 start()
。
為方便起見,框架還提供了 ABSwitchCluster
,它支援兩組 bootstrap servers,其中一組在任何時候都是活躍的。透過呼叫 setBootstrapServersSupplier()
配置 ABSwitchCluster
並將其新增到生產者和消費者工廠以及 KafkaAdmin
中。當需要切換時,呼叫 primary()
或 secondary()
並在生產者工廠上呼叫 reset()
以建立新連線;對於消費者,停止 (stop()
) 並啟動 (start()
) 所有監聽器容器。使用 @KafkaListener
時,停止 (stop()
) 並啟動 (start()
) KafkaListenerEndpointRegistry
bean。
更多資訊請參見 Javadocs。
工廠監聽器
從版本 2.5 開始,DefaultKafkaProducerFactory
和 DefaultKafkaConsumerFactory
可以配置一個 Listener
,以便在建立或關閉生產者或消費者時接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每種情況下,id
都是透過將 client-id
屬性(在建立後透過 metrics()
獲取)附加到工廠 beanName
屬性後面建立的,中間用 .
分隔。
這些監聽器可以用於,例如,在建立新客戶端時建立並繫結一個 Micrometer KafkaClientMetrics
例項(並在客戶端關閉時關閉它)。
框架提供了完全執行此操作的監聽器;參見 Micrometer 原生指標。
預設客戶端 ID 字首
從版本 3.2 開始,對於使用 spring.application.name
屬性定義了應用名稱的 Spring Boot 應用,該名稱現在將用作這些客戶端型別自動生成客戶端 ID 的預設字首
-
不使用消費者組的消費者客戶端
-
生產者客戶端
-
管理客戶端
這使得在伺服器端更容易識別這些客戶端,以便進行故障排除或應用配額。
客戶端型別 | 無應用名稱 | 有應用名稱 |
---|---|---|
無消費者組的消費者 |
consumer-null-1 |
myapp-consumer-1 |
使用消費者組 "mygroup" 的消費者 |
consumer-mygroup-1 |
consumer-mygroup-1 |
生產者 |
producer-1 |
myapp-producer-1 |
管理端 |
adminclient-1 |
myapp-admin-1 |