使用 Spring Cloud Sleuth 進行追蹤
當 Spring Cloud Sleuth 位於基於 Spring Cloud Stream Kafka Streams binder 的應用程式的類路徑中時,其消費者和生產者都會自動新增追蹤資訊。但是,為了追蹤任何應用程式特定的操作,這些操作需要由使用者程式碼明確地進行插樁。這可以透過在應用程式中注入 Spring Cloud Sleuth 的 KafkaStreamsTracing bean,然後透過此注入的 bean 呼叫各種 Kafka Streams 操作來完成。以下是一些使用示例。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
在上面的示例中,有兩處添加了明確的追蹤插樁。首先,我們正在記錄來自傳入 KStream 的鍵/值資訊。當記錄此資訊時,相關的 span 和 trace ID 也會被記錄,以便監控系統可以追蹤它們並與相同的 span ID 相關聯。其次,當我們呼叫 map 操作時,我們沒有直接在 KStream 類上呼叫它,而是將其封裝在 transform 操作中,然後從 KafkaStreamsTracing 呼叫 map。在這種情況下,記錄的訊息也將包含 span ID 和 trace ID。
這是另一個示例,我們使用低階轉換器 API 訪問各種 Kafka Streams 頭部。當 spring-cloud-sleuth 在類路徑中時,所有追蹤頭部也可以這樣訪問。
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}