監控

監控監聽器效能

從 2.3 版本開始,如果在 classpath 中檢測到 Micrometer 且應用程式上下文中存在單個 MeterRegistry,則監聽器容器將自動為監聽器建立和更新 Micrometer Timer。可以透過將 ContainerPropertymicrometerEnabled 設定為 false 來停用計時器。

維護兩個計時器 - 一個用於監聽器成功呼叫,一個用於失敗呼叫。

計時器命名為 spring.kafka.listener 並具有以下標籤

  • name : (容器 bean 名稱)

  • result : successfailure

  • exception : noneListenerExecutionFailedException

可以使用 ContainerPropertiesmicrometerTags 屬性新增額外的標籤。

從 2.9.8、3.0.6 版本開始,您可以在 ContainerPropertiesmicrometerTagsProvider 中提供一個函式;該函式接收 ConsumerRecord<?, ?> 並返回可以基於該記錄生成的標籤,這些標籤將與 micrometerTags 中的任何靜態標籤合併。

對於併發容器,會為每個執行緒建立計時器,並且 name 標籤會附加 -n 字尾,其中 n 是 0concurrency-1

監控 KafkaTemplate 效能

從 2.5 版本開始,如果在 classpath 中檢測到 Micrometer 且應用程式上下文中存在單個 MeterRegistry,則模板將自動為傳送操作建立和更新 Micrometer Timer。可以透過將模板的 micrometerEnabled 屬性設定為 false 來停用計時器。

維護兩個計時器 - 一個用於監聽器成功呼叫,一個用於失敗呼叫。

計時器命名為 spring.kafka.template 並具有以下標籤

  • name : (模板 bean 名稱)

  • result : successfailure

  • exception : none 或失敗時的異常類名

可以使用模板的 micrometerTags 屬性新增額外的標籤。

從 2.9.8、3.0.6 版本開始,您可以提供一個 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) 屬性;該函式接收 ProducerRecord<?, ?> 並返回可以基於該記錄生成的標籤,這些標籤將與 micrometerTags 中的任何靜態標籤合併。

Micrometer 原生指標

從 2.5 版本開始,該框架提供 Factory Listeners,用於在生產者和消費者建立和關閉時管理 Micrometer KafkaClientMetrics 例項。

要啟用此功能,只需將監聽器新增到您的生產者和消費者工廠中

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

傳遞給監聽器的 consumer/producer id 會以標籤名 spring.id 新增到指標的標籤中。

獲取一個 Kafka 指標的示例
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

StreamsBuilderFactoryBean 提供了類似的監聽器 - 請參閱 KafkaStreams Micrometer 支援

從 3.3 版本開始,引入了一個 KafkaMetricsSupport 抽象類,用於管理 io.micrometer.core.instrument.binder.kafka.KafkaMetrics 與提供的 Kafka 客戶端的 MeterRegistry 的繫結。這個類是上面提到的 MicrometerConsumerListenerMicrometerProducerListenerKafkaStreamsMicrometerListener 的超類。然而,它可用於任何 Kafka 客戶端用例。該類需要擴充套件,並且必須呼叫其 bindClient()unbindClient() API 來將 Kafka 客戶端指標連線到 Micrometer 收集器。

Micrometer Observation

從 3.0 版本開始,現在支援將 Micrometer 用於 observation,適用於 KafkaTemplate 和監聽器容器。

KafkaTemplateContainerProperties 上將 observationEnabled 設定為 true 以啟用 observation;這將停用 Micrometer Timers,因為現在計時器將透過每次 observation 進行管理。

Micrometer Observation 不支援批次監聽器;這將啟用 Micrometer Timers

請參閱 Micrometer Tracing 以獲取更多資訊。

要向計時器/跟蹤新增標籤,請分別為模板或監聽器容器配置一個自定義的 KafkaTemplateObservationConventionKafkaListenerObservationConvention

預設實現為模板 observations 新增 bean.name 標籤,為容器新增 listener.id 標籤。

您可以繼承 DefaultKafkaTemplateObservationConventionDefaultKafkaListenerObservationConvention,也可以提供全新的實現。

請參閱 Micrometer Observation 文件,瞭解記錄的預設 observations 詳情。

從 3.0.6 版本開始,您可以根據 consumer 或 producer 記錄中的資訊向計時器和跟蹤新增動態標籤。為此,請分別為監聽器容器屬性或 KafkaTemplate 新增一個自定義的 KafkaListenerObservationConvention 和/或 KafkaTemplateObservationConvention。兩個 observation 上下文中的 record 屬性分別包含 ConsumerRecordProducerRecord

傳送方和接收方上下文的 remoteServiceName 屬性設定為 Kafka clusterId 屬性;這由 KafkaAdmin 獲取。如果由於某種原因(可能缺乏管理許可權)無法獲取 cluster id,從 3.1 版本開始,您可以在 KafkaAdmin 上設定一個手動 clusterId,並將其注入到 KafkaTemplate 和監聽器容器中。當其為 null(預設值)時,admin 將呼叫 describeCluster 管理操作從 broker 獲取它。