定位到特定偏移量

為了進行 seek 操作,您的監聽器必須實現 ConsumerSeekAware 介面,它包含以下方法:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

registerSeekCallback 方法在容器啟動時以及每次分配分割槽時被呼叫。初始化後,當需要在任意時間點進行 seek 操作時,應使用此回撥。您應該儲存對此回撥的引用。如果您在多個容器(或 ConcurrentMessageListenerContainer)中使用同一個監聽器,您應該將回調儲存在 ThreadLocal 中,或者按監聽器 Thread 為鍵的其他結構中。

使用組管理時,onPartitionsAssigned 方法在分配分割槽時被呼叫。您可以使用此方法,例如透過呼叫回撥來設定分割槽的初始偏移量。您還可以使用此方法將當前執行緒的回撥與分配的分割槽關聯起來(參見下面的示例)。您必須使用回撥引數,而不是傳遞給 registerSeekCallback 的引數。從版本 2.5.5 開始,即使在使用手動分割槽分配時,也會呼叫此方法。

onPartitionsRevoked 方法在容器停止或 Kafka 撤銷分配時被呼叫。您應該丟棄當前執行緒的回撥,並移除與被撤銷分割槽的所有關聯。

回撥具有以下方法:

void seek(String topic, int partition, long offset);

void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection<TopicPartitions> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection<TopicPartitions> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

String getGroupId();

seek 方法的兩種不同變體提供了定位到任意偏移量的方法。接受 Function 作為引數來計算偏移量的方法是在框架的 3.2 版本中新增的。此函式提供對當前偏移量(消費者返回的當前位置,即下一個要獲取的偏移量)的訪問。使用者可以在函式定義中根據消費者中的當前偏移量來決定要定位到哪個偏移量。

seekRelative 方法在版本 2.3 中新增,用於執行相對定位。

  • offset 為負且 toCurrentfalse - 相對於分割槽末尾定位。

  • offset 為正且 toCurrentfalse - 相對於分割槽開頭定位。

  • offset 為負且 toCurrenttrue - 相對於當前位置定位(後退)。

  • offset 為正且 toCurrenttrue - 相對於當前位置定位(快進)。

seekToTimestamp 方法也在版本 2.3 中新增。

當在 onIdleContaineronPartitionsAssigned 方法中為多個分割槽定位到同一時間戳時,首選第二種方法,因為它透過一次呼叫消費者的 offsetsForTimes 方法來查詢時間戳對應的偏移量效率更高。當從其他位置呼叫時,容器將收集所有時間戳定位請求,並一次呼叫 offsetsForTimes

檢測到空閒容器時,您也可以從 onIdleContainer() 執行 seek 操作。請參閱檢測空閒和無響應的 Consumer 以瞭解如何啟用空閒容器檢測。

接受集合引數的 seekToBeginning 方法很有用,例如,當處理壓縮主題並且希望在應用每次啟動時都定位到開頭時。
public class MyListener implements ConsumerSeekAware {

    ...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

要在執行時任意定位,請使用相應執行緒從 registerSeekCallback 獲取的回撥引用。

這裡有一個簡單的 Spring Boot 應用,演示瞭如何使用回撥;它向主題傳送 10 條記錄;在控制檯按下 <Enter> 會使所有分割槽定位到開頭。

@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

為了簡化操作,版本 2.3 添加了 AbstractConsumerSeekAware 類,該類跟蹤主題/分割槽應使用哪個回撥。以下示例展示瞭如何在容器每次空閒時,定位到每個分割槽處理的最後一條記錄。它還提供了允許任意外部呼叫將分割槽回退一條記錄的方法。

public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

            assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind all partitions one record.
    */
    public void rewindAllOneRecord() {
        getTopicsAndCallbacks()
            .forEach((tp, callbacks) ->
                    callbacks.forEach(callback -> callback.seekRelative(tp.topic(), tp.partition(), -1, true))
            );
    }

    /**
    * Rewind one partition one record.
    */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbacksFor(new TopicPartition(topic, partition))
            .forEach(callback -> callback.seekRelative(topic, partition, -1, true));
    }

}

版本 2.6 為抽象類添加了便利方法:

  • seekToBeginning() - 將所有已分配分割槽定位到開頭。

  • seekToEnd() - 將所有已分配分割槽定位到末尾。

  • seekToTimestamp(long timestamp) - 將所有已分配分割槽定位到該時間戳表示的偏移量。

示例

public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listen(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
    }

}

從版本 3.3 開始,在 ConsumerSeekAware.ConsumerSeekCallback 介面中引入了一個新方法 getGroupId()。當您需要識別與特定 seek 回撥關聯的消費者組時,此方法特別有用。

當使用繼承 AbstractConsumerSeekAware 的類時,在一個監聽器中執行的 seek 操作可能會影響同一類中的所有監聽器。這並非總是期望的行為。為了解決這個問題,您可以使用回撥提供的 getGroupId() 方法。這允許您有選擇地執行 seek 操作,僅針對感興趣的消費者組。