@KafkaListener 註解
@KafkaListener 註解用於將 bean 方法指定為偵聽器容器的偵聽器。該 bean 被封裝在一個 MessagingMessageListenerAdapter 中,該介面卡配置了各種功能,例如,在必要時將資料轉換為匹配方法引數的轉換器。
您可以使用 #{…} 或屬性佔位符 (${…}) 透過 SpEL 配置註解上的大多數屬性。有關更多資訊,請參閱 Javadoc。
記錄偵聽器
@KafkaListener 註解提供了一種用於簡單 POJO 偵聽器的機制。以下示例展示瞭如何使用它:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
此機制需要在您的 @Configuration 類之一上使用 @EnableKafka 註解以及一個偵聽器容器工廠,該工廠用於配置底層的 ConcurrentMessageListenerContainer。預設情況下,期望有一個名為 kafkaListenerContainerFactory 的 bean。以下示例展示瞭如何使用 ConcurrentMessageListenerContainer:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
return props;
}
}
請注意,要設定容器屬性,您必須使用工廠上的 getContainerProperties() 方法。它用作注入到容器中的實際屬性的模板。
從 2.1.1 版本開始,您現在可以為由註解建立的消費者設定 client.id 屬性。clientIdPrefix 將以 -n 為字尾,其中 n 是一個整數,表示使用併發時容器的編號。
從 2.2 版本開始,您現在可以使用註解本身的屬性來覆蓋容器工廠的 concurrency 和 autoStartup 屬性。這些屬性可以是簡單值、屬性佔位符或 SpEL 表示式。以下示例展示瞭如何實現:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
顯式分割槽分配
您還可以使用顯式主題和分割槽(以及可選的初始偏移量)配置 POJO 偵聽器。以下示例展示瞭如何實現:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
您可以在 partitions 或 partitionOffsets 屬性中指定每個分割槽,但不能同時指定兩者。
與大多數註解屬性一樣,您可以使用 SpEL 表示式;有關如何生成大量分割槽的示例,請參閱手動分配所有分割槽。
從 2.5.5 版本開始,您可以將初始偏移量應用於所有已分配的分割槽:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
* 萬用字元表示 partitions 屬性中的所有分割槽。每個 @TopicPartition 中只能有一個帶有萬用字元的 @PartitionOffset。
此外,當偵聽器實現 ConsumerSeekAware 時,即使使用手動分配,也會呼叫 onPartitionsAssigned。這允許,例如,在該時間進行任何任意的查詢操作。
從 2.6.4 版本開始,您可以指定一個逗號分隔的分割槽列表或分割槽範圍:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
該範圍是包含的;上面的示例將分配分割槽 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15。
在指定初始偏移量時可以使用相同的技術:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移量將應用於所有 6 個分割槽。
自 3.2 版本以來,@PartitionOffset 支援 SeekPosition.END、SeekPosition.BEGINNING、SeekPosition.TIMESTAMP,seekPosition 匹配 SeekPosition 列舉名稱
@KafkaListener(id = "seekPositionTime", topicPartitions = {
@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
})
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
如果將 seekPosition 設定為 END 或 BEGINNING,則將忽略 initialOffset 和 relativeToCurrent。如果將 seekPosition 設定為 TIMESTAMP,則 initialOffset 表示時間戳。
手動確認
使用手動 AckMode 時,您還可以為偵聽器提供 Acknowledgment。要啟用手動 AckMode,您需要將 ContainerProperties 中的確認模式設定為適當的手動模式。以下示例還展示瞭如何使用不同的容器工廠。此自定義容器工廠必須透過呼叫 getContainerProperties(),然後在其上呼叫 setAckMode 來將 AckMode 設定為手動型別。否則,Acknowledgment 物件將為 null。
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
消費者記錄元資料
最後,記錄的元資料可從訊息頭中獲取。您可以使用以下頭名稱來檢索訊息的頭:
-
KafkaHeaders.OFFSET -
KafkaHeaders.RECEIVED_KEY -
KafkaHeaders.RECEIVED_TOPIC -
KafkaHeaders.RECEIVED_PARTITION -
KafkaHeaders.RECEIVED_TIMESTAMP -
KafkaHeaders.TIMESTAMP_TYPE
從 2.5 版本開始,如果傳入記錄的鍵為 null,則 RECEIVED_KEY 不存在;以前,該頭會填充 null 值。此更改是為了使框架與 spring-messaging 約定保持一致,即 null 值的頭不存在。
以下示例展示瞭如何使用這些頭:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
引數註解(@Payload、@Header)必須在偵聽器方法的具體實現上指定;如果它們定義在介面上,將不會被檢測到。 |
從 2.5 版本開始,您可以選擇在 ConsumerRecordMetadata 引數中接收記錄元資料,而不是使用離散的頭。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
這包含了 ConsumerRecord 中除鍵和值之外的所有資料。
批處理偵聽器
從 1.1 版本開始,您可以配置 @KafkaListener 方法以接收從消費者輪詢收到的整個批處理消費者記錄。
| 非阻塞重試不支援批處理偵聽器。 |
要配置偵聽器容器工廠以建立批處理偵聽器,您可以設定 batchListener 屬性。以下示例展示瞭如何實現:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
從 2.8 版本開始,您可以使用 @KafkaListener 註解上的 batch 屬性來覆蓋工廠的 batchListener 屬性。這與容器錯誤處理器的更改一起,允許同一個工廠用於記錄和批處理偵聽器。 |
從 2.9.6 版本開始,容器工廠為 recordMessageConverter 和 batchMessageConverter 屬性提供了單獨的設定器。以前,只有一個屬性 messageConverter,它適用於記錄和批處理偵聽器。 |
以下示例展示瞭如何接收有效負載列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
主題、分割槽、偏移量等在與有效負載並行的頭中可用。以下示例展示瞭如何使用這些頭:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,您可以接收一個 List 的 Message<?> 物件,每個訊息中包含每個偏移量和其他詳細資訊,但它必須是方法上定義的唯一引數(除了可選的 Acknowledgment(在使用手動提交時)和/或 Consumer<?, ?> 引數)。以下示例展示瞭如何實現:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在這種情況下,不會對有效負載執行任何轉換。
如果 BatchMessagingMessageConverter 配置了 RecordMessageConverter,您還可以向 Message 引數新增一個泛型型別,並且有效負載將被轉換。有關更多資訊,請參閱使用批處理偵聽器進行有效負載轉換。
您還可以接收一個 ConsumerRecord<?, ?> 物件列表,但它必須是方法上定義的唯一引數(除了可選的 Acknowledgment(在使用手動提交時)和 Consumer<?, ?> 引數)。以下示例展示瞭如何實現:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
從 2.2 版本開始,偵聽器可以接收 poll() 方法返回的完整 ConsumerRecords<?, ?> 物件,允許偵聽器訪問其他方法,例如 partitions()(返回列表中的 TopicPartition 例項)和 records(TopicPartition)(獲取選擇性記錄)。同樣,這必須是方法上的唯一引數(除了可選的 Acknowledgment(在使用手動提交時)或 Consumer<?, ?> 引數)。以下示例展示瞭如何實現:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果容器工廠配置了 RecordFilterStrategy,則對於 ConsumerRecords<?, ?> 偵聽器,它將被忽略,併發出 WARN 日誌訊息。只有在使用 List<?> 形式的偵聽器時,才能使用批處理偵聽器過濾記錄。預設情況下,記錄是逐個過濾的;從 2.8 版本開始,您可以重寫 filterBatch 以一次性過濾整個批處理。 |
註解屬性
從 2.0 版本開始,id 屬性(如果存在)用作 Kafka 消費者 group.id 屬性,如果消費者工廠中配置了該屬性,則會覆蓋它。您也可以顯式設定 groupId 或將 idIsGroup 設定為 false,以恢復以前使用消費者工廠 group.id 的行為。
您可以在大多數註解屬性中使用屬性佔位符或 SpEL 表示式,如下例所示:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
從 2.1.2 版本開始,SpEL 表示式支援一個特殊標記:__listener。它是一個偽 bean 名稱,表示此註解所在當前 bean 例項。
考慮以下示例
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
鑑於上例中的 bean,我們可以使用以下內容:
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
如果,在極不可能的情況下,您有一個實際名為 __listener 的 bean,您可以透過使用 beanRef 屬性更改表示式標記。以下示例展示瞭如何實現:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
從 2.2.4 版本開始,您可以直接在註解上指定 Kafka 消費者屬性,這些屬性將覆蓋消費者工廠中配置的同名屬性。您不能以這種方式指定 group.id 和 client.id 屬性;它們將被忽略;請使用 groupId 和 clientIdPrefix 註解屬性。
這些屬性以單個字串的形式指定,採用正常的 Java Properties 檔案格式:foo:bar、foo=bar 或 foo bar,如下例所示:
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下是使用 RoutingKafkaTemplate示例中相應偵聽器的示例。
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}