監控
監控監聽器效能
從 2.3 版本開始,如果類路徑中檢測到 `Micrometer` 且應用程式上下文中存在單個 `MeterRegistry`,監聽器容器將自動為監聽器建立並更新 Micrometer `Timer`。可以透過將 `ContainerProperty` 的 `micrometerEnabled` 設定為 `false` 來停用計時器。
維護兩個計時器 - 一個用於成功呼叫監聽器,一個用於失敗。
計時器名為 `spring.kafka.listener`,並具有以下標籤:
-
name: (容器 Bean 名稱) -
result:success或failure -
exception:none或ListenerExecutionFailedException
您可以使用 `ContainerProperties` 的 `micrometerTags` 屬性新增額外標籤。
從 2.9.8、3.0.6 版本開始,您可以在 `ContainerProperties` 的 `micrometerTagsProvider` 中提供一個函式;該函式接收 `ConsumerRecord<?, ?>` 並返回可以基於該記錄的標籤,並與 `micrometerTags` 中的任何靜態標籤合併。
| 對於併發容器,會為每個執行緒建立計時器,並且 `name` 標籤字尾為 `-n`,其中 n 為 `0` 到 `concurrency-1`。 |
監控 KafkaTemplate 效能
從 2.5 版本開始,如果類路徑中檢測到 `Micrometer` 且應用程式上下文中存在單個 `MeterRegistry`,模板將自動為傳送操作建立並更新 Micrometer `Timer`。可以透過將模板的 `micrometerEnabled` 屬性設定為 `false` 來停用計時器。
維護兩個計時器 - 一個用於成功呼叫監聽器,一個用於失敗。
計時器名為 `spring.kafka.template`,並具有以下標籤:
-
name: (模板 Bean 名稱) -
result:success或failure -
exception:none或失敗的異常類名
您可以使用模板的 `micrometerTags` 屬性新增額外標籤。
從 2.9.8、3.0.6 版本開始,您可以提供一個 `KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)` 屬性;該函式接收 `ProducerRecord<?, ?>` 並返回可以基於該記錄的標籤,並與 `micrometerTags` 中的任何靜態標籤合併。
Micrometer 原生指標
從 2.5 版本開始,該框架提供了 工廠監聽器,用於在生產者和消費者建立和關閉時管理 Micrometer `KafkaClientMetrics` 例項。
要啟用此功能,只需將監聽器新增到您的生產者和消費者工廠中。
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
傳遞給監聽器的消費者/生產者 `id` 會新增到指標的標籤中,標籤名為 `spring.id`。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
為 `StreamsBuilderFactoryBean` 提供了類似的監聽器 - 請參閱 KafkaStreams Micrometer 支援。
從 3.3 版本開始,引入了一個 `KafkaMetricsSupport` 抽象類來管理 `io.micrometer.core.instrument.binder.kafka.KafkaMetrics` 繫結到所提供的 Kafka 客戶端的 `MeterRegistry` 中。這個類是上述 `MicrometerConsumerListener`、`MicrometerProducerListener` 和 `KafkaStreamsMicrometerListener` 的父類。但是,它可以用於任何 Kafka 客戶端用例。該類需要被擴充套件,並且必須呼叫其 `bindClient()` 和 `unbindClient()` API 來將 Kafka 客戶端指標與 Micrometer 收集器連線起來。
Micrometer 觀測
自 3.0 版本起,`KafkaTemplate` 和監聽器容器現在支援使用 Micrometer 進行觀測。
在 `KafkaTemplate` 和 `ContainerProperties` 上將 `observationEnabled` 設定為 `true` 以啟用觀測;這將停用 Micrometer 計時器,因為計時器現在將隨每個觀測進行管理。
| Micrometer Observation 不支援批處理監聽器;這將啟用 Micrometer Timers。 |
有關更多資訊,請參閱 Micrometer Tracing。
要向計時器/跟蹤新增標籤,請分別配置自定義的 `KafkaTemplateObservationConvention` 或 `KafkaListenerObservationConvention` 到模板或監聽器容器。
預設實現為模板觀測新增 `bean.name` 標籤,為容器新增 `listener.id` 標籤。
您可以繼承 `DefaultKafkaTemplateObservationConvention` 或 `DefaultKafkaListenerObservationConvention`,或提供全新的實現。
有關記錄的預設觀測的詳細資訊,請參閱 Micrometer 觀測文件。
從 3.0.6 版本開始,您可以根據消費者或生產者記錄中的資訊,為計時器和跟蹤新增動態標籤。為此,請分別將自定義的 `KafkaListenerObservationConvention` 和/或 `KafkaTemplateObservationConvention` 新增到監聽器容器屬性或 `KafkaTemplate` 中。兩個觀測上下文中的 `record` 屬性分別包含 `ConsumerRecord` 或 `ProducerRecord`。
傳送方和接收方上下文的 `remoteServiceName` 屬性設定為 Kafka `clusterId` 屬性;此屬性由 `KafkaAdmin` 檢索。如果由於某種原因(例如缺少管理許可權)您無法檢索叢集 ID,從 3.1 版本開始,您可以在 `KafkaAdmin` 上手動設定 `clusterId`,並將其注入到 `KafkaTemplate` 和監聽器容器中。當其為 `null`(預設值)時,admin 將呼叫 `describeCluster` 管理操作從 Broker 檢索它。
批次監聽器觀測
使用批處理監聽器時,即使存在 `ObservationRegistry`,預設情況下也不會建立觀測。這是因為觀測的範圍與執行緒繫結,而對於批處理監聽器,觀測與記錄之間沒有一對一的對映。
要在批處理監聽器中啟用每條記錄的觀測,請將容器工廠屬性 `recordObservationsInBatch` 設定為 `true`。
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setRecordObservationsInBatch(true);
return factory;
}
當此屬性為 `true` 時,將為批處理中的每條記錄建立一個觀測,但該觀測不會傳播到監聽器方法。然後,應用程式可以使用觀測上下文來跟蹤批處理中每條記錄的處理。這使您可以在批處理上下文中,也能檢視每條記錄的處理情況。