時間戳提取器

Kafka Streams 允許你基於不同的時間戳概念來控制消費者記錄的處理。預設情況下,Kafka Streams 會提取嵌入在消費者記錄中的時間戳元資料。你可以透過為每個輸入繫結提供不同的 TimestampExtractor 實現來改變此預設行為。以下是一些關於如何實現這一點的詳細資訊。

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

然後,你可以為每個消費者繫結設定上述 TimestampExtractor 的 bean 名稱。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

如果你跳過為輸入消費者繫結設定自定義時間戳提取器,那麼該消費者將使用預設設定。