響應式 Kafka 繫結器中的可觀察性

本節描述瞭如何在響應式 Kafka 繫結器中啟用基於 Micrometer 的可觀測性。

生產者繫結

生產者繫結內建支援可觀測性。要啟用它,請設定以下屬性

spring.cloud.stream.kafka.binder.enable-observation

當此屬性設定為 true 時,您可以觀察記錄的釋出。使用 StreamBridge 和常規 Supplier<?> bean 釋出記錄都可以被觀察到。

消費者繫結

在消費者端啟用可觀測性比在生產者端更復雜。消費者繫結有兩個起點

  1. 透過生產者繫結釋出資料的主題

  2. 在 Spring Cloud Stream 外部產生資料的主題

在第一種情況下,應用程式理想情況下希望將可觀測性頭資訊傳遞到消費者入站。在第二種情況下,如果沒有啟動上游觀察,它將啟動一個新的觀察。

示例:具有可觀測性的函式

@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {

	return s -> s.flatMap(record -> {
		Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
		null,
		KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
		() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
		observationRegistry
		);

		return Mono.deferContextual(contextView -> Mono.just(record)
			.map(rec -> new String(rec.value()).toLowerCase())
			.map(rec -> MessageBuilder.withPayload(rec)
				.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
				.build()))
			.doOnTerminate(receiverObservation::stop)
			.doOnError(receiverObservation::error)
			.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
    });
}

在此示例中

  1. 當接收到一條記錄時,會建立一個觀察。

  2. 如果存在上游觀察,它將成為 KafkaRecordReceiverContext 的一部分。

  3. 使用延遲上下文建立一個 Mono

  4. 當呼叫 map 操作時,上下文可以訪問正確的觀察。

  5. flatMap 操作的結果以 Flux<Message<?>> 的形式傳送回繫結。

  6. 出站記錄將具有與輸入繫結相同的可觀測性頭資訊。

示例:具有可觀測性的消費者

@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
	return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
			null,
			KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
			() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
			observationRegistry).observe(() -> System.out.println(record)))
		.subscribe();
}

在這種情況下

  1. 由於沒有輸出繫結,因此在 Flux 上使用 doOnNext 而不是 flatMap

  2. 直接呼叫 observe 會啟動觀察並在完成後正確關閉它。

© . This site is unofficial and not affiliated with VMware.