Reactive Kafka Binder 中的可觀測性
本節描述瞭如何在響應式 Kafka Binder 中啟用基於 Micrometer 的可觀測性。
生產者繫結
生產者繫結內建了可觀測性支援。要啟用它,請設定以下屬性
spring.cloud.stream.kafka.binder.enable-observation
將此屬性設定為 true
時,您可以觀察記錄的釋出。使用 StreamBridge
釋出記錄以及常規的 Supplier<?>
bean 都可以被觀察到。
消費者繫結
在消費者端啟用可觀測性比生產者端更復雜。消費者繫結有兩個起點
-
透過生產者繫結釋出資料的 Topic
-
在 Spring Cloud Stream 外部生產資料的 Topic
第一種情況下,應用理想情況下希望將可觀測性頭部資訊傳遞到消費者入站。第二種情況下,如果上游沒有開始觀察,則會啟動一個新的觀察。
示例:具有可觀測性的 Function
@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
return s -> s.flatMap(record -> {
Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
observationRegistry
);
return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
.build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});
}
在此示例中
-
接收到記錄時,將建立一個觀察。
-
如果存在上游觀察,它將成為
KafkaRecordReceiverContext
的一部分。 -
建立一個帶有延遲上下文的
Mono
。 -
呼叫
map
操作時,上下文可以訪問正確的觀察。 -
flatMap
操作的結果作為Flux<Message<?>>
傳送回繫結。 -
出站記錄將包含與入站繫結相同的可觀測性頭部資訊。
示例:具有可觀測性的 Consumer
@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
observationRegistry).observe(() -> System.out.println(record)))
.subscribe();
}
在此情況下
-
由於沒有出站繫結,所以在
Flux
上使用了doOnNext
而不是flatMap
。 -
直接呼叫
observe
會啟動觀察,並在完成後正確關閉它。