監控

監控監聽器效能

從 2.3 版本開始,如果類路徑中檢測到 `Micrometer` 且應用程式上下文中存在單個 `MeterRegistry`,監聽器容器將自動為監聽器建立並更新 Micrometer `Timer`。可以透過將 `ContainerProperty` 的 `micrometerEnabled` 設定為 `false` 來停用計時器。

維護兩個計時器 - 一個用於成功呼叫監聽器,一個用於失敗。

計時器名為 `spring.kafka.listener`,並具有以下標籤:

  • name : (容器 Bean 名稱)

  • result : successfailure

  • exception : noneListenerExecutionFailedException

您可以使用 `ContainerProperties` 的 `micrometerTags` 屬性新增額外標籤。

從 2.9.8、3.0.6 版本開始,您可以在 `ContainerProperties` 的 `micrometerTagsProvider` 中提供一個函式;該函式接收 `ConsumerRecord<?, ?>` 並返回可以基於該記錄的標籤,並與 `micrometerTags` 中的任何靜態標籤合併。

對於併發容器,會為每個執行緒建立計時器,並且 `name` 標籤字尾為 `-n`,其中 n 為 `0` 到 `concurrency-1`。

監控 KafkaTemplate 效能

從 2.5 版本開始,如果類路徑中檢測到 `Micrometer` 且應用程式上下文中存在單個 `MeterRegistry`,模板將自動為傳送操作建立並更新 Micrometer `Timer`。可以透過將模板的 `micrometerEnabled` 屬性設定為 `false` 來停用計時器。

維護兩個計時器 - 一個用於成功呼叫監聽器,一個用於失敗。

計時器名為 `spring.kafka.template`,並具有以下標籤:

  • name : (模板 Bean 名稱)

  • result : successfailure

  • exception : none 或失敗的異常類名

您可以使用模板的 `micrometerTags` 屬性新增額外標籤。

從 2.9.8、3.0.6 版本開始,您可以提供一個 `KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)` 屬性;該函式接收 `ProducerRecord<?, ?>` 並返回可以基於該記錄的標籤,並與 `micrometerTags` 中的任何靜態標籤合併。

Micrometer 原生指標

從 2.5 版本開始,該框架提供了 工廠監聽器,用於在生產者和消費者建立和關閉時管理 Micrometer `KafkaClientMetrics` 例項。

要啟用此功能,只需將監聽器新增到您的生產者和消費者工廠中。

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

傳遞給監聽器的消費者/生產者 `id` 會新增到指標的標籤中,標籤名為 `spring.id`。

獲取其中一個 Kafka 指標的示例:
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

為 `StreamsBuilderFactoryBean` 提供了類似的監聽器 - 請參閱 KafkaStreams Micrometer 支援

從 3.3 版本開始,引入了一個 `KafkaMetricsSupport` 抽象類來管理 `io.micrometer.core.instrument.binder.kafka.KafkaMetrics` 繫結到所提供的 Kafka 客戶端的 `MeterRegistry` 中。這個類是上述 `MicrometerConsumerListener`、`MicrometerProducerListener` 和 `KafkaStreamsMicrometerListener` 的父類。但是,它可以用於任何 Kafka 客戶端用例。該類需要被擴充套件,並且必須呼叫其 `bindClient()` 和 `unbindClient()` API 來將 Kafka 客戶端指標與 Micrometer 收集器連線起來。

Micrometer 觀測

自 3.0 版本起,`KafkaTemplate` 和監聽器容器現在支援使用 Micrometer 進行觀測。

在 `KafkaTemplate` 和 `ContainerProperties` 上將 `observationEnabled` 設定為 `true` 以啟用觀測;這將停用 Micrometer 計時器,因為計時器現在將隨每個觀測進行管理。

Micrometer Observation 不支援批處理監聽器;這將啟用 Micrometer Timers。

有關更多資訊,請參閱 Micrometer Tracing

要向計時器/跟蹤新增標籤,請分別配置自定義的 `KafkaTemplateObservationConvention` 或 `KafkaListenerObservationConvention` 到模板或監聽器容器。

預設實現為模板觀測新增 `bean.name` 標籤,為容器新增 `listener.id` 標籤。

您可以繼承 `DefaultKafkaTemplateObservationConvention` 或 `DefaultKafkaListenerObservationConvention`,或提供全新的實現。

有關記錄的預設觀測的詳細資訊,請參閱 Micrometer 觀測文件

從 3.0.6 版本開始,您可以根據消費者或生產者記錄中的資訊,為計時器和跟蹤新增動態標籤。為此,請分別將自定義的 `KafkaListenerObservationConvention` 和/或 `KafkaTemplateObservationConvention` 新增到監聽器容器屬性或 `KafkaTemplate` 中。兩個觀測上下文中的 `record` 屬性分別包含 `ConsumerRecord` 或 `ProducerRecord`。

傳送方和接收方上下文的 `remoteServiceName` 屬性設定為 Kafka `clusterId` 屬性;此屬性由 `KafkaAdmin` 檢索。如果由於某種原因(例如缺少管理許可權)您無法檢索叢集 ID,從 3.1 版本開始,您可以在 `KafkaAdmin` 上手動設定 `clusterId`,並將其注入到 `KafkaTemplate` 和監聽器容器中。當其為 `null`(預設值)時,admin 將呼叫 `describeCluster` 管理操作從 Broker 檢索它。

批次監聽器觀測

使用批處理監聽器時,即使存在 `ObservationRegistry`,預設情況下也不會建立觀測。這是因為觀測的範圍與執行緒繫結,而對於批處理監聽器,觀測與記錄之間沒有一對一的對映。

要在批處理監聽器中啟用每條記錄的觀測,請將容器工廠屬性 `recordObservationsInBatch` 設定為 `true`。

@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setRecordObservationsInBatch(true);
    return factory;
}

當此屬性為 `true` 時,將為批處理中的每條記錄建立一個觀測,但該觀測不會傳播到監聽器方法。然後,應用程式可以使用觀測上下文來跟蹤批處理中每條記錄的處理。這使您可以在批處理上下文中,也能檢視每條記錄的處理情況。

© . This site is unofficial and not affiliated with VMware.