查詢特定偏移量
為了定位,您的監聽器必須實現 ConsumerSeekAware,它包含以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
registerSeekCallback 在容器啟動和分割槽分配時都會被呼叫。在初始化後某個任意時間進行定位時,您應該使用此回撥。您應該儲存對回撥的引用。如果您在多個容器(或在 ConcurrentMessageListenerContainer 中)使用相同的監聽器,則應將回調儲存在 ThreadLocal 或由監聽器 Thread 鍵控的其他結構中。
使用組管理時,當分割槽被分配時會呼叫 onPartitionsAssigned。例如,您可以使用此方法透過呼叫回撥來為分割槽設定初始偏移量。您還可以使用此方法將此執行緒的回撥與分配的分割槽關聯起來(請參閱下面的示例)。您必須使用回撥引數,而不是傳遞給 registerSeekCallback 的引數。從版本 2.5.5 開始,即使使用手動分割槽分配,此方法也會被呼叫。
當容器停止或 Kafka 撤銷分配時,會呼叫 onPartitionsRevoked。您應該丟棄此執行緒的回撥並刪除與已撤銷分割槽的任何關聯。
回撥具有以下方法:
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
String getGroupId();
seek 方法的兩種不同變體提供了一種定位到任意偏移量的方法。接受 Function 作為引數來計算偏移量的方法是在框架的 3.2 版中新增的。此函式提供了對當前偏移量的訪問(消費者返回的當前位置,即要獲取的下一個偏移量)。使用者可以根據函式定義中消費者的當前偏移量來決定要定位到哪個偏移量。
seekRelative 在 2.3 版中新增,用於執行相對定位。
-
offset為負且toCurrent為false- 相對於分割槽的末尾定位。 -
offset為正且toCurrent為false- 相對於分割槽的開頭定位。 -
offset為負且toCurrent為true- 相對於當前位置定位(倒帶)。 -
offset為正且toCurrent為true- 相對於當前位置定位(快進)。
seekToTimestamp 方法也在 2.3 版中新增。
在 onIdleContainer 或 onPartitionsAssigned 方法中為多個分割槽定位到相同時間戳時,第二種方法更受青睞,因為它透過一次呼叫消費者的 offsetsForTimes 方法來查詢時間戳的偏移量更高效。從其他位置呼叫時,容器將收集所有時間戳定位請求,並對 offsetsForTimes 進行一次呼叫。 |
當檢測到空閒容器時,您也可以從 onIdleContainer() 執行定位操作。有關如何啟用空閒容器檢測,請參閱檢測空閒和無響應的消費者。
接受集合的 seekToBeginning 方法很有用,例如,在處理壓縮主題並且您希望每次應用程式啟動時都定位到開頭時。 |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在執行時任意定位,請使用 registerSeekCallback 中針對相應執行緒的回撥引用。
這是一個簡單的 Spring Boot 應用程式,演示瞭如何使用回撥;它向主題傳送 10 條記錄;在控制檯中按 <Enter> 會導致所有分割槽都定位到開頭。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
為了簡化操作,2.3 版添加了 AbstractConsumerSeekAware 類,該類跟蹤哪個回撥將用於主題/分割槽。以下示例演示瞭如何在容器每次空閒時,在每個分割槽中定位到上次處理的記錄。它還包含允許任意外部呼叫將分割槽倒退一條記錄的方法。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getTopicsAndCallbacks()
.forEach((tp, callbacks) ->
callbacks.forEach(callback -> callback.seekRelative(tp.topic(), tp.partition(), -1, true))
);
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbacksFor(new TopicPartition(topic, partition))
.forEach(callback -> callback.seekRelative(topic, partition, -1, true));
}
}
2.6 版向抽象類添加了便利方法:
-
seekToBeginning()- 將所有分配的分割槽定位到開頭。 -
seekToEnd()- 將所有分配的分割槽定位到末尾。 -
seekToTimestamp(long timestamp)- 將所有分配的分割槽定位到該時間戳表示的偏移量。
示例
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listen(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}
自 3.3 版起,ConsumerSeekAware.ConsumerSeekCallback 介面中引入了一個新方法 getGroupId()。當您需要識別與特定定位回撥關聯的消費者組時,此方法特別有用。
當使用擴充套件 AbstractConsumerSeekAware 的類時,在一個監聽器中執行的定位操作可能會影響同一類中的所有監聽器。這可能並非總是所需行為。為了解決此問題,您可以使用回撥提供的 getGroupId() 方法。這允許您有選擇地執行定位操作,僅針對感興趣的消費者組。 |