監控
監控監聽器效能
從 2.3 版本開始,如果在 classpath 中檢測到 Micrometer 且應用程式上下文中存在單個 MeterRegistry,則監聽器容器將自動為監聽器建立和更新 Micrometer Timer
。可以透過將 ContainerProperty
的 micrometerEnabled
設定為 false
來停用計時器。
維護兩個計時器 - 一個用於監聽器成功呼叫,一個用於失敗呼叫。
計時器命名為 spring.kafka.listener
並具有以下標籤
-
name
: (容器 bean 名稱) -
result
:success
或failure
-
exception
:none
或ListenerExecutionFailedException
可以使用 ContainerProperties
的 micrometerTags
屬性新增額外的標籤。
從 2.9.8、3.0.6 版本開始,您可以在 ContainerProperties
的 micrometerTagsProvider
中提供一個函式;該函式接收 ConsumerRecord<?, ?>
並返回可以基於該記錄生成的標籤,這些標籤將與 micrometerTags
中的任何靜態標籤合併。
對於併發容器,會為每個執行緒建立計時器,並且 name 標籤會附加 -n 字尾,其中 n 是 0 到 concurrency-1 。 |
監控 KafkaTemplate 效能
從 2.5 版本開始,如果在 classpath 中檢測到 Micrometer 且應用程式上下文中存在單個 MeterRegistry,則模板將自動為傳送操作建立和更新 Micrometer Timer
。可以透過將模板的 micrometerEnabled
屬性設定為 false
來停用計時器。
維護兩個計時器 - 一個用於監聽器成功呼叫,一個用於失敗呼叫。
計時器命名為 spring.kafka.template
並具有以下標籤
-
name
: (模板 bean 名稱) -
result
:success
或failure
-
exception
:none
或失敗時的異常類名
可以使用模板的 micrometerTags
屬性新增額外的標籤。
從 2.9.8、3.0.6 版本開始,您可以提供一個 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)
屬性;該函式接收 ProducerRecord<?, ?>
並返回可以基於該記錄生成的標籤,這些標籤將與 micrometerTags
中的任何靜態標籤合併。
Micrometer 原生指標
從 2.5 版本開始,該框架提供 Factory Listeners,用於在生產者和消費者建立和關閉時管理 Micrometer KafkaClientMetrics
例項。
要啟用此功能,只需將監聽器新增到您的生產者和消費者工廠中
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
傳遞給監聽器的 consumer/producer id
會以標籤名 spring.id
新增到指標的標籤中。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
為 StreamsBuilderFactoryBean
提供了類似的監聽器 - 請參閱 KafkaStreams Micrometer 支援。
從 3.3 版本開始,引入了一個 KafkaMetricsSupport
抽象類,用於管理 io.micrometer.core.instrument.binder.kafka.KafkaMetrics
與提供的 Kafka 客戶端的 MeterRegistry
的繫結。這個類是上面提到的 MicrometerConsumerListener
、MicrometerProducerListener
和 KafkaStreamsMicrometerListener
的超類。然而,它可用於任何 Kafka 客戶端用例。該類需要擴充套件,並且必須呼叫其 bindClient()
和 unbindClient()
API 來將 Kafka 客戶端指標連線到 Micrometer 收集器。
Micrometer Observation
從 3.0 版本開始,現在支援將 Micrometer 用於 observation,適用於 KafkaTemplate
和監聽器容器。
在 KafkaTemplate
和 ContainerProperties
上將 observationEnabled
設定為 true
以啟用 observation;這將停用 Micrometer Timers,因為現在計時器將透過每次 observation 進行管理。
Micrometer Observation 不支援批次監聽器;這將啟用 Micrometer Timers |
請參閱 Micrometer Tracing 以獲取更多資訊。
要向計時器/跟蹤新增標籤,請分別為模板或監聽器容器配置一個自定義的 KafkaTemplateObservationConvention
或 KafkaListenerObservationConvention
。
預設實現為模板 observations 新增 bean.name
標籤,為容器新增 listener.id
標籤。
您可以繼承 DefaultKafkaTemplateObservationConvention
或 DefaultKafkaListenerObservationConvention
,也可以提供全新的實現。
請參閱 Micrometer Observation 文件,瞭解記錄的預設 observations 詳情。
從 3.0.6 版本開始,您可以根據 consumer 或 producer 記錄中的資訊向計時器和跟蹤新增動態標籤。為此,請分別為監聽器容器屬性或 KafkaTemplate
新增一個自定義的 KafkaListenerObservationConvention
和/或 KafkaTemplateObservationConvention
。兩個 observation 上下文中的 record
屬性分別包含 ConsumerRecord
或 ProducerRecord
。
傳送方和接收方上下文的 remoteServiceName
屬性設定為 Kafka clusterId
屬性;這由 KafkaAdmin
獲取。如果由於某種原因(可能缺乏管理許可權)無法獲取 cluster id,從 3.1 版本開始,您可以在 KafkaAdmin
上設定一個手動 clusterId
,並將其注入到 KafkaTemplate
和監聽器容器中。當其為 null
(預設值)時,admin 將呼叫 describeCluster
管理操作從 broker 獲取它。